From 3da6238ac92aa4f2ec13648f08a5e6ceded6b255 Mon Sep 17 00:00:00 2001 From: Harry Mellor <19981378+hmellor@users.noreply.github.com> Date: Wed, 29 Jan 2025 17:44:41 +0000 Subject: [PATCH] Add `pre-commit` based linting and formatting (#35) * Add pre-commit workflow * Add actionlint * Add generic hooks * Add black, isort, shellcheck * Add requirements and markdown linting * Add toml * Add Dockerfile * Add codespell * Use Node.js version of `markdownlint` * Add `requirements-lint.txt` * Use CLI version of Node.js `markdownlint` * Add `pre-commit` instructions to `Contributing` * `pre-commit run -a` automatic fixes * Exclude helm templates from `check-yaml` * Comment hooks that require installed tools * Make `codespell` happy * Make `actionlint` happy * Disable `shellcheck` until it can be installed properly * Make `markdownlint` happy * Add note about running pre-commit --------- Signed-off-by: Harry Mellor <19981378+hmellor@users.noreply.github.com> --- .github/workflows/ci.yml | 2 +- .github/workflows/helm-lint.yml | 1 - .github/workflows/helm-release.yml | 5 +- .github/workflows/matchers/actionlint.json | 17 ++ .github/workflows/pre-commit.yml | 17 ++ .markdownlint.yaml | 5 + .pre-commit-config.yaml | 45 ++++ README.md | 27 +- helm/README.md | 6 +- helm/ct.yaml | 2 +- helm/lintconf.yaml | 2 +- helm/templates/deployment-vllm-multi.yaml | 10 +- helm/templates/role.yaml | 1 - helm/templates/serviceaccount.yaml | 1 - helm/test.sh | 2 +- helm/values.schema.json | 3 +- helm/values.yaml | 20 +- observability/README.md | 2 +- observability/upgrade.sh | 1 - observability/values.yaml | 4 +- observability/vllm-dashboard.json | 2 +- pyproject.toml | 2 + requirements-lint.txt | 1 + requirements-test.txt | 2 +- setup.py | 4 +- src/tests/README.md | 4 +- src/tests/perftest/fake-openai-server.py | 151 ++++++----- src/tests/perftest/request_generator.py | 61 +++-- src/tests/requirements.txt | 2 +- src/tests/test-openai.py | 29 ++- src/tests/test_session_router.py | 123 +++++---- src/vllm_router/README.md | 1 + src/vllm_router/engine_stats.py | 58 +++-- src/vllm_router/httpx_client.py | 23 +- src/vllm_router/log.py | 4 +- src/vllm_router/perf-test.sh | 2 +- src/vllm_router/protocols.py | 11 +- src/vllm_router/request_stats.py | 61 +++-- src/vllm_router/requirements.txt | 6 +- src/vllm_router/router.py | 283 +++++++++++++-------- src/vllm_router/routing_logic.py | 69 ++--- src/vllm_router/service_discovery.py | 87 ++++--- src/vllm_router/utils.py | 13 +- tutorials/00-install-kubernetes-env.md | 23 +- tutorials/01-minimal-helm-installation.md | 44 +++- tutorials/02-basic-vllm-config.md | 75 +++--- tutorials/03-load-model-from-pv.md | 152 +++++------ tutorials/04-launch-multiple-model.md | 101 ++++---- tutorials/05-offload-kv-cache.md | 68 ++--- tutorials/README.md | 18 +- tutorials/assets/example-04-openai.py | 5 +- utils/install-helm.sh | 2 +- utils/install-kubectl.sh | 2 +- utils/install-minikube-cluster.sh | 2 - 54 files changed, 993 insertions(+), 671 deletions(-) create mode 100644 .github/workflows/matchers/actionlint.json create mode 100644 .github/workflows/pre-commit.yml create mode 100644 .markdownlint.yaml create mode 100644 .pre-commit-config.yaml create mode 100644 pyproject.toml create mode 100644 requirements-lint.txt diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 999b9ff1..6b2f2283 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -28,7 +28,7 @@ jobs: uses: actions/checkout@v4 - name: Set up Python - uses: actions/setup-python@v2 + uses: 
actions/setup-python@v5 with: python-version: '3.8' diff --git a/.github/workflows/helm-lint.yml b/.github/workflows/helm-lint.yml index bf55b67d..45295491 100644 --- a/.github/workflows/helm-lint.yml +++ b/.github/workflows/helm-lint.yml @@ -23,4 +23,3 @@ jobs: - name: Lint open-webui Helm Chart run: | helm lint ./helm - diff --git a/.github/workflows/helm-release.yml b/.github/workflows/helm-release.yml index 27cb7497..e00badc5 100644 --- a/.github/workflows/helm-release.yml +++ b/.github/workflows/helm-release.yml @@ -24,7 +24,7 @@ jobs: git config user.name "$GITHUB_ACTOR" git config user.email "$GITHUB_ACTOR@users.noreply.github.com" - # Could add Prometheus as a dependent chart here if desired + # Could add Prometheus as a dependent chart here if desired # - name: Add Dependency Repos # run: | # helm repo add prometheus-community https://prometheus-community.github.io/helm-charts @@ -52,6 +52,5 @@ jobs: break fi REPO=$(echo '${{ github.repository }}' | tr '[:upper:]' '[:lower:]') - helm push "${pkg}" oci://ghcr.io/$REPO + helm push "${pkg}" "oci://ghcr.io/$REPO" done - diff --git a/.github/workflows/matchers/actionlint.json b/.github/workflows/matchers/actionlint.json new file mode 100644 index 00000000..09211db2 --- /dev/null +++ b/.github/workflows/matchers/actionlint.json @@ -0,0 +1,17 @@ +{ + "problemMatcher": [ + { + "owner": "actionlint", + "pattern": [ + { + "regexp": "^(?:\\x1b\\[\\d+m)?(.+?)(?:\\x1b\\[\\d+m)*:(?:\\x1b\\[\\d+m)*(\\d+)(?:\\x1b\\[\\d+m)*:(?:\\x1b\\[\\d+m)*(\\d+)(?:\\x1b\\[\\d+m)*: (?:\\x1b\\[\\d+m)*(.+?)(?:\\x1b\\[\\d+m)* \\[(.+?)\\]$", + "file": 1, + "line": 2, + "column": 3, + "message": 4, + "code": 5 + } + ] + } + ] + } diff --git a/.github/workflows/pre-commit.yml b/.github/workflows/pre-commit.yml new file mode 100644 index 00000000..8c72a709 --- /dev/null +++ b/.github/workflows/pre-commit.yml @@ -0,0 +1,17 @@ +name: pre-commit + +on: + pull_request: + push: + branches: [main] + +jobs: + pre-commit: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 + - uses: actions/setup-python@0b93645e9fea7318ecaed2b359559ac225c90a2b # v5.3.0 + with: + python-version: "3.12" + - run: echo "::add-matcher::.github/workflows/matchers/actionlint.json" + - uses: pre-commit/action@2c7b3805fd2a0fd8c1884dcaebf91fc102a13ecd # v3.0.1 diff --git a/.markdownlint.yaml b/.markdownlint.yaml new file mode 100644 index 00000000..70febfac --- /dev/null +++ b/.markdownlint.yaml @@ -0,0 +1,5 @@ +MD013: false # line-length +MD028: false # no-blanks-blockquote +MD029: # ol-prefix + style: ordered +MD033: false # no-inline-html diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 00000000..5f90325e --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,45 @@ +repos: +- repo: https://github.com/rhysd/actionlint + rev: v1.7.7 + hooks: + - id: actionlint +- repo: https://github.com/pre-commit/pre-commit-hooks + rev: v5.0.0 + hooks: + - id: check-json + - id: check-toml + - id: check-yaml + exclude: ^helm/templates/ + - id: end-of-file-fixer + - id: requirements-txt-fixer + - id: trailing-whitespace +# TODO: Enable these hooks when environment issues are resolved +# - repo: https://github.com/hadolint/hadolint +# rev: v2.12.0 +# hooks: +# - id: hadolint +# - repo: https://github.com/gruntwork-io/pre-commit +# rev: v0.1.25 +# hooks: +# - id: helmlint +- repo: https://github.com/psf/black + rev: '25.1.0' + hooks: + - id: black +- repo: https://github.com/pycqa/isort + rev: '6.0.0' + 
hooks: + - id: isort +# TODO: Enable this hook when environment issues are resolved +# - repo: https://github.com/koalaman/shellcheck-precommit +# rev: v0.10.0 +# hooks: +# - id: shellcheck +- repo: https://github.com/igorshubovych/markdownlint-cli + rev: v0.44.0 + hooks: + - id: markdownlint +- repo: https://github.com/codespell-project/codespell + rev: v2.4.1 + hooks: + - id: codespell diff --git a/README.md b/README.md index ac5c1af9..1bddc352 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,4 @@ -# vLLM Production Stack: reference stack for production vLLM deployment - +# vLLM Production Stack: reference stack for production vLLM deployment **vLLM Production Stack** project provides a reference implementation on how to build an inference stack on top of vLLM, which allows you to: @@ -7,7 +6,7 @@ - πŸ’» Monitor the through a web dashboard - πŸ˜„ Enjoy the performance benefits brought by request routing and KV cache offloading -## Latest News: +## Latest News - πŸ”₯ vLLM Production Stack is released! Checkout our [release blogs](https://blog.lmcache.ai/2025-01-21-stack-release) [01-22-2025] - ✨Join us at #production-stack channel of vLLM [slack](https://slack.vllm.ai/), LMCache [slack](https://join.slack.com/t/lmcacheworkspace/shared_invite/zt-2viziwhue-5Amprc9k5hcIdXT7XevTaQ), or fill out this [interest form](https://forms.gle/wSoeNpncmPVdXppg8) for a chat! @@ -20,7 +19,6 @@ The stack is set up using [Helm](https://helm.sh/docs/), and contains the follow - **Request router**: Directs requests to appropriate backends based on routing keys or session IDs to maximize KV cache reuse. - **Observability stack**: monitors the metrics of the backends through [Prometheus](https://github.com/prometheus/prometheus) + [Grafana](https://grafana.com/) - Architecture of the stack ## Roadmap @@ -42,6 +40,7 @@ We are actively working on this project and will release the following features ### Deployment vLLM Production Stack can be deployed via helm charts. Clone the repo to local and execute the following commands for a minimal deployment: + ```bash git clone https://github.com/vllm-project/production-stack.git cd production-stack/ @@ -55,21 +54,18 @@ To validate the installation and and send query to the stack, refer to [this tut For more information about customizing the helm chart, please refer to [values.yaml](https://github.com/vllm-project/production-stack/blob/main/helm/values.yaml) and our other [tutorials](https://github.com/vllm-project/production-stack/tree/main/tutorials). - ### Uninstall ```bash sudo helm uninstall vllm ``` - ## Grafana Dashboard ### Features The Grafana dashboard provides the following insights: - 1. **Available vLLM Instances**: Displays the number of healthy instances. 2. **Request Latency Distribution**: Visualizes end-to-end request latency. 3. **Time-to-First-Token (TTFT) Distribution**: Monitors response times for token generation. @@ -98,7 +94,6 @@ The router ensures efficient request distribution among backends. It supports: - Session-ID based routing - (WIP) prefix-aware routing - ## Contributing Contributions are welcome! Please follow the standard GitHub flow: @@ -107,6 +102,21 @@ Contributions are welcome! Please follow the standard GitHub flow: 2. Create a feature branch. 3. Submit a pull request with detailed descriptions. +We use `pre-commit` for formatting, it is installed as follows: + +```bash +pip install -r requirements-lint.txt +pre-commit install +``` + +It will run automatically before every commit. 
You cana also run it manually on all files with: + +```bash +pre-commit run --all-files +``` + +> You can read more about `pre-commit` at . + ## License This project is licensed under the MIT License. See the `LICENSE` file for details. @@ -114,4 +124,3 @@ This project is licensed under the MIT License. See the `LICENSE` file for detai --- For any issues or questions, feel free to open an issue or contact the maintainers. - diff --git a/helm/README.md b/helm/README.md index 243fa758..c05eaf05 100644 --- a/helm/README.md +++ b/helm/README.md @@ -2,14 +2,14 @@ This helm chart lets users deploy multiple serving engines and a router into the Kubernetes cluster. -## Key features: +## Key features - Support running multiple serving engines with multiple different models -- Load the model weights directly from the existing PersistentVolumes +- Load the model weights directly from the existing PersistentVolumes ## Prerequisites -1. A running Kubernetes cluster with GPU. (You can set it up through `minikube`: https://minikube.sigs.k8s.io/docs/tutorials/nvidia/) +1. A running Kubernetes cluster with GPU. (You can set it up through `minikube`: ) 2. [Helm](https://helm.sh/docs/intro/install/) ## Install the helm chart diff --git a/helm/ct.yaml b/helm/ct.yaml index d273e118..157243f9 100644 --- a/helm/ct.yaml +++ b/helm/ct.yaml @@ -1,3 +1,3 @@ chart-dirs: - charts -validate-maintainers: false \ No newline at end of file +validate-maintainers: false diff --git a/helm/lintconf.yaml b/helm/lintconf.yaml index c8e8c5d7..97dbfbad 100644 --- a/helm/lintconf.yaml +++ b/helm/lintconf.yaml @@ -39,4 +39,4 @@ rules: type: unix trailing-spaces: enable truthy: - level: warning \ No newline at end of file + level: warning diff --git a/helm/templates/deployment-vllm-multi.yaml b/helm/templates/deployment-vllm-multi.yaml index 9d1ff17c..33f5d86d 100644 --- a/helm/templates/deployment-vllm-multi.yaml +++ b/helm/templates/deployment-vllm-multi.yaml @@ -67,7 +67,7 @@ spec: value: /data {{- if $modelSpec.hf_token }} - name: HF_TOKEN - valueFrom: + valueFrom: secretKeyRef: name: {{ .Release.Name }}-secrets key: hf_token_{{ $modelSpec.name }} @@ -89,7 +89,7 @@ spec: value: "{{ $modelSpec.lmcacheConfig.cpuOffloadingBufferSize }}" {{- end }} {{- if $modelSpec.lmcacheConfig.diskOffloadingBufferSize }} - - name: LMCACHE_LOCAL_DISK + - name: LMCACHE_LOCAL_DISK value: "True" - name: LMCACHE_MAX_LOCAL_DISK_SIZE value: "{{ $modelSpec.lmcacheConfig.diskOffloadingBufferSize }}" @@ -99,7 +99,7 @@ spec: envFrom: - configMapRef: name: "{{ .Release.Name }}-configs" - {{- end }} + {{- end }} ports: - name: {{ include "chart.container-port-name" . }} containerPort: {{ include "chart.container-port" . }} @@ -123,7 +123,7 @@ spec: {{- if .Values.servingEngineSpec.runtimeClassName }} runtimeClassName: nvidia - {{- end }} + {{- end }} {{- if $modelSpec.nodeSelectorTerms}} affinity: nodeAffinity: @@ -132,7 +132,7 @@ spec: {{- with $modelSpec.nodeSelectorTerms }} {{- toYaml . 
| nindent 12 }} {{- end }} - {{- end }} + {{- end }} {{- end }} --- {{- end }} diff --git a/helm/templates/role.yaml b/helm/templates/role.yaml index 31ed57c5..a138fb50 100644 --- a/helm/templates/role.yaml +++ b/helm/templates/role.yaml @@ -7,4 +7,3 @@ rules: - apiGroups: [""] # "" indicates the core API group resources: ["pods"] verbs: ["get", "watch", "list"] - diff --git a/helm/templates/serviceaccount.yaml b/helm/templates/serviceaccount.yaml index bcb0f20f..93face5c 100644 --- a/helm/templates/serviceaccount.yaml +++ b/helm/templates/serviceaccount.yaml @@ -3,4 +3,3 @@ kind: ServiceAccount metadata: name: "{{ .Release.Name }}-router-service-account" namespace: {{ .Release.Namespace }} - diff --git a/helm/test.sh b/helm/test.sh index edbd8c4d..effd0b51 100644 --- a/helm/test.sh +++ b/helm/test.sh @@ -1,2 +1,2 @@ -#helm upgrade --install --create-namespace --namespace=ns-vllm test-vllm . -f values-yihua.yaml +#helm upgrade --install --create-namespace --namespace=ns-vllm test-vllm . -f values-yihua.yaml helm upgrade --install test-vllm . -f values-additional.yaml #--create-namespace --namespace=vllm diff --git a/helm/values.schema.json b/helm/values.schema.json index f23d73a9..0e520bb3 100644 --- a/helm/values.schema.json +++ b/helm/values.schema.json @@ -140,7 +140,7 @@ } } }, - "runtimeClassName": { + "runtimeClassName": { "type": "string" } } @@ -170,4 +170,3 @@ } } } - diff --git a/helm/values.yaml b/helm/values.yaml index 15afe5ae..47f8fe92 100644 --- a/helm/values.yaml +++ b/helm/values.yaml @@ -51,13 +51,13 @@ servingEngineSpec: # # requestCPU: 10 # requestMemory: "64Gi" - # requestGPU: 1 + # requestGPU: 1 # # pvcStorage: "50Gi" - # pvcMatchLabels: + # pvcMatchLabels: # model: "mistral" # - # vllmConfig: + # vllmConfig: # enableChunkedPrefill: false # enablePrefixCaching: false # maxModelLen: 16384 @@ -80,14 +80,14 @@ servingEngineSpec: # - "NVIDIA-RTX-A6000" modelSpec: [] - # -- Container port + # -- Container port containerPort: 8000 - # -- Service port + # -- Service port servicePort: 80 - + # -- Set other environment variables from config map configs: {} - + # -- Readiness probe configuration startupProbe: # -- Number of seconds after the container has started before startup probe is initiated @@ -102,7 +102,7 @@ servingEngineSpec: path: /health # -- Name or number of the port to access on the container, on which the server is listening port: 8000 - + # -- Liveness probe configuration livenessProbe: # -- Number of seconds after the container has started before liveness probe is initiated @@ -117,7 +117,7 @@ servingEngineSpec: path: /health # -- Name or number of the port to access on the container, on which the server is listening port: 8000 - + # -- Disruption Budget Configuration maxUnavailablePodDisruptionBudget: "" @@ -135,7 +135,7 @@ servingEngineSpec: routerSpec: # -- Number of replicas replicaCount: 1 - + # -- Container port containerPort: 8000 diff --git a/observability/README.md b/observability/README.md index 4fe1a492..b0837342 100644 --- a/observability/README.md +++ b/observability/README.md @@ -4,7 +4,7 @@ ## Deploy the observability stack -The observability stack is based on [kube-prom-stack](https://github.com/prometheus-community/helm-charts/blob/main/charts/kube-prometheus-stack/README.md). +The observability stack is based on [kube-prom-stack](https://github.com/prometheus-community/helm-charts/blob/main/charts/kube-prometheus-stack/README.md). 
To launch the observability stack: diff --git a/observability/upgrade.sh b/observability/upgrade.sh index 9bc868a0..36cbd551 100644 --- a/observability/upgrade.sh +++ b/observability/upgrade.sh @@ -1,4 +1,3 @@ helm upgrade kube-prom-stack prometheus-community/kube-prometheus-stack \ --namespace monitoring \ -f "values.yaml" - diff --git a/observability/values.yaml b/observability/values.yaml index 77a073d6..4b6b713d 100644 --- a/observability/values.yaml +++ b/observability/values.yaml @@ -1,6 +1,6 @@ ## Create default rules for monitoring the cluster # -# Disable `etcd` and `kubeScheduler` rules (managed by DOKS, so metrics are not accesible) +# Disable `etcd` and `kubeScheduler` rules (managed by DOKS, so metrics are not accessible) defaultRules: create: true rules: @@ -102,7 +102,7 @@ prometheus: environment: test release: test namespaceSelector: - matchNames: + matchNames: - default endpoints: - port: "service-port" diff --git a/observability/vllm-dashboard.json b/observability/vllm-dashboard.json index 2880c8ae..73b9ca11 100644 --- a/observability/vllm-dashboard.json +++ b/observability/vllm-dashboard.json @@ -91,7 +91,7 @@ "useBackend": false } ], - "title": "Available vLLM instaces", + "title": "Available vLLM instances", "type": "stat" }, { diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 00000000..5d7bf33d --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,2 @@ +[tool.isort] +profile = "black" diff --git a/requirements-lint.txt b/requirements-lint.txt new file mode 100644 index 00000000..416634f5 --- /dev/null +++ b/requirements-lint.txt @@ -0,0 +1 @@ +pre-commit diff --git a/requirements-test.txt b/requirements-test.txt index 55b033e9..e079f8a6 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -1 +1 @@ -pytest \ No newline at end of file +pytest diff --git a/setup.py b/setup.py index 2c25b996..fe741136 100644 --- a/setup.py +++ b/setup.py @@ -1,4 +1,4 @@ -from setuptools import setup, find_packages +from setuptools import find_packages, setup setup( name="vllm-router", @@ -28,5 +28,5 @@ "License :: OSI Approved :: MIT License", "Operating System :: OS Independent", ], - python_requires='>=3.7', + python_requires=">=3.7", ) diff --git a/src/tests/README.md b/src/tests/README.md index c2f83c1f..366393d2 100644 --- a/src/tests/README.md +++ b/src/tests/README.md @@ -15,6 +15,7 @@ MODEL = "meta-llama/Llama-3.1-8B-Instruct" ``` Then, execute the following command in terminal: + ```bash python3 test-openai.py ``` @@ -30,7 +31,7 @@ The `perftest/` folder contains the performance test scripts for the router. Spe - `run-server.sh` and `run-multi-server.sh`: launches one or multiple mock-up OpenAI API server - `clean-up.sh`: kills the mock-up OpenAI API server processes. -### Example router performance test: +### Example router performance test Here's an example setup of running the router performance test: @@ -39,4 +40,3 @@ Here's an example setup of running the router performance test: - **Step 1**: launch the mock-up OpenAI API server by `bash run-multi-server.sh 4 500` - **Step 2**: launch the router locally. 
See `src/router/perf-test.sh` - **Step 3**: launch the request generator by `python3 request_generator.py --qps 10 --num-workers 32` - diff --git a/src/tests/perftest/fake-openai-server.py b/src/tests/perftest/fake-openai-server.py index fcc7b613..6f1bcdc4 100644 --- a/src/tests/perftest/fake-openai-server.py +++ b/src/tests/perftest/fake-openai-server.py @@ -5,25 +5,40 @@ --max-tokens: maximum number of tokens to generate in the response if max_tokens is not provided in the request --speed: number of tokens per second per request """ -from typing import (AsyncGenerator, AsyncIterator, Callable, Dict, Final, List, - Optional) -import asyncio + import argparse +import asyncio import time -from fastapi import FastAPI, Request, HTTPException -from fastapi.responses import StreamingResponse, JSONResponse, Response - -from vllm.entrypoints.chat_utils import (ChatTemplateContentFormatOption, - ConversationMessage) +from typing import AsyncGenerator, AsyncIterator, Callable, Dict, Final, List, Optional + +from fastapi import FastAPI, HTTPException, Request +from fastapi.responses import JSONResponse, Response, StreamingResponse +from vllm.entrypoints.chat_utils import ( + ChatTemplateContentFormatOption, + ConversationMessage, +) from vllm.entrypoints.logger import RequestLogger from vllm.entrypoints.openai.protocol import ( - ChatCompletionLogProb, ChatCompletionLogProbs, - ChatCompletionLogProbsContent, ChatCompletionNamedToolChoiceParam, - ChatCompletionRequest, ChatCompletionResponse, - ChatCompletionResponseChoice, ChatCompletionResponseStreamChoice, - ChatCompletionStreamResponse, ChatMessage, DeltaFunctionCall, DeltaMessage, - DeltaToolCall, ErrorResponse, FunctionCall, PromptTokenUsageInfo, - RequestResponseMetadata, ToolCall, UsageInfo) + ChatCompletionLogProb, + ChatCompletionLogProbs, + ChatCompletionLogProbsContent, + ChatCompletionNamedToolChoiceParam, + ChatCompletionRequest, + ChatCompletionResponse, + ChatCompletionResponseChoice, + ChatCompletionResponseStreamChoice, + ChatCompletionStreamResponse, + ChatMessage, + DeltaFunctionCall, + DeltaMessage, + DeltaToolCall, + ErrorResponse, + FunctionCall, + PromptTokenUsageInfo, + RequestResponseMetadata, + ToolCall, + UsageInfo, +) app = FastAPI() REQUEST_ID = 0 @@ -31,12 +46,13 @@ MODEL_NAME = "fake_model_name" NUM_RUNNING_REQUESTS = 0 + async def generate_fake_response( - request_id: str, - model_name: str, - num_tokens: int, - tokens_per_sec: float, - ): + request_id: str, + model_name: str, + num_tokens: int, + tokens_per_sec: float, +): async def sleep_to_target(target: float): sleep_time = target - time.time() if sleep_time > 0: @@ -47,25 +63,27 @@ async def sleep_to_target(target: float): if GLOBAL_ARGS.ttft > 0: await asyncio.sleep(GLOBAL_ARGS.ttft) - + NUM_RUNNING_REQUESTS += 1 created_time = int(time.time()) chunk_object_type: Final = "chat.completion.chunk" choice_data = ChatCompletionResponseStreamChoice( - index = 0, - delta = DeltaMessage( - role = "assistant", - content="", - ), - logprobs = None, - finish_reason = None) + index=0, + delta=DeltaMessage( + role="assistant", + content="", + ), + logprobs=None, + finish_reason=None, + ) chunk = ChatCompletionStreamResponse( - id = request_id, - object = chunk_object_type, - created = created_time, - choices = [choice_data], - model = model_name) + id=request_id, + object=chunk_object_type, + created=created_time, + choices=[choice_data], + model=model_name, + ) data = chunk.model_dump_json(exclude_unset=True) token_batch = 20 @@ -75,42 +93,42 @@ async def 
sleep_to_target(target: float): text = "Hello " choice_data = ChatCompletionResponseStreamChoice( - index = 0, - delta = DeltaMessage(content=text), - logprobs = None, - finish_reason = None) + index=0, delta=DeltaMessage(content=text), logprobs=None, finish_reason=None + ) chunk = ChatCompletionStreamResponse( - id = request_id, - object = chunk_object_type, - created = created_time, - choices = [choice_data], - model = model_name) + id=request_id, + object=chunk_object_type, + created=created_time, + choices=[choice_data], + model=model_name, + ) data = chunk.model_dump_json(exclude_unset=True) yield f"data: {data}\n\n" await sleep_to_target(num_tokens / tokens_per_sec + start) choice_data = ChatCompletionResponseStreamChoice( - index = 0, - delta = DeltaMessage( - content="\n", - ), - logprobs = None, - finish_reason = "length") + index=0, + delta=DeltaMessage( + content="\n", + ), + logprobs=None, + finish_reason="length", + ) chunk = ChatCompletionStreamResponse( - id = request_id, - object = chunk_object_type, - created = created_time, - choices = [choice_data], - model = model_name) + id=request_id, + object=chunk_object_type, + created=created_time, + choices=[choice_data], + model=model_name, + ) chunk.usage = UsageInfo( - prompt_tokens=0, - completion_tokens=num_tokens, - total_tokens=num_tokens, - ) - + prompt_tokens=0, + completion_tokens=num_tokens, + total_tokens=num_tokens, + ) yield f"data: {data}\n\n" yield "data: [DONE]\n\n" @@ -118,19 +136,25 @@ async def sleep_to_target(target: float): NUM_RUNNING_REQUESTS -= 1 elapsed = time.time() - start thp = num_tokens / elapsed - print(f"Finished request with id: {request_id}, elapsed time {elapsed}, throughput {thp}") + print( + f"Finished request with id: {request_id}, elapsed time {elapsed}, throughput {thp}" + ) + @app.post("/v1/chat/completions") async def chat_completions(request: ChatCompletionRequest, raw_request: Request): global REQUEST_ID, MODEL_NAME REQUEST_ID += 1 - request_id = raw_request.get('x-request-id', f"fake_request_id_{REQUEST_ID}") + request_id = raw_request.get("x-request-id", f"fake_request_id_{REQUEST_ID}") print(f"Received request with id: {request_id} at {time.time()}") model_name = MODEL_NAME num_tokens = request.max_tokens if request.max_tokens else 100 tokens_per_sec = GLOBAL_ARGS.speed - return StreamingResponse(generate_fake_response(request_id, model_name, num_tokens, tokens_per_sec), - media_type="text/event-stream") + return StreamingResponse( + generate_fake_response(request_id, model_name, num_tokens, tokens_per_sec), + media_type="text/event-stream", + ) + @app.get("/metrics") async def metrics(): @@ -147,6 +171,7 @@ async def metrics(): return Response(content=content, media_type="text/plain") + def parse_args(): parser = argparse.ArgumentParser() parser.add_argument("--port", type=int, default=9000) @@ -157,7 +182,9 @@ def parse_args(): args = parser.parse_args() return args + if __name__ == "__main__": import uvicorn + GLOBAL_ARGS = parse_args() uvicorn.run(app, host=GLOBAL_ARGS.host, port=GLOBAL_ARGS.port) diff --git a/src/tests/perftest/request_generator.py b/src/tests/perftest/request_generator.py index 1da899d5..a264e9e3 100644 --- a/src/tests/perftest/request_generator.py +++ b/src/tests/perftest/request_generator.py @@ -1,10 +1,11 @@ -import openai -import time -import threading import argparse import multiprocessing -import time import os +import threading +import time + +import openai + def response_consumer(response_stream, start_time): chunk_messages = [] @@ -19,12 +20,17 @@ def 
response_consumer(response_stream, start_time): token_time = time.time() chunk_messages.append(chunk_message) except Exception as e: - print(f"Error in consumer thread {threading.current_thread().name} of process {os.getpid()}: {e}") + print( + f"Error in consumer thread {threading.current_thread().name} of process {os.getpid()}: {e}" + ) final_words = "".join(chunk_messages) end_time = time.time() - response_len = len(final_words.split(' ')) + response_len = len(final_words.split(" ")) throughput = response_len / (end_time - start_time) - print(f"Process {os.getpid()} got a response of: {response_len} words in {end_time-start_time:.2f} seconds (throughput: {throughput:.2f} w/s, ttft: {token_time - start_time:.4f}) at {end_time}") + print( + f"Process {os.getpid()} got a response of: {response_len} words in {end_time-start_time:.2f} seconds (throughput: {throughput:.2f} w/s, ttft: {token_time - start_time:.4f}) at {end_time}" + ) + def worker(api_key, base_url, model, qps_per_worker): request_id = 0 @@ -36,22 +42,36 @@ def worker(api_key, base_url, model, qps_per_worker): print("Send request at ", start) try: custom_headers = { - 'x-user-id': str(os.getpid()), # Unique user ID for each process - 'x-request-id': str(os.getpid()) + f"req-{request_id}", # Unique session ID for each process + "x-user-id": str(os.getpid()), # Unique user ID for each process + "x-request-id": str(os.getpid()) + + f"req-{request_id}", # Unique session ID for each process } request_id += 1 response_stream = client.chat.completions.create( - messages=[{"role": "user", "content": "Tell me a joke about artificial intelligence."}], + messages=[ + { + "role": "user", + "content": "Tell me a joke about artificial intelligence.", + } + ], model=model, temperature=0, stream=True, max_tokens=500, - extra_headers=custom_headers + extra_headers=custom_headers, ) start_time = time.time() - print("Process {} sent a request at {:.4f}, connection overhead: {:.4f}".format(os.getpid(), start_time, start_time - start)) + print( + "Process {} sent a request at {:.4f}, connection overhead: {:.4f}".format( + os.getpid(), start_time, start_time - start + ) + ) - consumer_thread = threading.Thread(target=response_consumer, args=(response_stream, start_time), daemon=True) + consumer_thread = threading.Thread( + target=response_consumer, + args=(response_stream, start_time), + daemon=True, + ) consumer_thread.start() except Exception as e: print(f"Error in process {os.getpid()}: {e}") @@ -61,10 +81,15 @@ def worker(api_key, base_url, model, qps_per_worker): else: print("WARNING: Process {} is too slow".format(os.getpid())) + def main(): parser = argparse.ArgumentParser(description="Stress test an OpenAI API server") - parser.add_argument("--qps", type=float, required=True, help="Total queries per second") - parser.add_argument("--num-workers", type=int, required=True, help="Number of worker processes") + parser.add_argument( + "--qps", type=float, required=True, help="Total queries per second" + ) + parser.add_argument( + "--num-workers", type=int, required=True, help="Number of worker processes" + ) args = parser.parse_args() qps_per_worker = args.qps / args.num_workers @@ -75,13 +100,15 @@ def main(): model = "fake_model_name" for _ in range(args.num_workers): - p = multiprocessing.Process(target=worker, args=(api_key, base_url, model, qps_per_worker)) + p = multiprocessing.Process( + target=worker, args=(api_key, base_url, model, qps_per_worker) + ) p.start() processes.append(p) for p in processes: p.join() + if __name__ == "__main__": 
main() - diff --git a/src/tests/requirements.txt b/src/tests/requirements.txt index d23d558d..c9376787 100644 --- a/src/tests/requirements.txt +++ b/src/tests/requirements.txt @@ -1,3 +1,3 @@ fastapi -uvicorn httpx +uvicorn diff --git a/src/tests/test-openai.py b/src/tests/test-openai.py index 28160715..51a45b43 100644 --- a/src/tests/test-openai.py +++ b/src/tests/test-openai.py @@ -3,21 +3,22 @@ BASE_URL = "http://localhost:8080/" MODEL = "mistralai/Mistral-7B-Instruct-v0.2" -client = openai.OpenAI(api_key = "EMPTY", base_url = BASE_URL) +client = openai.OpenAI(api_key="EMPTY", base_url=BASE_URL) custom_headers = { - 'HERE': 'THERE', + "HERE": "THERE", } response = client.chat.completions.create( - messages=[ - #{"role": "system", "content": "You are a helpful assistant."}, - {"role": "user", "content": "Tell me a joke about artificial intelligence."} - ], - model = MODEL, - temperature = 0, - stream = True, - max_tokens = 100, - extra_headers=custom_headers) + messages=[ + # {"role": "system", "content": "You are a helpful assistant."}, + {"role": "user", "content": "Tell me a joke about artificial intelligence."} + ], + model=MODEL, + temperature=0, + stream=True, + max_tokens=100, + extra_headers=custom_headers, +) for tok in response: @@ -25,9 +26,9 @@ continue chunk_message = tok.choices[0].delta.content if chunk_message is not None: - print(chunk_message, end = "", flush = True) - #if first_token_time is None and chunk_message != "": + print(chunk_message, end="", flush=True) + # if first_token_time is None and chunk_message != "": # first_token_time = time.time() - #words += chunk_message + # words += chunk_message print("") diff --git a/src/tests/test_session_router.py b/src/tests/test_session_router.py index f2dd6077..7e857ade 100644 --- a/src/tests/test_session_router.py +++ b/src/tests/test_session_router.py @@ -1,7 +1,8 @@ -import pytest -from unittest.mock import Mock -from typing import List, Dict import sys +from typing import Dict, List +from unittest.mock import Mock + +import pytest from vllm_router.routing_logic import SessionRouter @@ -20,6 +21,7 @@ class Request: def __init__(self, headers: Dict[str, str]): self.headers = headers + # Test cases @@ -27,28 +29,35 @@ def test_route_request_with_session_id(): """ Test routing when a session ID is present in the request headers. """ - endpoints = [EndpointInfo(url="http://engine1.com"), - EndpointInfo(url="http://engine2.com")] + endpoints = [ + EndpointInfo(url="http://engine1.com"), + EndpointInfo(url="http://engine2.com"), + ] request_stats = { - "http://engine1.com": RequestStats(qps=10), "http://engine2.com": RequestStats(qps=5)} + "http://engine1.com": RequestStats(qps=10), + "http://engine2.com": RequestStats(qps=5), + } request = Request(headers={"session_id": "abc123"}) router = SessionRouter(session_key="session_id") url = router.route_request(endpoints, None, request_stats, request) # Ensure the same session ID always maps to the same endpoint - assert url == router.route_request( - endpoints, None, request_stats, request) + assert url == router.route_request(endpoints, None, request_stats, request) def test_route_request_without_session_id(): """ Test routing when no session ID is present in the request headers. 
""" - endpoints = [EndpointInfo(url="http://engine1.com"), - EndpointInfo(url="http://engine2.com")] + endpoints = [ + EndpointInfo(url="http://engine1.com"), + EndpointInfo(url="http://engine2.com"), + ] request_stats = { - "http://engine1.com": RequestStats(qps=10), "http://engine2.com": RequestStats(qps=5)} + "http://engine1.com": RequestStats(qps=10), + "http://engine2.com": RequestStats(qps=5), + } request = Request(headers={}) # No session ID router = SessionRouter(session_key="session_id") @@ -62,10 +71,14 @@ def test_route_request_with_dynamic_endpoints(): """ Test routing when the list of endpoints changes dynamically. """ - endpoints = [EndpointInfo(url="http://engine1.com"), - EndpointInfo(url="http://engine2.com")] + endpoints = [ + EndpointInfo(url="http://engine1.com"), + EndpointInfo(url="http://engine2.com"), + ] request_stats = { - "http://engine1.com": RequestStats(qps=10), "http://engine2.com": RequestStats(qps=5)} + "http://engine1.com": RequestStats(qps=10), + "http://engine2.com": RequestStats(qps=5), + } request = Request(headers={"session_id": "abc123"}) router = SessionRouter(session_key="session_id") @@ -100,24 +113,26 @@ def test_consistent_hashing_remove_node_multiple_sessions(): router = SessionRouter(session_key="session_id") # Route with initial endpoints - urls_before = [router.route_request( - endpoints, None, request_stats, req) for req in requests] + urls_before = [ + router.route_request(endpoints, None, request_stats, req) for req in requests + ] # Remove an endpoint removed_endpoint = endpoints.pop(1) # Remove http://engine2.com del request_stats[removed_endpoint.url] # Route with the updated endpoints - urls_after = [router.route_request( - endpoints, None, request_stats, req) for req in requests] + urls_after = [ + router.route_request(endpoints, None, request_stats, req) for req in requests + ] # Ensure all session IDs are still mapped to valid endpoints - assert all(url in [endpoint.url for endpoint in endpoints] - for url in urls_after) + assert all(url in [endpoint.url for endpoint in endpoints] for url in urls_after) # Calculate the number of remapped session IDs - remapped_count = sum(1 for before, after in zip( - urls_before, urls_after) if before != after) + remapped_count = sum( + 1 for before, after in zip(urls_before, urls_after) if before != after + ) # Ensure minimal reassignment # Only a fraction should be remapped @@ -143,8 +158,9 @@ def test_consistent_hashing_add_node_multiple_sessions(): router = SessionRouter(session_key="session_id") # Route with initial endpoints - urls_before = [router.route_request( - endpoints, None, request_stats, req) for req in requests] + urls_before = [ + router.route_request(endpoints, None, request_stats, req) for req in requests + ] # Add a new endpoint new_endpoint = EndpointInfo(url="http://engine3.com") @@ -152,16 +168,17 @@ def test_consistent_hashing_add_node_multiple_sessions(): request_stats[new_endpoint.url] = RequestStats(qps=2) # Route with the updated endpoints - urls_after = [router.route_request( - endpoints, None, request_stats, req) for req in requests] + urls_after = [ + router.route_request(endpoints, None, request_stats, req) for req in requests + ] # Ensure all session IDs are still mapped to valid endpoints - assert all(url in [endpoint.url for endpoint in endpoints] - for url in urls_after) + assert all(url in [endpoint.url for endpoint in endpoints] for url in urls_after) # Calculate the number of remapped session IDs - remapped_count = sum(1 for before, after in zip( - 
urls_before, urls_after) if before != after) + remapped_count = sum( + 1 for before, after in zip(urls_before, urls_after) if before != after + ) # Ensure minimal reassignment # Only a fraction should be remapped @@ -187,8 +204,9 @@ def test_consistent_hashing_add_then_remove_node(): router = SessionRouter(session_key="session_id") # Route with initial endpoints - urls_before_add = [router.route_request( - endpoints, None, request_stats, req) for req in requests] + urls_before_add = [ + router.route_request(endpoints, None, request_stats, req) for req in requests + ] # Add a new endpoint new_endpoint = EndpointInfo(url="http://engine3.com") @@ -196,16 +214,19 @@ def test_consistent_hashing_add_then_remove_node(): request_stats[new_endpoint.url] = RequestStats(qps=2) # Route with the updated endpoints (after adding) - urls_after_add = [router.route_request( - endpoints, None, request_stats, req) for req in requests] + urls_after_add = [ + router.route_request(endpoints, None, request_stats, req) for req in requests + ] # Ensure all session IDs are still mapped to valid endpoints - assert all(url in [endpoint.url for endpoint in endpoints] - for url in urls_after_add) + assert all( + url in [endpoint.url for endpoint in endpoints] for url in urls_after_add + ) # Calculate the number of remapped session IDs after adding - remapped_count_after_add = sum(1 for before, after in zip( - urls_before_add, urls_after_add) if before != after) + remapped_count_after_add = sum( + 1 for before, after in zip(urls_before_add, urls_after_add) if before != after + ) # Ensure minimal reassignment after adding assert remapped_count_after_add < len(session_ids) @@ -215,21 +236,29 @@ def test_consistent_hashing_add_then_remove_node(): del request_stats[removed_endpoint.url] # Route with the updated endpoints (after removing) - urls_after_remove = [router.route_request( - endpoints, None, request_stats, req) for req in requests] + urls_after_remove = [ + router.route_request(endpoints, None, request_stats, req) for req in requests + ] # Ensure all session IDs are still mapped to valid endpoints - assert all(url in [endpoint.url for endpoint in endpoints] - for url in urls_after_remove) + assert all( + url in [endpoint.url for endpoint in endpoints] for url in urls_after_remove + ) # Calculate the number of remapped session IDs after removing - remapped_count_after_remove = sum(1 for before, after in zip( - urls_after_add, urls_after_remove) if before != after) + remapped_count_after_remove = sum( + 1 for before, after in zip(urls_after_add, urls_after_remove) if before != after + ) # Ensure minimal reassignment after removing assert remapped_count_after_remove < len(session_ids) # Verify that session IDs mapped to unaffected nodes remain the same - unaffected_count = sum(1 for before, after in zip( - urls_before_add, urls_after_remove) if before == after) - print(f"{unaffected_count} out of {len(session_ids)} session IDs were unaffected by adding and removing a node.") + unaffected_count = sum( + 1 + for before, after in zip(urls_before_add, urls_after_remove) + if before == after + ) + print( + f"{unaffected_count} out of {len(session_ids)} session IDs were unaffected by adding and removing a node." + ) diff --git a/src/vllm_router/README.md b/src/vllm_router/README.md index 0be7b26f..1039986f 100644 --- a/src/vllm_router/README.md +++ b/src/vllm_router/README.md @@ -59,6 +59,7 @@ pip install -e . 
``` **Example 1:** running the router locally at port 8000 in front of multiple serving engines: + ```bash vllm-router --port 8000 \ --service-discovery static \ diff --git a/src/vllm_router/engine_stats.py b/src/vllm_router/engine_stats.py index 34cc63be..342b36f5 100644 --- a/src/vllm_router/engine_stats.py +++ b/src/vllm_router/engine_stats.py @@ -1,16 +1,19 @@ -from typing import Dict -import time import threading -import requests +import time from dataclasses import dataclass +from typing import Dict + +import requests from prometheus_client.parser import text_string_to_metric_families -from vllm_router.service_discovery import GetServiceDiscovery from vllm_router.log import init_logger +from vllm_router.service_discovery import GetServiceDiscovery + logger = init_logger(__name__) _global_engine_stats_scraper: "Optional[EngineStatsScraper]" = None + @dataclass class EngineStats: # Number of running requests @@ -41,9 +44,9 @@ def FromVllmScrape(vllm_scrape: str): gpu_cache_hit_rate = 0 for family in text_string_to_metric_families(vllm_scrape): for sample in family.samples: - if sample.name == 'vllm:num_requests_running': + if sample.name == "vllm:num_requests_running": num_running_reqs = sample.value - elif sample.name == 'vllm:num_requests_waiting': + elif sample.name == "vllm:num_requests_waiting": num_queuing_reqs = sample.value elif sample.name == "vllm:gpu_prefix_cache_hit_rate": gpu_cache_hit_rate = sample.value @@ -51,38 +54,36 @@ def FromVllmScrape(vllm_scrape: str): return EngineStats( num_running_requests=num_running_reqs, num_queuing_requests=num_queuing_reqs, - gpu_cache_hit_rate=gpu_cache_hit_rate + gpu_cache_hit_rate=gpu_cache_hit_rate, ) + class EngineStatsScraper: def __init__(self, scrape_interval: float): """ Args: - scrape_interval (float): The interval in seconds + scrape_interval (float): The interval in seconds to scrape the metrics. Raises: - ValueError: if the service discover module is have + ValueError: if the service discover module is have not been initialized. - + """ self.service_discovery = GetServiceDiscovery() self.engine_stats: Dict[str, EngineStats] = {} self.engine_stats_lock = threading.Lock() self.scrape_interval = scrape_interval - self.scrape_thread = threading.Thread( - target=self._scrape_worker, - daemon=True - ) + self.scrape_thread = threading.Thread(target=self._scrape_worker, daemon=True) self.scrape_thread.start() def _scrape_one_endpoint(self, url: str): - """Scrape the metrics and model information from a single + """Scrape the metrics and model information from a single serving engine Args: - url (str): The URL of the serving engine + url (str): The URL of the serving engine (does not contain endpoint) """ try: @@ -103,7 +104,7 @@ def _scrape_metrics(self): engine_stats = self._scrape_one_endpoint(url) if engine_stats: collected_engine_stats[url] = engine_stats - + with self.engine_stats_lock: old_urls = list(self.engine_stats.keys()) for old_url in old_urls: @@ -127,20 +128,19 @@ def get_health(self) -> bool: Check if the EngineStatsScraper is healthy Returns: - bool: True if the EngineStatsScraper is healthy, + bool: True if the EngineStatsScraper is healthy, False otherwise """ return self.scrape_thread.is_alive() -def InitializeEngineStatsScraper( - scrape_interval: float -) -> EngineStatsScraper: + +def InitializeEngineStatsScraper(scrape_interval: float) -> EngineStatsScraper: """ - Initialize the EngineStatsScraper object. This function should be + Initialize the EngineStatsScraper object. 
This function should be called after the service discovery module has been initialized. Raises: - ValueError: if the service discover module is have + ValueError: if the service discover module is have not been initialized ValueError: if the EngineStatsScraper object has already been @@ -153,12 +153,13 @@ def InitializeEngineStatsScraper( _global_engine_stats_scraper = EngineStatsScraper(scrape_interval) return _global_engine_stats_scraper + def GetEngineStatsScraper() -> EngineStatsScraper: """ Get the EngineStatsScraper object Raises: - ValueError: if the EngineStatsScraper object has not been + ValueError: if the EngineStatsScraper object has not been initialized """ global _global_engine_stats_scraper @@ -167,12 +168,13 @@ def GetEngineStatsScraper() -> EngineStatsScraper: return _global_engine_stats_scraper -#if __name__ == "__main__": + +# if __name__ == "__main__": # from service_discovery import InitializeServiceDiscovery, ServiceDiscoveryType # import time -# InitializeServiceDiscovery(ServiceDiscoveryType.K8S, -# namespace = "default", -# port = 8000, +# InitializeServiceDiscovery(ServiceDiscoveryType.K8S, +# namespace = "default", +# port = 8000, # label_selector = "release=test") # time.sleep(1) # InitializeEngineStatsScraper(10.0) diff --git a/src/vllm_router/httpx_client.py b/src/vllm_router/httpx_client.py index f7a9122f..d78056fe 100644 --- a/src/vllm_router/httpx_client.py +++ b/src/vllm_router/httpx_client.py @@ -1,29 +1,36 @@ import logging -from fastapi import FastAPI + import httpx +from fastapi import FastAPI from vllm_router.log import init_logger + logger = init_logger(__name__) + class HTTPXClientWrapper: async_client = None def start(self): - """ Instantiate the client. Call from the FastAPI startup hook.""" + """Instantiate the client. Call from the FastAPI startup hook.""" self.async_client = httpx.AsyncClient() - logger.info(f'httpx AsyncClient instantiated. Id {id(self.async_client)}') + logger.info(f"httpx AsyncClient instantiated. Id {id(self.async_client)}") async def stop(self): - """ Gracefully shutdown. Call from FastAPI shutdown hook.""" - logger.info(f'httpx async_client.is_closed(): {self.async_client.is_closed} - Now close it. Id (will be unchanged): {id(self.async_client)}') + """Gracefully shutdown. Call from FastAPI shutdown hook.""" + logger.info( + f"httpx async_client.is_closed(): {self.async_client.is_closed} - Now close it. Id (will be unchanged): {id(self.async_client)}" + ) await self.async_client.aclose() - logger.info(f'httpx async_client.is_closed(): {self.async_client.is_closed}. Id (will be unchanged): {id(self.async_client)}') + logger.info( + f"httpx async_client.is_closed(): {self.async_client.is_closed}. 
Id (will be unchanged): {id(self.async_client)}" + ) self.async_client = None - logger.info('httpx AsyncClient closed') + logger.info("httpx AsyncClient closed") def __call__(self): - """ Calling the instantiated HTTPXClientWrapper returns the wrapped singleton.""" + """Calling the instantiated HTTPXClientWrapper returns the wrapped singleton.""" # Ensure we don't use it if not started / running assert self.async_client is not None return self.async_client diff --git a/src/vllm_router/log.py b/src/vllm_router/log.py index 56fe8e78..657430c2 100644 --- a/src/vllm_router/log.py +++ b/src/vllm_router/log.py @@ -1,11 +1,13 @@ import logging from logging import Logger + def build_format(color): reset = "\x1b[0m" underline = "\x1b[3m" return f"{color}[%(asctime)s] %(levelname)s:{reset} %(message)s {underline}(%(filename)s:%(lineno)d:%(name)s){reset}" + class CustomFormatter(logging.Formatter): grey = "\x1b[1m" @@ -28,6 +30,7 @@ def format(self, record): formatter = logging.Formatter(log_fmt) return formatter.format(record) + def init_logger(name: str, log_level=logging.DEBUG) -> Logger: logger = logging.getLogger(name) @@ -38,4 +41,3 @@ def init_logger(name: str, log_level=logging.DEBUG) -> Logger: logger.setLevel(logging.DEBUG) return logger - diff --git a/src/vllm_router/perf-test.sh b/src/vllm_router/perf-test.sh index dfb0e596..dd11af9e 100644 --- a/src/vllm_router/perf-test.sh +++ b/src/vllm_router/perf-test.sh @@ -11,7 +11,7 @@ python3 router.py --port $1 \ --engine-stats-interval 10 \ --log-stats \ --routing-logic session \ - --session-key "x-user-id" + --session-key "x-user-id" #--routing-logic roundrobin diff --git a/src/vllm_router/protocols.py b/src/vllm_router/protocols.py index b80ad658..2bf553fb 100644 --- a/src/vllm_router/protocols.py +++ b/src/vllm_router/protocols.py @@ -1,6 +1,8 @@ from typing import List, Optional + from pydantic import BaseModel, ConfigDict, Field, model_validator + class OpenAIBaseModel(BaseModel): # OpenAI API does allow extra fields model_config = ConfigDict(extra="allow") @@ -13,7 +15,7 @@ def __log_extra_fields__(cls, data): field_names = set() for field_name, field in cls.model_fields.items(): field_names.add(field_name) - if hasattr(field, 'alias') and field.alias: + if hasattr(field, "alias") and field.alias: field_names.add(field.alias) # Compare against both field names and aliases @@ -21,9 +23,12 @@ def __log_extra_fields__(cls, data): if extra_fields: logger.warning( "The following fields were present in the request " - "but ignored: %s", extra_fields) + "but ignored: %s", + extra_fields, + ) return data + class ErrorResponse(OpenAIBaseModel): object: str = "error" message: str @@ -31,6 +36,7 @@ class ErrorResponse(OpenAIBaseModel): param: Optional[str] = None code: int + class ModelCard(OpenAIBaseModel): id: str object: str = "model" @@ -38,6 +44,7 @@ class ModelCard(OpenAIBaseModel): owned_by: str = "vllm" root: Optional[str] = None + class ModelList(OpenAIBaseModel): object: str = "list" data: List[ModelCard] = Field(default_factory=list) diff --git a/src/vllm_router/request_stats.py b/src/vllm_router/request_stats.py index 249de367..f42dd7e3 100644 --- a/src/vllm_router/request_stats.py +++ b/src/vllm_router/request_stats.py @@ -1,6 +1,6 @@ +from collections import deque from dataclasses import dataclass from typing import Deque, Dict -from collections import deque from vllm_router.log import init_logger @@ -8,6 +8,7 @@ _global_request_stats_monitor = None + @dataclass class RequestStats: # Number of queries per second @@ -34,6 +35,7 @@ class 
MovingAverageMonitor: """ Monitors the average of the value of in a sliding window """ + def __init__(self, sliding_window_size: float): self.sliding_window_size = sliding_window_size self.timestamps = deque() @@ -45,8 +47,10 @@ def update(self, timestamp: float, value: float): """ self.timestamps.append(timestamp) self.values.append(value) - while len(self.timestamps) > 0 and \ - self.timestamps[0] < timestamp - self.sliding_window_size: + while ( + len(self.timestamps) > 0 + and self.timestamps[0] < timestamp - self.sliding_window_size + ): self.timestamps.popleft() self.values.popleft() @@ -54,7 +58,7 @@ def get_average(self) -> float: """ Get the throughput in the sliding window """ - return sum(self.values) / len(self.values) + return sum(self.values) / len(self.values) def get_sum(self) -> float: """ @@ -62,18 +66,20 @@ def get_sum(self) -> float: """ return sum(self.values) + class RequestStatsMonitor: """ Monitors the request statistics of all serving engines """ - # NOTE (ApostaC): Currently, QPS is calculated based on the number of + + # NOTE (ApostaC): Currently, QPS is calculated based on the number of # arrived requests in the sliding window, but the inter_token_latency and # ttft are calculated based on the number of completed requests in the - # sliding window. + # sliding window. def __init__(self, sliding_window_size: float): """ Args: - sliding_window_size: The size of the sliding window (in seconds) + sliding_window_size: The size of the sliding window (in seconds) to store the request statistics """ self.sliding_window_size = sliding_window_size @@ -109,14 +115,14 @@ def on_new_request(self, engine_url: str, request_id: str, timestamp: float): self.in_prefill_requests[engine_url] += 1 if engine_url not in self.qps_monitors: - self.qps_monitors[engine_url] =\ - MovingAverageMonitor(self.sliding_window_size) + self.qps_monitors[engine_url] = MovingAverageMonitor( + self.sliding_window_size + ) self.qps_monitors[engine_url].update(timestamp, 1) if self.first_query_time is None: self.first_query_time = timestamp - def on_request_response(self, engine_url: str, request_id: str, timestamp: float): """ @@ -137,14 +143,12 @@ def on_request_response(self, engine_url: str, request_id: str, timestamp: float self.in_decoding_requests[engine_url] += 1 if engine_url not in self.ttft_monitors: - self.ttft_monitors[engine_url] = \ - MovingAverageMonitor(self.sliding_window_size) + self.ttft_monitors[engine_url] = MovingAverageMonitor( + self.sliding_window_size + ) self.ttft_monitors[engine_url].update(timestamp, timestamp - coming_time) - def on_request_complete(self, - engine_url: str, - request_id: str, - timestamp: float): + def on_request_complete(self, engine_url: str, request_id: str, timestamp: float): """ Tell the monitor that a request has been completed. 
@@ -157,11 +161,11 @@ def on_request_complete(self, self.finished_requests[engine_url] = 0 self.in_decoding_requests[engine_url] -= 1 self.finished_requests[engine_url] += 1 - + def get_request_stats( - self, - current_time: float, - ) -> Dict[str, RequestStats]: + self, + current_time: float, + ) -> Dict[str, RequestStats]: """ Get the request statistics for each serving engine @@ -176,10 +180,11 @@ def get_request_stats( """ # Calculate the request statistics ret = {} - + # Get all urls: - urls = set(self.in_prefill_requests.keys())\ - .union(set(self.in_decoding_requests.keys())) + urls = set(self.in_prefill_requests.keys()).union( + set(self.in_decoding_requests.keys()) + ) for engine_url in urls: if engine_url not in self.qps_monitors: @@ -202,17 +207,18 @@ def get_request_stats( in_prefill_requests=in_prefill_requests, in_decoding_requests=in_decoding_requests, finished_requests=finished_requests, - uptime = current_time - self.first_query_time + uptime=current_time - self.first_query_time, ) return ret + def InitializeRequestStatsMonitor(sliding_window_size: float): """ Initialize the global request statistics monitor Args: - sliding_window_size: The size of the sliding window (in seconds) + sliding_window_size: The size of the sliding window (in seconds) to store the request Raises: @@ -225,6 +231,7 @@ def InitializeRequestStatsMonitor(sliding_window_size: float): _global_request_stats_monitor = RequestStatsMonitor(sliding_window_size) return _global_request_stats_monitor + def GetRequestStatsMonitor(): """ Get the global request statistics monitor @@ -237,6 +244,8 @@ def GetRequestStatsMonitor(): """ global _global_request_stats_monitor if _global_request_stats_monitor is None: - raise ValueError("The global request statistics monitor has not been initialized") + raise ValueError( + "The global request statistics monitor has not been initialized" + ) return _global_request_stats_monitor diff --git a/src/vllm_router/requirements.txt b/src/vllm_router/requirements.txt index 629de264..03afa610 100644 --- a/src/vllm_router/requirements.txt +++ b/src/vllm_router/requirements.txt @@ -1,7 +1,7 @@ -numpy fastapi httpx -uvicorn kubernetes +numpy prometheus_client -uhashring \ No newline at end of file +uhashring +uvicorn diff --git a/src/vllm_router/router.py b/src/vllm_router/router.py index 779d5b6b..2c49898d 100644 --- a/src/vllm_router/router.py +++ b/src/vllm_router/router.py @@ -1,20 +1,28 @@ -from contextlib import asynccontextmanager -from fastapi import FastAPI, Request, HTTPException -from fastapi.responses import StreamingResponse, JSONResponse, Response -import time +import argparse +import logging import threading +import time +from contextlib import asynccontextmanager + import httpx import uvicorn -import argparse -import logging +from fastapi import FastAPI, HTTPException, Request +from fastapi.responses import JSONResponse, Response, StreamingResponse -from vllm_router.utils import validate_url -from vllm_router.routing_logic import InitializeRoutingLogic, RoutingLogic -from vllm_router.service_discovery import InitializeServiceDiscovery, GetServiceDiscovery, ServiceDiscoveryType -from vllm_router.request_stats import InitializeRequestStatsMonitor, GetRequestStatsMonitor -from vllm_router.engine_stats import InitializeEngineStatsScraper, GetEngineStatsScraper -from vllm_router.protocols import ModelCard, ModelList +from vllm_router.engine_stats import GetEngineStatsScraper, InitializeEngineStatsScraper from vllm_router.httpx_client import HTTPXClientWrapper +from 
vllm_router.protocols import ModelCard, ModelList +from vllm_router.request_stats import ( + GetRequestStatsMonitor, + InitializeRequestStatsMonitor, +) +from vllm_router.routing_logic import InitializeRoutingLogic, RoutingLogic +from vllm_router.service_discovery import ( + GetServiceDiscovery, + InitializeServiceDiscovery, + ServiceDiscoveryType, +) +from vllm_router.utils import validate_url httpx_client_wrapper = HTTPXClientWrapper() logger = logging.getLogger("uvicorn") @@ -23,17 +31,22 @@ REQUEST_ID = 0 STACK_VERSION = "0.0.1" + @asynccontextmanager async def lifespan(app: FastAPI): httpx_client_wrapper.start() yield await httpx_client_wrapper.stop() -app = FastAPI(lifespan = lifespan) + +app = FastAPI(lifespan=lifespan) # TODO: better request id system -async def process_request(method, header, body, backend_url, request_id, endpoint, debug_request=None): + +async def process_request( + method, header, body, backend_url, request_id, endpoint, debug_request=None +): """ Async generator to stream data from the backend server to the client. """ @@ -41,15 +54,12 @@ async def process_request(method, header, body, backend_url, request_id, endpoin total_len = 0 # Pass response headers to the client start_time = time.time() - GetRequestStatsMonitor().on_new_request( - backend_url, - request_id, - start_time) + GetRequestStatsMonitor().on_new_request(backend_url, request_id, start_time) client = httpx_client_wrapper() async with client.stream( method=method, - url=backend_url + endpoint, + url=backend_url + endpoint, headers=dict(header), content=body, timeout=None, @@ -61,24 +71,21 @@ async def process_request(method, header, body, backend_url, request_id, endpoin async for chunk in backend_response.aiter_bytes(): total_len += len(chunk) if not first_token: - first_token = True + first_token = True GetRequestStatsMonitor().on_request_response( - backend_url, - request_id, - time.time()) + backend_url, request_id, time.time() + ) yield chunk - GetRequestStatsMonitor().on_request_complete( - backend_url, - request_id, - time.time()) + GetRequestStatsMonitor().on_request_complete(backend_url, request_id, time.time()) - #if debug_request: + # if debug_request: # logger.debug(f"Finished the request with request id: {debug_request.headers.get('x-request-id', None)} at {time.time()}") + async def route_general_request(request: Request, endpoint: str): """ - Route the incoming request to the backend server and stream the response + Route the incoming request to the backend server and stream the response back to the client. 
""" in_router_time = time.time() @@ -92,60 +99,63 @@ async def route_general_request(request: Request, endpoint: str): requested_model = request_json.get("model", None) if requested_model is None: return JSONResponse( - status_code=400, - content={"error": "Invalid request: missing 'model' in request body."}) + status_code=400, + content={"error": "Invalid request: missing 'model' in request body."}, + ) endpoints = GetServiceDiscovery().get_endpoint_info() engine_stats = GetEngineStatsScraper().get_engine_stats() request_stats = GetRequestStatsMonitor().get_request_stats(time.time()) - endpoints = list(filter(lambda x: x.model_name == requested_model, - endpoints)) + endpoints = list(filter(lambda x: x.model_name == requested_model, endpoints)) if len(endpoints) == 0: return JSONResponse( - status_code=400, - content={"error": f"Model {requested_model} not found."}) + status_code=400, content={"error": f"Model {requested_model} not found."} + ) server_url = GLOBAL_ROUTER.route_request( - endpoints, - engine_stats, - request_stats, - request) - + endpoints, engine_stats, request_stats, request + ) curr_time = time.time() - logger.info(f"Routing request {REQUEST_ID} to {server_url} at {curr_time}, " - f"process time = {curr_time - in_router_time:.4f}") + logger.info( + f"Routing request {REQUEST_ID} to {server_url} at {curr_time}, " + f"process time = {curr_time - in_router_time:.4f}" + ) stream_generator = process_request( - request.method, - request.headers, - request_body, - server_url, - request_id, - endpoint = endpoint) + request.method, + request.headers, + request_body, + server_url, + request_id, + endpoint=endpoint, + ) headers, status_code = await anext(stream_generator) return StreamingResponse( - stream_generator, - status_code=status_code, - headers={key: value for key, value in headers.items()}, - ) + stream_generator, + status_code=status_code, + headers={key: value for key, value in headers.items()}, + ) @app.post("/chat/completions") async def route_chat_completition(request: Request): return await route_general_request(request, "/v1/chat/completions") + @app.post("/completions") async def route_completition(request: Request): return await route_general_request(request, "/v1/completions") + @app.get("/version") async def show_version(): ver = {"version": STACK_VERSION} return JSONResponse(content=ver) + @app.get("/models") async def show_models(): endpoints = GetServiceDiscovery().get_endpoint_info() @@ -155,94 +165,147 @@ async def show_models(): if endpoint.model_name in existing_models: continue model_card = ModelCard( - id = endpoint.model_name, - object = "model", - created = endpoint.added_timestamp, - owned_by = "vllm", + id=endpoint.model_name, + object="model", + created=endpoint.added_timestamp, + owned_by="vllm", ) model_cards.append(model_card) existing_models.add(endpoint.model_name) - model_list = ModelList(data = model_cards) + model_list = ModelList(data=model_cards) return JSONResponse(content=model_list.model_dump()) + @app.get("/health") async def health() -> Response: """Health check. 
check the health of the threads""" if not GetServiceDiscovery().get_health(): return JSONResponse( - content = {"status": "Service discovery module is down."}, - status_code = 503) + content={"status": "Service discovery module is down."}, status_code=503 + ) if not GetEngineStatsScraper().get_health(): return JSONResponse( - content = {"status": "Engine stats scraper is down."}, - status_code = 503) + content={"status": "Engine stats scraper is down."}, status_code=503 + ) return Response(status_code=200) - def validate_args(args): if args.service_discovery not in ["static", "k8s"]: raise ValueError(f"Invalid service discovery type: {args.service_discovery}") if args.service_discovery == "static": if args.static_backends is None: - raise ValueError("Static backends must be provided when using static service discovery.") + raise ValueError( + "Static backends must be provided when using static service discovery." + ) if args.static_models is None: - raise ValueError("Static models must be provided when using static service discovery.") + raise ValueError( + "Static models must be provided when using static service discovery." + ) if args.routing_logic not in ["roundrobin", "session"]: raise ValueError(f"Invalid routing logic: {args.routing_logic}") if args.service_discovery == "static" and args.static_backends is None: - raise ValueError("Static backends must be provided when using static service discovery.") + raise ValueError( + "Static backends must be provided when using static service discovery." + ) if args.service_discovery == "k8s" and args.k8s_port is None: raise ValueError("K8s port must be provided when using K8s service discovery.") if args.routing_logic == "session" and args.session_key is None: - raise ValueError("Session key must be provided when using session routing logic.") + raise ValueError( + "Session key must be provided when using session routing logic." + ) + def parse_args(): parser = argparse.ArgumentParser(description="Run the FastAPI app.") - parser.add_argument("--host", default="0.0.0.0", help="The host to run the server on.") - parser.add_argument("--port", type=int, default=8001, help="The port to run the server on.") + parser.add_argument( + "--host", default="0.0.0.0", help="The host to run the server on." + ) + parser.add_argument( + "--port", type=int, default=8001, help="The port to run the server on." + ) # Service discovery - parser.add_argument("--service-discovery", required=True, - help = "The service discovery type. Options: static, k8s") - parser.add_argument("--static-backends", type=str, default=None, - help="The urls of static backends, separeted by comma." - "E.g., http://localhost:8000,http://localhost:8001") - parser.add_argument("--static-models", type=str, default=None, - help="The models of static backends, separeted by comma." - "E.g., model1,model2") - parser.add_argument("--k8s-port", type=int, default=8000, - help="The port of vLLM processes when using K8s service discovery.") - parser.add_argument("--k8s-namespace", type=str, default="default", - help="The namespace of vLLM pods when using K8s service discovery.") - parser.add_argument("--k8s-label-selector", type=str, default="", - help="The label selector to filter vLLM pods when using K8s service discovery.") + parser.add_argument( + "--service-discovery", + required=True, + help="The service discovery type. Options: static, k8s", + ) + parser.add_argument( + "--static-backends", + type=str, + default=None, + help="The urls of static backends, separated by comma." 
+ "E.g., http://localhost:8000,http://localhost:8001", + ) + parser.add_argument( + "--static-models", + type=str, + default=None, + help="The models of static backends, separated by comma." "E.g., model1,model2", + ) + parser.add_argument( + "--k8s-port", + type=int, + default=8000, + help="The port of vLLM processes when using K8s service discovery.", + ) + parser.add_argument( + "--k8s-namespace", + type=str, + default="default", + help="The namespace of vLLM pods when using K8s service discovery.", + ) + parser.add_argument( + "--k8s-label-selector", + type=str, + default="", + help="The label selector to filter vLLM pods when using K8s service discovery.", + ) # Routing logic - parser.add_argument("--routing-logic", type=str, required=True, - help="The routing logic to use, Options: roundrobin, session") - parser.add_argument("--session-key", type=str, default=None, - help="The key (in the header) to identify a session.") + parser.add_argument( + "--routing-logic", + type=str, + required=True, + help="The routing logic to use, Options: roundrobin, session", + ) + parser.add_argument( + "--session-key", + type=str, + default=None, + help="The key (in the header) to identify a session.", + ) # Monitoring - parser.add_argument("--engine-stats-interval", type=int, default=30, - help="The interval in seconds to scrape engine statistics.") - parser.add_argument("--request-stats-window", type=int, default=60, - help="The sliding window seconds to compute request statistics.") + parser.add_argument( + "--engine-stats-interval", + type=int, + default=30, + help="The interval in seconds to scrape engine statistics.", + ) + parser.add_argument( + "--request-stats-window", + type=int, + default=60, + help="The sliding window seconds to compute request statistics.", + ) # Logging - parser.add_argument("--log-stats", action="store_true", - help="Log statistics every 10 seconds.") + parser.add_argument( + "--log-stats", action="store_true", help="Log statistics every 10 seconds." 
+ ) args = parser.parse_args() validate_args(args) return args + def parse_static_urls(args): urls = args.static_backends.split(",") backend_urls = [] @@ -253,20 +316,26 @@ def parse_static_urls(args): logger.warning(f"Skipping invalid url: {url}") return backend_urls + def parse_static_model_names(args): models = args.static_models.split(",") return models + def InitializeAll(args): if args.service_discovery == "static": - InitializeServiceDiscovery(ServiceDiscoveryType.STATIC, - urls = parse_static_urls(args), - models = parse_static_model_names(args)) + InitializeServiceDiscovery( + ServiceDiscoveryType.STATIC, + urls=parse_static_urls(args), + models=parse_static_model_names(args), + ) elif args.service_discovery == "k8s": - InitializeServiceDiscovery(ServiceDiscoveryType.K8S, - namespace = args.k8s_namespace, - port = args.k8s_port, - label_selector = args.k8s_label_selector) + InitializeServiceDiscovery( + ServiceDiscoveryType.K8S, + namespace=args.k8s_namespace, + port=args.k8s_port, + label_selector=args.k8s_label_selector, + ) else: raise ValueError(f"Invalid service discovery type: {args.service_discovery}") @@ -275,21 +344,19 @@ def InitializeAll(args): global GLOBAL_ROUTER if args.routing_logic == "roundrobin": - GLOBAL_ROUTER = InitializeRoutingLogic( - RoutingLogic.ROUND_ROBIN - ) + GLOBAL_ROUTER = InitializeRoutingLogic(RoutingLogic.ROUND_ROBIN) elif args.routing_logic == "session": GLOBAL_ROUTER = InitializeRoutingLogic( - RoutingLogic.SESSION_BASED, - session_key = args.session_key + RoutingLogic.SESSION_BASED, session_key=args.session_key ) else: raise ValueError(f"Invalid routing logic: {args.routing_logic}") - + + def log_stats(): while True: time.sleep(10) - logstr = "\n" + "="*50 + "\n" + logstr = "\n" + "=" * 50 + "\n" endpoints = GetServiceDiscovery().get_endpoint_info() engine_stats = GetEngineStatsScraper().get_engine_stats() request_stats = GetRequestStatsMonitor().get_request_stats(time.time()) @@ -300,14 +367,14 @@ def log_stats(): logstr += f" Engine stats: {engine_stats[url]}\n" else: logstr += f" Engine stats: No stats available\n" - + if url in request_stats: logstr += f" Request Stats: {request_stats[url]}\n" else: logstr += f" Request Stats: No stats available\n" logstr += "-" * 50 + "\n" - logstr += "="*50 + "\n" + logstr += "=" * 50 + "\n" logger.info(logstr) diff --git a/src/vllm_router/routing_logic.py b/src/vllm_router/routing_logic.py index 66c4744b..fb115a8c 100644 --- a/src/vllm_router/routing_logic.py +++ b/src/vllm_router/routing_logic.py @@ -1,31 +1,33 @@ -from typing import List, Optional, Dict -import hashlib import abc import enum +import hashlib +from typing import Dict, List, Optional from fastapi import Request from uhashring import HashRing -from vllm_router.service_discovery import EndpointInfo from vllm_router.engine_stats import EngineStats -from vllm_router.request_stats import RequestStats - from vllm_router.log import init_logger +from vllm_router.request_stats import RequestStats +from vllm_router.service_discovery import EndpointInfo logger = init_logger(__name__) + class RoutingLogic(enum.Enum): ROUND_ROBIN = "round-robin" SESSION_BASED = "session" -class RoutingInterface(metaclass = abc.ABCMeta): + +class RoutingInterface(metaclass=abc.ABCMeta): @abc.abstractmethod def route_request( - self, - endpoints: List[EndpointInfo], - engine_stats: Dict[str, EngineStats], - request_stats: Dict[str, RequestStats], - request: Request) -> str: + self, + endpoints: List[EndpointInfo], + engine_stats: Dict[str, EngineStats], + request_stats: 
Dict[str, RequestStats], + request: Request, + ) -> str: """ Route the request to the appropriate engine URL @@ -39,18 +41,20 @@ def route_request( """ raise NotImplementedError + class RoundRobinRouter(RoutingInterface): # TODO (ApostaC): when available engines in the endpoints changes, the - # algorithm may not be "perfectly" round-robin. + # algorithm may not be "perfectly" round-robin. def __init__(self): self.req_id = 0 def route_request( - self, - endpoints: List[EndpointInfo], - engine_stats: Dict[str, EngineStats], - request_stats: Dict[str, RequestStats], - request: Request) -> str: + self, + endpoints: List[EndpointInfo], + engine_stats: Dict[str, EngineStats], + request_stats: Dict[str, RequestStats], + request: Request, + ) -> str: """ Route the request to the appropriate engine URL using a simple round-robin algorithm @@ -64,24 +68,25 @@ def route_request( request (Request): The incoming request """ len_engines = len(endpoints) - ret = sorted(endpoints, - key = lambda e: e.url)[self.req_id % len_engines] + ret = sorted(endpoints, key=lambda e: e.url)[self.req_id % len_engines] self.req_id += 1 return ret.url + class SessionRouter(RoutingInterface): """ Route the request to the appropriate engine URL based on the session key in the request headers """ + def __init__(self, session_key: str): self.session_key = session_key # Map from session ID to engine URL self.hash_ring = HashRing() - def _qps_routing(self, - endpoints: List[EndpointInfo], - request_stats: Dict[str, RequestStats]) -> str: + def _qps_routing( + self, endpoints: List[EndpointInfo], request_stats: Dict[str, RequestStats] + ) -> str: """ Route the request to the appropriate engine URL based on the QPS of each engine @@ -95,14 +100,14 @@ def _qps_routing(self, for info in endpoints: url = info.url if url not in request_stats: - return url # This engine does not have any requests + return url # This engine does not have any requests request_stat = request_stats[url] if request_stat.qps < lowest_qps: lowest_qps = request_stat.qps ret = url return ret - def _update_hash_ring(self, endpoints: List['EndpointInfo']): + def _update_hash_ring(self, endpoints: List["EndpointInfo"]): """ Update the hash ring with the current list of endpoints. """ @@ -124,11 +129,12 @@ def _update_hash_ring(self, endpoints: List['EndpointInfo']): self.hash_ring.add_node(node) def route_request( - self, - endpoints: List[EndpointInfo], - engine_stats: Dict[str, EngineStats], - request_stats: Dict[str, RequestStats], - request: Request) -> str: + self, + endpoints: List[EndpointInfo], + engine_stats: Dict[str, EngineStats], + request_stats: Dict[str, RequestStats], + request: Request, + ) -> str: """ Route the request to the appropriate engine URL by the 'session id' in the request headers. 
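# --- Editor's sketch (illustrative, not part of the patch) ---
# End-to-end behaviour of the session routing above, assuming uhashring's
# HashRing.get_node(): requests carrying the same session header always map to
# the same engine URL, while requests without a session id fall back to the
# lowest-QPS engine via _qps_routing().
from uhashring import HashRing

ring = HashRing()
for url in ("http://engine-a:8000", "http://engine-b:8000"):
    ring.add_node(url)

session_id = "user-1234"  # hypothetical value of the configured session header
print(ring.get_node(session_id))  # deterministic engine URL for this session
print(ring.get_node(session_id))  # same URL on every subsequent request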
@@ -158,9 +164,10 @@ def route_request( return url + def InitializeRoutingLogic( - routing_logic: RoutingLogic, - *args, **kwargs) -> RoutingInterface: + routing_logic: RoutingLogic, *args, **kwargs +) -> RoutingInterface: """ Initialize the routing logic based on the routing_logic string diff --git a/src/vllm_router/service_discovery.py b/src/vllm_router/service_discovery.py index a4d17e35..77ad2120 100644 --- a/src/vllm_router/service_discovery.py +++ b/src/vllm_router/service_discovery.py @@ -1,21 +1,25 @@ -from typing import List, Dict, Optional -import threading import abc import enum +import threading import time +from dataclasses import dataclass +from typing import Dict, List, Optional + import requests from kubernetes import client, config, watch -from dataclasses import dataclass from vllm_router.log import init_logger + logger = init_logger(__name__) _global_service_discovery: "Optional[ServiceDiscovery]" = None + class ServiceDiscoveryType(enum.Enum): STATIC = "static" K8S = "k8s" + @dataclass class EndpointInfo: # Endpoint's url @@ -27,7 +31,8 @@ class EndpointInfo: # Added timestamp added_timestamp: float -class ServiceDiscovery(metaclass = abc.ABCMeta): + +class ServiceDiscovery(metaclass=abc.ABCMeta): @abc.abstractmethod def get_endpoint_info(self) -> List[EndpointInfo]: """ @@ -51,8 +56,7 @@ def get_health(self) -> bool: class StaticServiceDiscovery(ServiceDiscovery): def __init__(self, urls: List[str], models: List[str]): - assert len(urls) == len(models), \ - "URLs and models should have the same length" + assert len(urls) == len(models), "URLs and models should have the same length" self.urls = urls self.models = models self.added_timestamp = int(time.time()) @@ -65,14 +69,16 @@ def get_endpoint_info(self) -> List[EndpointInfo]: Returns: a list of engine URLs """ - return [EndpointInfo(url, model, self.added_timestamp) \ - for url, model in zip(self.urls, self.models)] + return [ + EndpointInfo(url, model, self.added_timestamp) + for url, model in zip(self.urls, self.models) + ] class K8sServiceDiscovery(ServiceDiscovery): - def __init__(self, namespace: str, port: str, label_selector = None): + def __init__(self, namespace: str, port: str, label_selector=None): """ - Initialize the Kubernetes service discovery module. This module + Initialize the Kubernetes service discovery module. This module assumes all serving engine pods are in the same namespace, listening on the same port, and have the same label selector. @@ -100,15 +106,13 @@ def __init__(self, namespace: str, port: str, label_selector = None): self.k8s_watcher = watch.Watch() # Start watching engines - self.watcher_thread = threading.Thread( - target=self._watch_engines, - daemon=True) + self.watcher_thread = threading.Thread(target=self._watch_engines, daemon=True) self.watcher_thread.start() @staticmethod def _check_pod_ready(container_statuses): """ - Check if all containers in the pod are ready by reading the + Check if all containers in the pod are ready by reading the k8s container statuses. 
""" if not container_statuses: @@ -138,7 +142,6 @@ def _get_model_name(self, pod_ip) -> Optional[str]: return model_name - def _watch_engines(self): # TODO (ApostaC): Add error handling @@ -156,12 +159,15 @@ def _watch_engines(self): model_name = self._get_model_name(pod_ip) else: model_name = None - self._on_engine_update(pod_name, pod_ip, event_type, - is_pod_ready, model_name) + self._on_engine_update( + pod_name, pod_ip, event_type, is_pod_ready, model_name + ) def _add_engine(self, engine_name: str, engine_ip: str, model_name: str): - logger.info(f"Discovered new serving engine {engine_name} at " - f"{engine_ip}, running model: {model_name}") + logger.info( + f"Discovered new serving engine {engine_name} at " + f"{engine_ip}, running model: {model_name}" + ) with self.available_engines_lock: self.available_engines[engine_name] = EndpointInfo( url=f"http://{engine_ip}:{self.port}", @@ -175,13 +181,13 @@ def _delete_engine(self, engine_name: str): del self.available_engines[engine_name] def _on_engine_update( - self, - engine_name: str, - engine_ip: Optional[str], - event: str, - is_pod_ready: bool, - model_name: Optional[str], - ) -> None: + self, + engine_name: str, + engine_ip: Optional[str], + event: str, + is_pod_ready: bool, + model_name: Optional[str], + ) -> None: if event == "ADDED": if engine_ip is None: return @@ -199,7 +205,7 @@ def _on_engine_update( return self._delete_engine(engine_name) - + elif event == "MODIFIED": if engine_ip is None: return @@ -208,10 +214,11 @@ def _on_engine_update( self._add_engine(engine_name, engine_ip, model_name) return - if (not is_pod_ready or model_name is None) and \ - engine_name in self.available_engines: + if ( + not is_pod_ready or model_name is None + ) and engine_name in self.available_engines: self._delete_engine(engine_name) - return + return def get_endpoint_info(self) -> List[EndpointInfo]: """ @@ -233,9 +240,10 @@ def get_health(self) -> bool: """ return self.watcher_thread.is_alive() + def InitializeServiceDiscovery( - service_discovery_type: ServiceDiscoveryType, - *args, **kwargs) -> ServiceDiscovery: + service_discovery_type: ServiceDiscoveryType, *args, **kwargs +) -> ServiceDiscovery: """ Initialize the service discovery module with the given type and arguments. @@ -264,6 +272,7 @@ def InitializeServiceDiscovery( return _global_service_discovery + def GetServiceDiscovery() -> ServiceDiscovery: """ Get the initialized service discovery module. @@ -280,16 +289,20 @@ def GetServiceDiscovery() -> ServiceDiscovery: return _global_service_discovery + if __name__ == "__main__": # Test the service discovery - #k8s_sd = K8sServiceDiscovery("default", 8000, "release=test") - InitializeServiceDiscovery(ServiceDiscoveryType.K8S, - namespace = "default", - port = 8000, - label_selector = "release=test") + # k8s_sd = K8sServiceDiscovery("default", 8000, "release=test") + InitializeServiceDiscovery( + ServiceDiscoveryType.K8S, + namespace="default", + port=8000, + label_selector="release=test", + ) k8s_sd = GetServiceDiscovery() import time + time.sleep(1) while True: urls = k8s_sd.get_endpoint_info() diff --git a/src/vllm_router/utils.py b/src/vllm_router/utils.py index dfba648e..1da7e1ff 100644 --- a/src/vllm_router/utils.py +++ b/src/vllm_router/utils.py @@ -1,5 +1,6 @@ import re + def validate_url(url: str) -> bool: """ Validates the format of the given URL. @@ -11,11 +12,11 @@ def validate_url(url: str) -> bool: bool: True if the URL is valid, False otherwise. 
""" regex = re.compile( - r'^(http|https)://' # Protocol - r'(([a-zA-Z0-9_-]+\.)+[a-zA-Z]{2,}|' # Domain name - r'localhost|' # Or localhost - r'\d{1,3}(\.\d{1,3}){3})' # Or IPv4 address - r'(:\d+)?' # Optional port - r'(/.*)?$' # Optional path + r"^(http|https)://" # Protocol + r"(([a-zA-Z0-9_-]+\.)+[a-zA-Z]{2,}|" # Domain name + r"localhost|" # Or localhost + r"\d{1,3}(\.\d{1,3}){3})" # Or IPv4 address + r"(:\d+)?" # Optional port + r"(/.*)?$" # Optional path ) return bool(regex.match(url)) diff --git a/tutorials/00-install-kubernetes-env.md b/tutorials/00-install-kubernetes-env.md index 0fc2fd47..b8977355 100644 --- a/tutorials/00-install-kubernetes-env.md +++ b/tutorials/00-install-kubernetes-env.md @@ -4,8 +4,6 @@ This tutorial guides you through the process of setting up a Kubernetes environment on a GPU-enabled server. We will install and configure `kubectl`, `helm`, and `minikube`, ensuring GPU compatibility for workloads requiring accelerated computing. By the end of this tutorial, you will have a fully functional Kubernetes environment ready for deploy the vLLM Production Stack. ---- - ## Table of Contents - [Introduction](#introduction) @@ -17,8 +15,6 @@ This tutorial guides you through the process of setting up a Kubernetes environm - [Step 3: Installing Minikube with GPU Support](#step-3-installing-minikube-with-gpu-support) - [Step 4: Verifying GPU Configuration](#step-4-verifying-gpu-configuration) ---- - ## Prerequisites Before you begin, ensure the following: @@ -35,8 +31,6 @@ Before you begin, ensure the following: - A Linux-based operating system (e.g., Ubuntu 20.04 or later). - Basic understanding of Linux shell commands. ---- - ## Steps ### Step 1: Installing kubectl @@ -71,8 +65,6 @@ Before you begin, ensure the following: Client Version: v1.32.1 ``` ---- - ### Step 2: Installing Helm 1. Execute the script `install-helm.sh`: @@ -99,8 +91,6 @@ Before you begin, ensure the following: version.BuildInfo{Version:"v3.17.0", GitCommit:"301108edc7ac2a8ba79e4ebf5701b0b6ce6a31e4", GitTreeState:"clean", GoVersion:"go1.23.4"} ``` ---- - ### Step 3: Installing Minikube with GPU Support 1. Execute the script `install-minikube-cluster.sh`: @@ -116,6 +106,7 @@ Before you begin, ensure the following: 3. **Expected Output:** If everything goes smoothly, you should see the example output like following: + ```plaintext πŸ˜„ minikube v1.35.0 on Ubuntu 22.04 (kvm/amd64) ❗ minikube skips various validations when --force is supplied; this may lead to unexpected behavior @@ -135,8 +126,6 @@ Before you begin, ensure the following: TEST SUITE: None ``` ---- - ### Step 4: Verifying GPU Configuration 1. Ensure Minikube is running: @@ -145,7 +134,7 @@ Before you begin, ensure the following: sudo minikube status ``` - Expected Output: + Expected output: ```plaintext minikube @@ -162,7 +151,7 @@ Before you begin, ensure the following: sudo kubectl describe nodes | grep -i gpu ``` - Expected Output: + Expected output: ```plaintext nvidia.com/gpu: 1 @@ -181,12 +170,12 @@ Before you begin, ensure the following: sudo kubectl logs gpu-test ``` - You should see the nvidia-smi output from the terminal ---- + You should see the nvidia-smi output from the terminal ## Conclusion By following this tutorial, you have successfully set up a Kubernetes environment with GPU support on your server. You are now ready to deploy and test vLLM Production Stack on Kubernetes. For further configuration and workload-specific setups, consult the official documentation for `kubectl`, `helm`, and `minikube`. 
-What's next: +What's next: + - [01-minimal-helm-installation](https://github.com/vllm-project/production-stack/blob/main/tutorials/01-minimal-helm-installation.md) diff --git a/tutorials/01-minimal-helm-installation.md b/tutorials/01-minimal-helm-installation.md index 2db2e76f..bf9995a7 100644 --- a/tutorials/01-minimal-helm-installation.md +++ b/tutorials/01-minimal-helm-installation.md @@ -1,9 +1,11 @@ # Tutorial: Minimal Setup of the vLLM Production Stack ## Introduction + This tutorial guides you through a minimal setup of the vLLM Production Stack using one vLLM instance with the `facebook/opt-125m` model. By the end of this tutorial, you will have a working deployment of vLLM on a Kubernetes environment with GPU. ## Table of Contents + - [Introduction](#introduction) - [Table of Contents](#table-of-contents) - [Prerequisites](#prerequisites) @@ -12,11 +14,12 @@ This tutorial guides you through a minimal setup of the vLLM Production Stack us - [2. Validate Installation](#2-validate-installation) - [3. Send a Query to the Stack](#3-send-a-query-to-the-stack) - [3.1. Forward the Service Port](#31-forward-the-service-port) - - [3.2. Query the OpenAI-Compatible API](#32-query-the-openai-compatible-api) + - [3.2. Query the OpenAI-Compatible API to list the available models](#32-query-the-openai-compatible-api-to-list-the-available-models) - [3.3. Query the OpenAI Completion Endpoint](#33-query-the-openai-completion-endpoint) - [4. Uninstall](#4-uninstall) ## Prerequisites + 1. A Kubernetes environment with GPU support. If not set up, follow the [00-install-kubernetes-env](00-install-kubernetes-env.md) guide. 2. Helm installed. Refer to the [install-helm.sh](install-helm.sh) script for instructions. 3. kubectl installed. Refer to the [install-kubectl.sh](install-kubectl.sh) script for instructions. @@ -27,7 +30,8 @@ This tutorial guides you through a minimal setup of the vLLM Production Stack us ### 1. Deploy vLLM Instance -#### Step 1.1: Use Predefined Configuration +#### 1.1: Use Predefined Configuration + The vLLM Production Stack repository provides a predefined configuration file, `values-01-minimal-example.yaml`, located at `tutorials/assets/values-01-minimal-example.yaml`. This file contains the following content: ```yaml @@ -48,6 +52,7 @@ servingEngineSpec: ``` Explanation of the key fields: + - **`modelSpec`**: Defines the model configuration, including: - `name`: A name for the model deployment. - `repository`: Docker repository hosting the model image. @@ -58,47 +63,63 @@ Explanation of the key fields: - **`requestGPU`**: Specifies the number of GPUs required. - **`pvcStorage`**: Allocates persistent storage for the model. -#### Step 1.2: Deploy the Helm Chart +#### 1.2: Deploy the Helm Chart + Deploy the Helm chart using the predefined configuration file: + ```bash helm repo add vllm https://vllm-project.github.io/production-stack helm install vllm vllm/production-stack -f tutorials/assets/values-01-minimal-example.yaml ``` + Explanation of the command: + - `vllm` in the first command: The Helm repository. - `vllm` in the second command: The name of the Helm release. - `-f tutorials/assets/values-01-minimal-example.yaml`: Specifies the predefined configuration file. ### 2. Validate Installation -#### Step 2.1: Monitor Deployment Status +#### 2.1: Monitor Deployment Status + Monitor the deployment status using: + ```bash sudo kubectl get pods ``` + Expected output: + - Pods for the `vllm` deployment should transition to `Ready` and the `Running` state. 
-``` + +```plaintext NAME READY STATUS RESTARTS AGE vllm-deployment-router-859d8fb668-2x2b7 1/1 Running 0 2m38s vllm-opt125m-deployment-vllm-84dfc9bd7-vb9bs 1/1 Running 0 2m38s ``` + _Note_: It may take some time for the containers to download the Docker images and LLM weights. ### 3. Send a Query to the Stack -#### Step 3.1: Forward the Service Port +#### 3.1: Forward the Service Port + Expose the `vllm-router-service` port to the host machine: + ```bash sudo kubectl port-forward svc/vllm-router-service 30080:80 ``` -#### Step 3.2: Query the OpenAI-Compatible API to list the available models +#### 3.2: Query the OpenAI-Compatible API to list the available models + Test the stack's OpenAI-compatible API by querying the available models: + ```bash curl -o- http://localhost:30080/models ``` + Expected output: + ```json { "object": "list", @@ -114,8 +135,10 @@ Expected output: } ``` -#### Step 3.3: Query the OpenAI Completion Endpoint +#### 3.3: Query the OpenAI Completion Endpoint + Send a query to the OpenAI `/completion` endpoint to generate a completion for a prompt: + ```bash curl -X POST http://localhost:30080/completions \ -H "Content-Type: application/json" \ @@ -125,7 +148,9 @@ curl -X POST http://localhost:30080/completions \ "max_tokens": 10 }' ``` + Expected output: + ```json { "id": "completion-id", @@ -141,12 +166,13 @@ Expected output: ] } ``` + This demonstrates the model generating a continuation for the provided prompt. ### 4. Uninstall To remove the deployment, run: + ```bash sudo helm uninstall vllm ``` - diff --git a/tutorials/02-basic-vllm-config.md b/tutorials/02-basic-vllm-config.md index 28efbda7..036e8170 100644 --- a/tutorials/02-basic-vllm-config.md +++ b/tutorials/02-basic-vllm-config.md @@ -1,15 +1,18 @@ # Tutorial: Basic vLLM Configurations ## Introduction + This tutorial guides you through the basic configurations required to deploy a vLLM serving engine in a Kubernetes environment with GPU support. You will learn how to specify the model details, set up necessary environment variables (like `HF_TOKEN`), and launch the vLLM serving engine. ## Table of Contents + 1. [Prerequisites](#prerequisites) 2. [Step 1: Preparing the Configuration File](#step-1-preparing-the-configuration-file) 3. [Step 2: Applying the Configuration](#step-2-applying-the-configuration) 4. [Step 3: Verifying the Deployment](#step-3-verifying-the-deployment) ## Prerequisites + - A Kubernetes environment with GPU support, as set up in the [00-install-kubernetes-env tutorial](00-install-kubernetes-env.md). - Helm installed on your system. - Access to a HuggingFace token (`HF_TOKEN`). @@ -41,6 +44,7 @@ This tutorial guides you through the basic configurations required to deploy a v - **`env`**: Extra environment variables to pass to the model-serving engine. ### Example Snippet + ```yaml servingEngineSpec: modelSpec: @@ -68,14 +72,15 @@ servingEngineSpec: ## Step 2: Applying the Configuration -1. Deploy the configuration using Helm: +Deploy the configuration using Helm: ```bash helm repo add vllm https://vllm-project.github.io/production-stack helm install vllm vllm/production-stack -f tutorials/assets/values-02-basic-config.yaml ``` -### Expected Output +Expected output: + You should see output indicating the successful deployment of the Helm chart: ```plaintext @@ -91,54 +96,56 @@ REVISION: 1 1. 
Check the status of the pods: -```bash -sudo kubectl get pods -``` + ```bash + sudo kubectl get pods + ``` -### Expected Output -You should see the following pods: + Expected output: -```plaintext -NAME READY STATUS RESTARTS AGE -pod/llmstack-deployment-router-xxxx-xxxx 1/1 Running 0 3m23s -llmstack-llama3-deployment-vllm-xxxx-xxxx 1/1 Running 0 3m23s -``` + You should see the following pods: -- The `llmstack-deployment-router` pod acts as the router, managing requests and routing them to the appropriate model-serving pod. -- The `llmstack-llama3-deployment-vllm` pod serves the actual model for inference. + ```plaintext + NAME READY STATUS RESTARTS AGE + pod/llmstack-deployment-router-xxxx-xxxx 1/1 Running 0 3m23s + llmstack-llama3-deployment-vllm-xxxx-xxxx 1/1 Running 0 3m23s + ``` + + - The `llmstack-deployment-router` pod acts as the router, managing requests and routing them to the appropriate model-serving pod. + - The `llmstack-llama3-deployment-vllm` pod serves the actual model for inference. 2. Verify the service is exposed correctly: -```bash -sudo kubectl get services -``` + ```bash + sudo kubectl get services + ``` -### Expected Output -Ensure there are services for both the serving engine and the router: + Expected output: -```plaintext -NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE -llmstack-engine-service ClusterIP 10.103.98.170 80/TCP 4m -llmstack-router-service ClusterIP 10.103.110.107 80/TCP 4m -``` + Ensure there are services for both the serving engine and the router: -- The `llmstack-engine-service` exposes the serving engine. -- The `llmstack-router-service` handles routing and load balancing across model-serving pods. + ```plaintext + NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE + llmstack-engine-service ClusterIP 10.103.98.170 80/TCP 4m + llmstack-router-service ClusterIP 10.103.110.107 80/TCP 4m + ``` + + - The `llmstack-engine-service` exposes the serving engine. + - The `llmstack-router-service` handles routing and load balancing across model-serving pods. 3. Test the health endpoint: -```bash -curl http:///health -``` + ```bash + curl http:///health + ``` -Replace `` with the external IP of the service. If everything is configured correctly, you will get: + Replace `` with the external IP of the service. If everything is configured correctly, you will get: -```plaintext -{"status":"healthy"} -``` + ```plaintext + {"status":"healthy"} + ``` Please refer to Step 3 in the [01-minimal-helm-installation](01-minimal-helm-installation.md) tutorial for querying the deployed vLLM service. ## Conclusion -In this tutorial, you configured and deployed a vLLM serving engine with GPU support in a Kubernetes environment. You also learned how to verify its deployment and ensure it is running as expected. For further customization, refer to the `values.yaml` file and Helm chart documentation. +In this tutorial, you configured and deployed a vLLM serving engine with GPU support in a Kubernetes environment. You also learned how to verify its deployment and ensure it is running as expected. For further customization, refer to the `values.yaml` file and Helm chart documentation. 
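As a programmatic complement to the `curl` checks above, the sketch below polls the router's `/health` and `/models` endpoints with the `requests` library. It is illustrative only and assumes the router service has been port-forwarded to `localhost:30080` as in the minimal installation tutorial; adjust the base URL if you query the service IP directly.

```python
# Illustrative health and model listing check against the deployed router.
import requests

BASE_URL = "http://localhost:30080"  # assumes an active port-forward to the router service

health = requests.get(f"{BASE_URL}/health", timeout=5)
print("Health status code:", health.status_code)  # 200 when the router's threads are healthy

models = requests.get(f"{BASE_URL}/models", timeout=5)
for model in models.json().get("data", []):
    print("Serving model:", model["id"])
```

A `200` response indicates that both the service-discovery and engine-stats threads inside the router are alive.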
diff --git a/tutorials/03-load-model-from-pv.md b/tutorials/03-load-model-from-pv.md index b55d1661..e133ef65 100644 --- a/tutorials/03-load-model-from-pv.md +++ b/tutorials/03-load-model-from-pv.md @@ -1,15 +1,18 @@ -# Tutorial: Loading Model Weights from Persistent Volume +# Tutorial: Loading Model Weights from Persistent Volume ## Introduction + In this tutorial, you will learn how to load a model from a Persistent Volume (PV) in Kubernetes to optimize deployment performance. The steps include creating a PV, matching it using `pvcMatchLabels`, and deploying the Helm chart to utilize the PV. You will also verify the setup by examining the contents and measuring performance improvements. ## Table of Contents + 1. [Prerequisites](#prerequisites) 2. [Step 1: Creating a Persistent Volume](#step-1-creating-a-persistent-volume) 3. [Step 2: Deploying with Helm Using the PV](#step-2-deploying-with-helm-using-the-pv) 4. [Step 3: Verifying the Deployment](#step-3-verifying-the-deployment) ## Prerequisites + - A running Kubernetes cluster with GPU support. - Completion of previous tutorials: - [00-install-kubernetes-env.md](00-install-kubernetes-env.md) @@ -21,95 +24,95 @@ In this tutorial, you will learn how to load a model from a Persistent Volume (P 1. Locate the persistent Volume manifest file at `tutorials/assets/pv-03.yaml`) with the following content: -```yaml -apiVersion: v1 -kind: PersistentVolume -metadata: - name: test-vllm-pv - labels: - model: "llama3-pv" -spec: - capacity: - storage: 50Gi - accessModes: - - ReadWriteOnce - persistentVolumeReclaimPolicy: Retain - storageClassName: standard - hostPath: - path: /data/llama3 -``` - -> **Note:** You can change the path specified in the `hostPath` field to any valid directory on your Kubernetes node. + ```yaml + apiVersion: v1 + kind: PersistentVolume + metadata: + name: test-vllm-pv + labels: + model: "llama3-pv" + spec: + capacity: + storage: 50Gi + accessModes: + - ReadWriteOnce + persistentVolumeReclaimPolicy: Retain + storageClassName: standard + hostPath: + path: /data/llama3 + ``` + + > **Note:** You can change the path specified in the `hostPath` field to any valid directory on your Kubernetes node. 2. Apply the manifest: -```bash -sudo kubectl apply -f tutorials/assets/pv-03.yaml -``` + ```bash + sudo kubectl apply -f tutorials/assets/pv-03.yaml + ``` 3. Verify the PV is created: -```bash -sudo kubectl get pv -``` + ```bash + sudo kubectl get pv + ``` -### Expected Output + Expected output: -```plaintext -NAME CAPACITY ACCESS MODES RECLAIM POLICY STATUS CLAIM STORAGECLASS AGE -test-vllm-pv 50Gi RWO Retain Available standard 2m -``` + ```plaintext + NAME CAPACITY ACCESS MODES RECLAIM POLICY STATUS CLAIM STORAGECLASS AGE + test-vllm-pv 50Gi RWO Retain Available standard 2m + ``` ## Step 2: Deploying with Helm Using the PV 1. 
Locate the example values file at `tutorials/assets/values-03-match-pv.yaml` with the following content: -```yaml -servingEngineSpec: - modelSpec: - - name: "llama3" - repository: "vllm/vllm-openai" - tag: "latest" - modelURL: "meta-llama/Llama-3.1-8B-Instruct" - replicaCount: 1 + ```yaml + servingEngineSpec: + modelSpec: + - name: "llama3" + repository: "vllm/vllm-openai" + tag: "latest" + modelURL: "meta-llama/Llama-3.1-8B-Instruct" + replicaCount: 1 - requestCPU: 10 - requestMemory: "16Gi" - requestGPU: 1 + requestCPU: 10 + requestMemory: "16Gi" + requestGPU: 1 - pvcStorage: "50Gi" - pvcMatchLabels: - model: "llama3-pv" + pvcStorage: "50Gi" + pvcMatchLabels: + model: "llama3-pv" - vllmConfig: - maxModelLen: 4096 + vllmConfig: + maxModelLen: 4096 - hf_token: -``` + hf_token: + ``` -> **Explanation:** The `pvcMatchLabels` field specifies the labels to match an existing Persistent Volume. In this example, it ensures that the deployment uses the PV with the label `model: "llama3-pv"`. This provides a way to link a specific PV to your application. + > **Explanation:** The `pvcMatchLabels` field specifies the labels to match an existing Persistent Volume. In this example, it ensures that the deployment uses the PV with the label `model: "llama3-pv"`. This provides a way to link a specific PV to your application. -> **Note:** Make sure to replace `` with your actual Hugging Face token in the yaml. + > **Note:** Make sure to replace `` with your actual Hugging Face token in the yaml. 2. Deploy the Helm chart: -```bash -helm install vllm vllm/production-stack -f tutorials/assets/values-03-match-pv.yaml -``` + ```bash + helm install vllm vllm/production-stack -f tutorials/assets/values-03-match-pv.yaml + ``` 3. Verify the deployment: -```bash -sudo kubectl get pods -``` + ```bash + sudo kubectl get pods + ``` -### Expected Output + Expected output: -```plaintext -NAME READY STATUS RESTARTS AGE -llmstack-deployment-router-xxxx-xxxx 1/1 Running 0 1m -llmstack-llama3-deployment-vllm-xxxx-xxxx 1/1 Running 0 1m -``` + ```plaintext + NAME READY STATUS RESTARTS AGE + llmstack-deployment-router-xxxx-xxxx 1/1 Running 0 1m + llmstack-llama3-deployment-vllm-xxxx-xxxx 1/1 Running 0 1m + ``` ## Step 3: Verifying the Deployment @@ -128,23 +131,26 @@ llmstack-llama3-deployment-vllm-xxxx-xxxx 1/1 Running 0 1m ls /data/llama3/hub ``` -### Expected Output -You should see the model files loaded into the directory: + Expected output: -```plaintext -models--meta-llama--Llama-3.1-8B-Instruct version.txt -``` + You should see the model files loaded into the directory: + + ```plaintext + models--meta-llama--Llama-3.1-8B-Instruct version.txt + ``` 2. Uninstall and reinstall the deployment to observe faster startup: -```bash -sudo helm uninstall llmstack -sudo kubectl delete -f tutorials/assets/pv-03.yaml && sudo kubectl apply -f tutorials/assets/pv-03.yaml -helm install vllm vllm/production-stack -f tutorials/assets/values-03-match-pv.yaml -``` + ```bash + sudo helm uninstall llmstack + sudo kubectl delete -f tutorials/assets/pv-03.yaml && sudo kubectl apply -f tutorials/assets/pv-03.yaml + helm install vllm vllm/production-stack -f tutorials/assets/values-03-match-pv.yaml + ``` ### Explanation + - During the second installation, the serving engine starts faster because the model files are already loaded into the Persistent Volume. ## Conclusion + In this tutorial, you learned how to utilize a Persistent Volume to store model weights for a vLLM serving engine. 
This approach optimizes deployment performance and demonstrates the benefits of Kubernetes storage resources. Continue exploring advanced configurations in future tutorials. diff --git a/tutorials/04-launch-multiple-model.md b/tutorials/04-launch-multiple-model.md index d3a4decd..1ff574b8 100644 --- a/tutorials/04-launch-multiple-model.md +++ b/tutorials/04-launch-multiple-model.md @@ -1,9 +1,11 @@ # Tutorial: Launching Multiple Models in vLLM Production Stack ## Introduction + This tutorial demonstrates how to deploy multiple vLLM instances that serve different models on a Kubernetes cluster using vLLM Production Stack. By utilizing the `modelSpec` field in the Helm chart's `values.yaml`, you can configure multiple models to run on different GPUs. You will also learn how to verify the deployment and query the models. ## Table of Contents + 1. [Prerequisites](#prerequisites) 2. [Step 1: Configuring Multiple Models](#step-1-configuring-multiple-models) 3. [Step 2: Deploying the Helm Chart](#step-2-deploying-the-helm-chart) @@ -11,6 +13,7 @@ This tutorial demonstrates how to deploy multiple vLLM instances that serve diff 5. [Step 4: Querying the Models Using Python](#step-4-querying-the-models-using-python) ## Prerequisites + - A Kubernetes environment with at least 2 GPUs. - Completion of the following tutorials: - [00-install-kubernetes-env.md](00-install-kubernetes-env.md) @@ -20,7 +23,7 @@ This tutorial demonstrates how to deploy multiple vLLM instances that serve diff ## Step 1: Configuring Multiple Models -Locate the `tutorials/assets/values-04-multiple-models.yaml` with following contents: +Locate the `tutorials/assets/values-04-multiple-models.yaml` with following contents: ```yaml servingEngineSpec: @@ -54,7 +57,6 @@ servingEngineSpec: > **Note:** Replace `` and `` with your Hugging Face tokens. - ## Step 2: Deploying the Helm Chart Deploy the Helm chart using the customized values file: @@ -67,64 +69,65 @@ helm install vllm vllm/production-stack -f tutorials/assets/values-04-multiple-m 1. Check the running pods to ensure both models are deployed: -```bash -sudo kubectl get pods -``` + ```bash + sudo kubectl get pods + ``` -### Expected Output + Expected output: -```plaintext -NAME READY STATUS RESTARTS AGE -llmstack-deployment-router-xxxxx-xxxxx 1/1 Running 0 90s -llmstack-llama3-deployment-vllm-xxxxx-xxxxx 1/1 Running 0 90s -llmstack-mistral-deployment-vllm-xxxxx-xxxxx 1/1 Running 0 90s -``` + ```plaintext + NAME READY STATUS RESTARTS AGE + llmstack-deployment-router-xxxxx-xxxxx 1/1 Running 0 90s + llmstack-llama3-deployment-vllm-xxxxx-xxxxx 1/1 Running 0 90s + llmstack-mistral-deployment-vllm-xxxxx-xxxxx 1/1 Running 0 90s + ``` -> **Note:** It may take some time for the models to be downloaded before the READY changes to "1/1". + > **Note:** It may take some time for the models to be downloaded before the READY changes to "1/1". 2. Forward the router service port to access it locally: -```bash -sudo kubectl port-forward svc/llmstack-router-service 30080:80 -``` + ```bash + sudo kubectl port-forward svc/llmstack-router-service 30080:80 + ``` -> **Explanation:** We are forwarding the port from the router service, which has a global view of all the vLLM engines running different models. + > **Explanation:** We are forwarding the port from the router service, which has a global view of all the vLLM engines running different models. 3. 
Query the `/models` endpoint to verify the models: -```bash -curl http://localhost:30080/models -``` - -For details on the `/models` endpoint, refer to the [README.md](README.md). - -### Expected Output - -```json -{ - "object": "list", - "data": [ - { - "id": "mistralai/Mistral-7B-Instruct-v0.2", - "object": "model", - "created": 1737516826, - "owned_by": "vllm", - "root": null - }, - { - "id": "meta-llama/Llama-3.1-8B-Instruct", - "object": "model", - "created": 1737516836, - "owned_by": "vllm", - "root": null - } - ] -} -``` + ```bash + curl http://localhost:30080/models + ``` + + For details on the `/models` endpoint, refer to the [README.md](README.md). + + Expected output: + + ```json + { + "object": "list", + "data": [ + { + "id": "mistralai/Mistral-7B-Instruct-v0.2", + "object": "model", + "created": 1737516826, + "owned_by": "vllm", + "root": null + }, + { + "id": "meta-llama/Llama-3.1-8B-Instruct", + "object": "model", + "created": 1737516836, + "owned_by": "vllm", + "root": null + } + ] + } + ``` ## Step 4: Querying the Models Using Python Use the OpenAI Python API to query the deployed models. We provide a python script at `tutorials/assets/example-04-openai.py` + ```python from openai import OpenAI @@ -155,12 +158,14 @@ for model in models: ``` To run the script: -``` + +```bash pip install openai python3 tutorials/assets/example-04-openai.py ``` You should see outputs like: + ```plaintext Completion results from model: mistralai/Mistral-7B-Instruct-v0.2 2, but what is the result of 1 @@ -171,5 +176,5 @@ Completion results from model: meta-llama/Llama-3.1-8B-Instruct ``` ## Conclusion -In this tutorial, you learned how to deploy and query multiple models using vLLM on Kubernetes. This configuration allows you to utilize multiple GPUs efficiently and serve different models in parallel. Continue exploring advanced features to further optimize your deployment. +In this tutorial, you learned how to deploy and query multiple models using vLLM on Kubernetes. This configuration allows you to utilize multiple GPUs efficiently and serve different models in parallel. Continue exploring advanced features to further optimize your deployment. diff --git a/tutorials/05-offload-kv-cache.md b/tutorials/05-offload-kv-cache.md index 1d45095f..39496144 100644 --- a/tutorials/05-offload-kv-cache.md +++ b/tutorials/05-offload-kv-cache.md @@ -1,11 +1,12 @@ # Tutorial: Offload KV Cache to CPU with LMCache ## Introduction -This tutorial demonstrates how to enable KV cache offloading using LMCache in a vLLM deployment. KV cache offloading moves large KV caches from GPU memory to CPU or disk, enabling more potential KV cache hits. -vLLM Production Stack uses LMCache for KV cache offloading. For more details, see the [LMCache GitHub repository](https://github.com/LMCache/LMCache). +This tutorial demonstrates how to enable KV cache offloading using LMCache in a vLLM deployment. KV cache offloading moves large KV caches from GPU memory to CPU or disk, enabling more potential KV cache hits. +vLLM Production Stack uses LMCache for KV cache offloading. For more details, see the [LMCache GitHub repository](https://github.com/LMCache/LMCache). ## Table of Contents + 1. [Prerequisites](#prerequisites) 2. [Step 1: Configuring KV Cache Offloading](#step-1-configuring-kv-cache-offloading) 3. [Step 2: Deploying the Helm Chart](#step-2-deploying-the-helm-chart) @@ -13,6 +14,7 @@ vLLM Production Stack uses LMCache for KV cache offloading. For more details, se 5. 
[Benchmark the Performance Gain of CPU Offloading (Work in Progress)](#benchmark-the-performance-gain-of-cpu-offloading-work-in-progress) ## Prerequisites + - Completion of the following tutorials: - [00-install-kubernetes-env.md](00-install-kubernetes-env.md) - [01-minimal-helm-installation.md](01-minimal-helm-installation.md) @@ -63,51 +65,53 @@ helm install vllm vllm/production-stack -f tutorials/assets/values-05-cpu-offloa 1. Check the pod logs to verify LMCache is active: -```bash -sudo kubectl get pods -``` + ```bash + sudo kubectl get pods + ``` -Identify the pod name for the vLLM deployment (e.g., `llmstack-mistral-deployment-vllm-xxxx-xxxx`). Then run: + Identify the pod name for the vLLM deployment (e.g., `llmstack-mistral-deployment-vllm-xxxx-xxxx`). Then run: -```bash -sudo kubectl logs -f -``` + ```bash + sudo kubectl logs -f + ``` -Look for entries in the log indicating LMCache is enabled and operational. An example output is: + Look for entries in the log indicating LMCache is enabled and operational. An example output is: -```plaintext -INFO 01-21 20:16:58 lmcache_connector.py:41] Initializing LMCacheConfig under kv_transfer_config kv_connector='LMCacheConnector' kv_buffer_device='cuda' kv_buffer_size=1000000000.0 kv_role='kv_both' kv_rank=None kv_parallel_size=1 kv_ip='127.0.0.1' kv_port=14579 -INFO LMCache: Creating LMCacheEngine instance vllm-instance [2025-01-21 20:16:58,732] -- /usr/local/lib/python3.12/dist-packages/lmcache/experimental/cache_engine.py:237 -``` + ```plaintext + INFO 01-21 20:16:58 lmcache_connector.py:41] Initializing LMCacheConfig under kv_transfer_config kv_connector='LMCacheConnector' kv_buffer_device='cuda' kv_buffer_size=1000000000.0 kv_role='kv_both' kv_rank=None kv_parallel_size=1 kv_ip='127.0.0.1' kv_port=14579 + INFO LMCache: Creating LMCacheEngine instance vllm-instance [2025-01-21 20:16:58,732] -- /usr/local/lib/python3.12/dist-packages/lmcache/experimental/cache_engine.py:237 + ``` 2. Forward the router service port to access the stack locally: -```bash -sudo kubectl port-forward svc/llmstack-router-service 30080:80 -``` + ```bash + sudo kubectl port-forward svc/llmstack-router-service 30080:80 + ``` 3. 
Send a request to the stack and observe the logs: -```bash -curl -X POST http://localhost:30080/completions \ - -H "Content-Type: application/json" \ - -d '{ - "model": "mistralai/Mistral-7B-Instruct-v0.2", - "prompt": "Explain the significance of KV cache in language models.", - "max_tokens": 10 - }' -``` + ```bash + curl -X POST http://localhost:30080/completions \ + -H "Content-Type: application/json" \ + -d '{ + "model": "mistralai/Mistral-7B-Instruct-v0.2", + "prompt": "Explain the significance of KV cache in language models.", + "max_tokens": 10 + }' + ``` -### Expected Output -The response from the stack should contain the completion result, and the logs should show LMCache activity, for example: + Expected output: -```plaintext -DEBUG LMCache: Store skips 0 tokens and then stores 13 tokens [2025-01-21 20:23:45,113] -- /usr/local/lib/python3.12/dist-packages/lmcache/integration/vllm/vllm_adapter.py:490 -``` + The response from the stack should contain the completion result, and the logs should show LMCache activity, for example: + + ```plaintext + DEBUG LMCache: Store skips 0 tokens and then stores 13 tokens [2025-01-21 20:23:45,113] -- /usr/local/lib/python3.12/dist-packages/lmcache/integration/vllm/vllm_adapter.py:490 + ``` ## Benchmark the Performance Gain of CPU Offloading (Work in Progress) + In this section, we will benchmark the performance improvements when using LMCache for CPU offloading. Stay tuned for updates. ## Conclusion -This tutorial demonstrated how to enable KV cache offloading in a vLLM deployment using LMCache. By offloading KV cache to CPU, you can optimize GPU memory usage and improve the scalability of your models. Explore further configurations to tailor LMCache to your workloads. +This tutorial demonstrated how to enable KV cache offloading in a vLLM deployment using LMCache. By offloading KV cache to CPU, you can optimize GPU memory usage and improve the scalability of your models. Explore further configurations to tailor LMCache to your workloads. diff --git a/tutorials/README.md b/tutorials/README.md index 9798fa1e..0d77ba16 100644 --- a/tutorials/README.md +++ b/tutorials/README.md @@ -4,40 +4,36 @@ Welcome to the tutorials for vLLM Production Stack! This series of tutorials is ## Table of Contents -1. [Install Kubernetes Environment](00-install-kubernetes-env.md) +1. [Install Kubernetes Environment](00-install-kubernetes-env.md) Learn how to set up a Kubernetes environment as the foundation for running vLLM Production Stack. -2. [Minimal Helm Installation](01-minimal-helm-installation.md) +2. [Minimal Helm Installation](01-minimal-helm-installation.md) A step-by-step guide for deploying vLLM Production Stack using Helm with minimal configuration. -3. [Basic vLLM Configuration](02-basic-vllm-config.md) +3. [Basic vLLM Configuration](02-basic-vllm-config.md) Learn how to customize vLLM options when using vLLM Production Stack. -4. [Load Model from Persistent Volume](03-load-model-from-pv.md) +4. [Load Model from Persistent Volume](03-load-model-from-pv.md) Discover how to load models from a persistent volume to ensure efficient resource usage. -5. [Launch Multiple Models](04-launch-multiple-model.md) +5. [Launch Multiple Models](04-launch-multiple-model.md) Learn how to deploy and manage multiple models simultaneously in your vLLM environment. -6. [Offload KV Cache](05-offload-kv-cache.md) +6. [Offload KV Cache](05-offload-kv-cache.md) Understand how to offload the KV cache to CPU to improve the performance in production use cases. 
---- - ## Getting Started These tutorials are designed to be followed sequentially for beginners, but you can also jump to a specific tutorial based on your needs. Each tutorial includes: + - Prerequisites - Detailed steps - Commands to execute - Expected outputs - Explanations to enhance your understanding ---- - ## Feedback and Contributions If you encounter any issues or have suggestions for improving these tutorials, feel free to contribute by opening a pull request or an issue on our [GitHub repository](https://github.com/vllm-project/production-stack). Happy learning! - diff --git a/tutorials/assets/example-04-openai.py b/tutorials/assets/example-04-openai.py index 995ea374..f4329a1a 100644 --- a/tutorials/assets/example-04-openai.py +++ b/tutorials/assets/example-04-openai.py @@ -17,8 +17,9 @@ model=model.id, prompt="The result of 1 + 1 is ", echo=False, - temperature = 0, - max_tokens = 10) + temperature=0, + max_tokens=10, + ) print("Completion results from model: ", model.id) print(completion.choices[0].text) diff --git a/utils/install-helm.sh b/utils/install-helm.sh index 91cd0436..0abd5b98 100644 --- a/utils/install-helm.sh +++ b/utils/install-helm.sh @@ -1,6 +1,6 @@ #!/bin/bash -set -e +set -e helm_exists() { which helm > /dev/null 2>&1 diff --git a/utils/install-kubectl.sh b/utils/install-kubectl.sh index e1f33f41..fe4cdfcc 100644 --- a/utils/install-kubectl.sh +++ b/utils/install-kubectl.sh @@ -1,6 +1,6 @@ #!/bin/bash -set -e +set -e kubectl_exists() { command -v kubectl >/dev/null 2>&1 diff --git a/utils/install-minikube-cluster.sh b/utils/install-minikube-cluster.sh index 77403059..b076bb92 100644 --- a/utils/install-minikube-cluster.sh +++ b/utils/install-minikube-cluster.sh @@ -34,5 +34,3 @@ sudo helm install --wait --generate-name \ -n gpu-operator --create-namespace \ nvidia/gpu-operator \ --version=v24.9.1 - -