diff --git a/observability/README.md b/observability/README.md index 086d2ebb..9b79d73e 100644 --- a/observability/README.md +++ b/observability/README.md @@ -10,13 +10,21 @@ The observability stack is based on [kube-prom-stack](https://github.com/prometh To launch the observability stack: +Make sure to have: + +- A running Kubernetes (K8s) environment with GPUs + - Run `cd utils && bash install-minikube-cluster.sh` + - Or follow our [tutorial](tutorials/00-install-kubernetes-env.md) + +After that you can run: + ```bash sudo bash install.sh ``` After installing, the dashboard can be accessed through the service `service/kube-prom-stack-grafana` in the `monitoring` namespace. -## Access the Grafana dashboard +## Access the Grafana & Prometheus dashboard Forward the Grafana dashboard port to the local node-port @@ -24,6 +32,12 @@ Forward the Grafana dashboard port to the local node-port sudo kubectl --namespace monitoring port-forward svc/kube-prom-stack-grafana 3000:80 --address 0.0.0.0 ``` +Forward the Prometheus dashboard + +```bash +sudo kubectl --namespace monitoring port-forward prometheus-kube-prom-stack-kube-prome-prometheus-0 9090:9090 +``` + Open the webpage at `http://:3000` to access the Grafana web page. The default user name is `admin` and the password can be configured in `values.yaml` (default is `prom-operator`). Import the dashboard using the `vllm-dashboard.json` in this folder. diff --git a/observability/install.sh b/observability/install.sh index 93340993..bf606544 100644 --- a/observability/install.sh +++ b/observability/install.sh @@ -1,7 +1,7 @@ #!/bin/bash helm repo add prometheus-community https://prometheus-community.github.io/helm-charts -helm install kube-prom-stack prometheus-community/kube-prometheus-stack \ +helm upgrade --install kube-prom-stack prometheus-community/kube-prometheus-stack \ --namespace monitoring \ --create-namespace \ -f values.yaml diff --git a/observability/vllm-dashboard.json b/observability/vllm-dashboard.json index 73b9ca11..c3201efd 100644 --- a/observability/vllm-dashboard.json +++ b/observability/vllm-dashboard.json @@ -9,7 +9,7 @@ }, "enable": true, "hide": true, - "iconColor": "rgba(0, 211, 255, 1)", + "iconColor": "rgba(0,211,255,1)", "name": "Annotations & Alerts", "type": "dashboard" } @@ -22,53 +22,40 @@ "links": [], "panels": [ { - "datasource": { - "type": "prometheus", - "uid": "prometheus" - }, - "description": "Number of healthy vLLM instances", + "id": 100, + "gridPos": { "h": 1, "w": 24, "x": 0, "y": 0 }, + "collapsed": false, + "title": "Overview System Performance", + "type": "row" + }, + { + "id": 1, + "type": "stat", + "title": "Available vLLM instances", + "description": "Number of healthy vLLM instances (by instance usage)", + "datasource": { "type": "prometheus", "uid": "prometheus" }, + "gridPos": { "h": 7, "w": 12, "x": 0, "y": 1 }, "fieldConfig": { "defaults": { - "color": { - "mode": "thresholds" - }, + "color": { "mode": "thresholds" }, "mappings": [], "thresholds": { "mode": "absolute", "steps": [ - { - "color": "green", - "value": null - }, - { - "color": "red", - "value": 80 - } + { "color": "green", "value": null }, + { "color": "red", "value": 80 } ] } }, "overrides": [] }, - "gridPos": { - "h": 7, - "w": 5, - "x": 0, - "y": 0 - }, - "id": 1, "options": { "colorMode": "value", "graphMode": "area", "justifyMode": "auto", "orientation": "auto", "percentChangeColorMode": "standard", - "reduceOptions": { - "calcs": [ - "lastNotNull" - ], - "fields": "", - "values": false - }, + "reduceOptions": { "calcs": 
["lastNotNull"], "fields": "", "values": false }, "showPercentChange": false, "textMode": "auto", "wideLayout": true @@ -76,58 +63,76 @@ "pluginVersion": "11.4.0", "targets": [ { - "datasource": { - "type": "prometheus", - "uid": "prometheus" - }, - "disableTextWrap": false, - "editorMode": "builder", "expr": "count by(endpoint) (vllm:cpu_cache_usage_perc)", - "fullMetaSearch": false, - "includeNullMetadata": true, + "editorMode": "builder", + "format": "time_series", "legendFormat": "vLLM instances", - "range": true, "refId": "A", - "useBackend": false + "range": true } - ], - "title": "Available vLLM instances", - "type": "stat" + ] }, { - "datasource": { - "type": "prometheus", - "uid": "prometheus" - }, + "id": 19, + "type": "stat", + "title": "Average Latency", + "description": "Average end-to-end request latency in seconds", + "datasource": { "type": "prometheus", "uid": "prometheus" }, + "gridPos": { "h": 7, "w": 12, "x": 12, "y": 1 }, "fieldConfig": { "defaults": { - "color": { - "mode": "thresholds" - }, + "color": { "mode": "thresholds" }, "mappings": [], "thresholds": { "mode": "absolute", "steps": [ - { - "color": "green", - "value": null - }, - { - "color": "red", - "value": 80 - } + { "color": "green", "value": null }, + { "color": "red", "value": 1 } ] } }, "overrides": [] }, - "gridPos": { - "h": 7, - "w": 9, - "x": 5, - "y": 0 + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { "calcs": ["lastNotNull"], "fields": "", "values": false }, + "textMode": "auto", + "wideLayout": true + }, + "pluginVersion": "11.4.0", + "targets": [ + { + "expr": "avg(vllm:e2e_request_latency_seconds_sum) / avg(vllm:e2e_request_latency_seconds_count)", + "editorMode": "builder", + "legendFormat": "Avg Latency", + "refId": "A", + "range": true + } + ] + }, + { + "id": 2, + "type": "bargauge", + "title": "Request latency distribution", + "datasource": { "type": "prometheus", "uid": "prometheus" }, + "gridPos": { "h": 7, "w": 24, "x": 0, "y": 8 }, + "fieldConfig": { + "defaults": { + "color": { "mode": "thresholds" }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { "color": "green", "value": null }, + { "color": "red", "value": 80 } + ] + } + }, + "overrides": [] }, - "id": 6, "options": { "displayMode": "gradient", "legend": { @@ -136,76 +141,211 @@ "placement": "bottom", "showLegend": false }, - "maxVizHeight": 300, - "minVizHeight": 16, - "minVizWidth": 8, - "namePlacement": "auto", - "orientation": "auto", - "reduceOptions": { - "calcs": [ - "lastNotNull" - ], - "fields": "", - "values": false - }, "showUnfilled": true, "sizing": "auto", - "valueMode": "color" + "valueMode": "color", + "reduceOptions": { "calcs": ["lastNotNull"], "fields": "", "values": false } }, "pluginVersion": "11.4.0", "targets": [ { - "disableTextWrap": false, - "editorMode": "builder", - "exemplar": false, "expr": "sum by(le) (vllm:e2e_request_latency_seconds_bucket)", + "editorMode": "builder", "format": "heatmap", - "fullMetaSearch": false, - "includeNullMetadata": true, - "instant": false, "legendFormat": "{{le}}", - "range": true, "refId": "A", - "useBackend": false + "range": true } - ], - "title": "Request latency distribution", - "type": "bargauge" + ] + }, + { + "id": 101, + "gridPos": { "h": 1, "w": 24, "x": 0, "y": 15 }, + "collapsed": false, + "title": "QoS Information", + "type": "row" }, { - "datasource": { - "type": "prometheus", - "uid": "prometheus" + "id": 3, + "type": "stat", + "title": "Current 
QPS", + "datasource": { "type": "prometheus", "uid": "prometheus" }, + "gridPos": { "h": 7, "w": 12, "x": 0, "y": 16 }, + "fieldConfig": { + "defaults": { + "color": { "mode": "thresholds" }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { "color": "green", "value": null }, + { "color": "red", "value": 80 } + ] + } + }, + "overrides": [] }, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { "calcs": ["lastNotNull"], "fields": "", "values": false }, + "textMode": "auto", + "wideLayout": true + }, + "pluginVersion": "11.4.0", + "targets": [ + { + "expr": "vllm:current_qps", + "editorMode": "builder", + "legendFormat": "Current QPS", + "refId": "A", + "range": true + } + ] + }, + { + "id": 5, + "type": "stat", + "title": "Router-side Queueing Delay", + "datasource": { "type": "prometheus", "uid": "prometheus" }, + "gridPos": { "h": 7, "w": 12, "x": 12, "y": 16 }, "fieldConfig": { "defaults": { - "color": { - "mode": "thresholds" - }, + "color": { "mode": "thresholds" }, "mappings": [], "thresholds": { "mode": "absolute", "steps": [ - { - "color": "green", - "value": null - }, - { - "color": "red", - "value": 80 - } + { "color": "green", "value": null }, + { "color": "red", "value": 80 } ] } }, "overrides": [] }, - "gridPos": { - "h": 7, - "w": 10, - "x": 14, - "y": 0 + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { "calcs": ["lastNotNull"], "fields": "", "values": false }, + "textMode": "auto", + "wideLayout": true + }, + "pluginVersion": "11.4.0", + "targets": [ + { + "expr": "vllm:router_queueing_delay_seconds", + "editorMode": "builder", + "legendFormat": "Queueing Delay", + "refId": "A", + "range": true + } + ] + }, + { + "id": 6, + "type": "stat", + "title": "Average Prefill Length", + "datasource": { "type": "prometheus", "uid": "prometheus" }, + "gridPos": { "h": 7, "w": 12, "x": 0, "y": 23 }, + "fieldConfig": { + "defaults": { + "color": { "mode": "thresholds" }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { "color": "green", "value": null }, + { "color": "red", "value": 80 } + ] + } + }, + "overrides": [] + }, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { "calcs": ["lastNotNull"], "fields": "", "values": false }, + "textMode": "auto", + "wideLayout": true + }, + "pluginVersion": "11.4.0", + "targets": [ + { + "expr": "vllm:avg_prefill_length", + "editorMode": "builder", + "legendFormat": "Avg. 
Prefill Length", + "refId": "A", + "range": true + } + ] + }, + { + "id": 20, + "type": "stat", + "title": "Average ITL", + "description": "Average Inter-Token Latency", + "datasource": { "type": "prometheus", "uid": "prometheus" }, + "gridPos": { "h": 7, "w": 12, "x": 12, "y": 23 }, + "fieldConfig": { + "defaults": { + "color": { "mode": "thresholds" }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { "color": "green", "value": null }, + { "color": "red", "value": 1 } + ] + } + }, + "overrides": [] + }, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { "calcs": ["lastNotNull"], "fields": "", "values": false }, + "textMode": "auto", + "wideLayout": true + }, + "pluginVersion": "11.4.0", + "targets": [ + { + "expr": "avg(vllm:time_per_output_token_seconds_sum) / avg(vllm:time_per_output_token_seconds_count)", + "editorMode": "builder", + "legendFormat": "Avg ITL", + "refId": "A", + "range": true + } + ] + }, + { + "id": 4, + "type": "bargauge", + "title": "Request TTFT distribution", + "datasource": { "type": "prometheus", "uid": "prometheus" }, + "gridPos": { "h": 7, "w": 24, "x": 0, "y": 30 }, + "fieldConfig": { + "defaults": { + "color": { "mode": "thresholds" }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { "color": "green", "value": null }, + { "color": "red", "value": 80 } + ] + } + }, + "overrides": [] }, - "id": 7, "options": { "displayMode": "gradient", "legend": { @@ -214,440 +354,371 @@ "placement": "bottom", "showLegend": false }, - "maxVizHeight": 300, - "minVizHeight": 16, - "minVizWidth": 8, - "namePlacement": "auto", - "orientation": "auto", - "reduceOptions": { - "calcs": [ - "lastNotNull" - ], - "fields": "", - "values": false - }, "showUnfilled": true, "sizing": "auto", - "valueMode": "color" + "valueMode": "color", + "reduceOptions": { "calcs": ["lastNotNull"], "fields": "", "values": false } }, "pluginVersion": "11.4.0", "targets": [ { - "disableTextWrap": false, - "editorMode": "builder", "expr": "sum by(le) (vllm:time_to_first_token_seconds_bucket)", + "editorMode": "builder", "format": "heatmap", - "fullMetaSearch": false, - "includeNullMetadata": true, "legendFormat": "__auto", - "range": true, "refId": "A", - "useBackend": false + "range": true } - ], - "title": "Request TTFT distribution", - "type": "bargauge" + ] }, { - "datasource": { - "type": "prometheus", - "uid": "prometheus" - }, + "id": 102, + "gridPos": { "h": 1, "w": 24, "x": 0, "y": 37 }, + "collapsed": false, + "title": "Serving Engine Load", + "type": "row", + "note": "Metrics indicating the load on the serving engine." 
+ }, + { + "id": 10, + "type": "timeseries", + "title": "Number of Running Requests", + "datasource": { "type": "prometheus", "uid": "prometheus" }, + "gridPos": { "h": 8, "w": 8, "x": 0, "y": 38 }, "fieldConfig": { "defaults": { - "color": { - "mode": "palette-classic" - }, - "custom": { - "axisBorderShow": false, - "axisCenteredZero": false, - "axisColorMode": "text", - "axisLabel": "", - "axisPlacement": "auto", - "barAlignment": 0, - "barWidthFactor": 0.6, - "drawStyle": "line", - "fillOpacity": 0, - "gradientMode": "none", - "hideFrom": { - "legend": false, - "tooltip": false, - "viz": false - }, - "insertNulls": false, - "lineInterpolation": "linear", - "lineWidth": 1, - "pointSize": 5, - "scaleDistribution": { - "type": "linear" - }, - "showPoints": "auto", - "spanNulls": false, - "stacking": { - "group": "A", - "mode": "none" - }, - "thresholdsStyle": { - "mode": "off" - } - }, + "color": { "mode": "palette-classic" }, "mappings": [], "thresholds": { "mode": "absolute", "steps": [ - { - "color": "green", - "value": null - }, - { - "color": "red", - "value": 80 - } + { "color": "green", "value": null }, + { "color": "red", "value": 80 } ] } }, "overrides": [] }, - "gridPos": { - "h": 8, - "w": 12, - "x": 0, - "y": 7 - }, - "id": 2, "options": { - "legend": { - "calcs": [], - "displayMode": "list", - "placement": "right", - "showLegend": true - }, - "tooltip": { - "mode": "single", - "sort": "none" - } + "legend": { "calcs": [], "displayMode": "list", "placement": "right", "showLegend": true }, + "tooltip": { "mode": "single", "sort": "none" } }, "pluginVersion": "11.4.0", "targets": [ { - "disableTextWrap": false, - "editorMode": "builder", "expr": "vllm:num_requests_running", - "fullMetaSearch": false, - "includeNullMetadata": true, + "editorMode": "builder", "legendFormat": "{{instance}}", - "range": true, "refId": "A", - "useBackend": false + "range": true } - ], - "title": "Number of running requests", - "type": "timeseries" + ] }, { - "datasource": { - "type": "prometheus", - "uid": "prometheus" - }, + "id": 11, + "type": "timeseries", + "title": "Number of Pending Requests", + "datasource": { "type": "prometheus", "uid": "prometheus" }, + "gridPos": { "h": 8, "w": 8, "x": 8, "y": 38 }, "fieldConfig": { "defaults": { - "color": { - "mode": "palette-classic" - }, - "custom": { - "axisBorderShow": false, - "axisCenteredZero": false, - "axisColorMode": "text", - "axisLabel": "", - "axisPlacement": "auto", - "barAlignment": 0, - "barWidthFactor": 0.6, - "drawStyle": "line", - "fillOpacity": 0, - "gradientMode": "none", - "hideFrom": { - "legend": false, - "tooltip": false, - "viz": false - }, - "insertNulls": false, - "lineInterpolation": "linear", - "lineWidth": 1, - "pointSize": 5, - "scaleDistribution": { - "type": "linear" - }, - "showPoints": "auto", - "spanNulls": false, - "stacking": { - "group": "A", - "mode": "none" - }, - "thresholdsStyle": { - "mode": "off" - } - }, + "color": { "mode": "palette-classic" }, "mappings": [], "thresholds": { "mode": "absolute", "steps": [ - { - "color": "green", - "value": null - }, - { - "color": "red", - "value": 80 - } + { "color": "green", "value": null }, + { "color": "red", "value": 80 } ] } }, "overrides": [] }, - "gridPos": { - "h": 8, - "w": 12, - "x": 12, - "y": 7 - }, - "id": 4, "options": { - "legend": { - "calcs": [], - "displayMode": "list", - "placement": "right", - "showLegend": true - }, - "tooltip": { - "mode": "single", - "sort": "none" - } + "legend": { "calcs": [], "displayMode": "list", "placement": "right", 
"showLegend": true }, + "tooltip": { "mode": "single", "sort": "none" } }, "pluginVersion": "11.4.0", "targets": [ { - "disableTextWrap": false, + "expr": "vllm:num_requests_waiting", "editorMode": "builder", - "expr": "vllm:gpu_cache_usage_perc", - "fullMetaSearch": false, - "includeNullMetadata": true, "legendFormat": "{{instance}}", - "range": true, "refId": "A", - "useBackend": false + "range": true } - ], - "title": "GPU KV Usage percent", - "type": "timeseries" + ] }, { - "datasource": { - "type": "prometheus", - "uid": "prometheus" - }, + "id": 12, + "type": "timeseries", + "title": "GPU KV Usage Percentage", + "datasource": { "type": "prometheus", "uid": "prometheus" }, + "gridPos": { "h": 8, "w": 8, "x": 16, "y": 38 }, "fieldConfig": { "defaults": { - "color": { - "mode": "palette-classic" - }, - "custom": { - "axisBorderShow": false, - "axisCenteredZero": false, - "axisColorMode": "text", - "axisLabel": "", - "axisPlacement": "auto", - "barAlignment": 0, - "barWidthFactor": 0.6, - "drawStyle": "line", - "fillOpacity": 0, - "gradientMode": "none", - "hideFrom": { - "legend": false, - "tooltip": false, - "viz": false - }, - "insertNulls": false, - "lineInterpolation": "linear", - "lineWidth": 1, - "pointSize": 5, - "scaleDistribution": { - "type": "linear" - }, - "showPoints": "auto", - "spanNulls": false, - "stacking": { - "group": "A", - "mode": "none" - }, - "thresholdsStyle": { - "mode": "off" - } - }, + "color": { "mode": "palette-classic" }, "mappings": [], "thresholds": { "mode": "absolute", "steps": [ - { - "color": "green", - "value": null - }, - { - "color": "red", - "value": 80 - } + { "color": "green", "value": null }, + { "color": "red", "value": 80 } ] } }, "overrides": [] }, - "gridPos": { - "h": 8, - "w": 12, - "x": 0, - "y": 15 - }, - "id": 3, "options": { - "legend": { - "calcs": [], - "displayMode": "list", - "placement": "right", - "showLegend": true - }, - "tooltip": { - "mode": "single", - "sort": "none" + "legend": { "calcs": [], "displayMode": "list", "placement": "right", "showLegend": true }, + "tooltip": { "mode": "single", "sort": "none" } + }, + "pluginVersion": "11.4.0", + "targets": [ + { + "expr": "vllm:gpu_cache_usage_perc", + "editorMode": "builder", + "legendFormat": "{{instance}}", + "refId": "A", + "range": true } + ] + }, + { + "id": 13, + "type": "timeseries", + "title": "GPU KV Cache Hit Rate", + "datasource": { "type": "prometheus", "uid": "prometheus" }, + "gridPos": { "h": 8, "w": 12, "x": 0, "y": 46 }, + "fieldConfig": { + "defaults": { + "color": { "mode": "palette-classic" }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { "color": "green", "value": null }, + { "color": "red", "value": 80 } + ] + } + }, + "overrides": [] + }, + "options": { + "legend": { "calcs": [], "displayMode": "list", "placement": "right", "showLegend": true }, + "tooltip": { "mode": "single", "sort": "none" } }, "pluginVersion": "11.4.0", "targets": [ { - "disableTextWrap": false, + "expr": "vllm:gpu_prefix_cache_hit_rate", "editorMode": "builder", - "expr": "vllm:num_requests_waiting", - "fullMetaSearch": false, - "includeNullMetadata": true, "legendFormat": "{{instance}}", - "range": true, "refId": "A", - "useBackend": false + "range": true } - ], - "title": "Number of pending requests", - "type": "timeseries" + ] }, { - "datasource": { - "type": "prometheus", - "uid": "prometheus" + "id": 21, + "type": "stat", + "title": "Number of Swapped Requests", + "description": "Requests moved from GPU to CPU", + "datasource": { "type": 
"prometheus", "uid": "prometheus" }, + "gridPos": { "h": 8, "w": 12, "x": 12, "y": 46 }, + "fieldConfig": { + "defaults": { + "color": { "mode": "thresholds" }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { "color": "green", "value": null }, + { "color": "red", "value": 0 } + ] + } + }, + "overrides": [] + }, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { "calcs": ["lastNotNull"], "fields": "", "values": false }, + "textMode": "auto", + "wideLayout": true }, + "pluginVersion": "11.4.0", + "targets": [ + { + "expr": "vllm:num_requests_swapped", + "editorMode": "builder", + "legendFormat": "Swapped Requests", + "refId": "A", + "range": true + } + ] + }, + { + "id": 103, + "gridPos": { "h": 1, "w": 24, "x": 0, "y": 55 }, + "collapsed": false, + "title": "Current Resource Usage", + "type": "row", + "note": "Metrics for GPU, CPU, Memory and Disk usage." + }, + { + "id": 14, + "type": "timeseries", + "title": "GPU Usage", + "datasource": { "type": "prometheus", "uid": "prometheus" }, + "gridPos": { "h": 8, "w": 6, "x": 0, "y": 56 }, "fieldConfig": { "defaults": { - "color": { - "mode": "palette-classic" - }, - "custom": { - "axisBorderShow": false, - "axisCenteredZero": false, - "axisColorMode": "text", - "axisLabel": "", - "axisPlacement": "auto", - "barAlignment": 0, - "barWidthFactor": 0.6, - "drawStyle": "line", - "fillOpacity": 0, - "gradientMode": "none", - "hideFrom": { - "legend": false, - "tooltip": false, - "viz": false - }, - "insertNulls": false, - "lineInterpolation": "linear", - "lineWidth": 1, - "pointSize": 5, - "scaleDistribution": { - "type": "linear" - }, - "showPoints": "auto", - "spanNulls": false, - "stacking": { - "group": "A", - "mode": "none" - }, - "thresholdsStyle": { - "mode": "off" - } - }, + "color": { "mode": "palette-classic" }, "mappings": [], "thresholds": { "mode": "absolute", "steps": [ - { - "color": "green", - "value": null - }, - { - "color": "red", - "value": 80 - } + { "color": "green", "value": null }, + { "color": "red", "value": 80 } ] } }, "overrides": [] }, - "gridPos": { - "h": 8, - "w": 12, - "x": 12, - "y": 15 + "options": { + "legend": { "calcs": [], "displayMode": "list", "placement": "right", "showLegend": true }, + "tooltip": { "mode": "single", "sort": "none" } + }, + "pluginVersion": "11.4.0", + "targets": [ + { + "expr": "node_gpu_usage_query", + "editorMode": "builder", + "legendFormat": "GPU Usage", + "refId": "A", + "range": true + } + ] + }, + { + "id": 15, + "type": "timeseries", + "title": "CPU Usage", + "datasource": { "type": "prometheus", "uid": "prometheus" }, + "gridPos": { "h": 8, "w": 6, "x": 6, "y": 56 }, + "fieldConfig": { + "defaults": { + "color": { "mode": "palette-classic" }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { "color": "green", "value": null }, + { "color": "red", "value": 80 } + ] + } + }, + "overrides": [] }, - "id": 5, "options": { - "legend": { - "calcs": [], - "displayMode": "list", - "placement": "right", - "showLegend": true + "legend": { "calcs": [], "displayMode": "list", "placement": "right", "showLegend": true }, + "tooltip": { "mode": "single", "sort": "none" } + }, + "pluginVersion": "11.4.0", + "targets": [ + { + "expr": "node_cpu_usage_query", + "editorMode": "builder", + "legendFormat": "CPU Usage", + "refId": "A", + "range": true + } + ] + }, + { + "id": 16, + "type": "timeseries", + "title": "Memory Usage", + "datasource": { "type": "prometheus", 
"uid": "prometheus" }, + "gridPos": { "h": 8, "w": 6, "x": 12, "y": 56 }, + "fieldConfig": { + "defaults": { + "color": { "mode": "palette-classic" }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { "color": "green", "value": null }, + { "color": "red", "value": 80 } + ] + } }, - "tooltip": { - "mode": "single", - "sort": "none" + "overrides": [] + }, + "options": { + "legend": { "calcs": [], "displayMode": "list", "placement": "right", "showLegend": true }, + "tooltip": { "mode": "single", "sort": "none" } + }, + "pluginVersion": "11.4.0", + "targets": [ + { + "expr": "node_memory_usage_query", + "editorMode": "builder", + "legendFormat": "Memory Usage", + "refId": "A", + "range": true } + ] + }, + { + "id": 17, + "type": "timeseries", + "title": "Disk Usage", + "datasource": { "type": "prometheus", "uid": "prometheus" }, + "gridPos": { "h": 8, "w": 6, "x": 18, "y": 56 }, + "fieldConfig": { + "defaults": { + "color": { "mode": "palette-classic" }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { "color": "green", "value": null }, + { "color": "red", "value": 80 } + ] + } + }, + "overrides": [] + }, + "options": { + "legend": { "calcs": [], "displayMode": "list", "placement": "right", "showLegend": true }, + "tooltip": { "mode": "single", "sort": "none" } }, "pluginVersion": "11.4.0", "targets": [ { - "disableTextWrap": false, + "expr": "node_disk_usage_query", "editorMode": "builder", - "expr": "vllm:gpu_prefix_cache_hit_rate", - "fullMetaSearch": false, - "includeNullMetadata": true, - "legendFormat": "{{instance}}", - "range": true, + "legendFormat": "Disk Usage", "refId": "A", - "useBackend": false + "range": true } - ], - "title": "GPU KV cache hit rate", - "type": "timeseries" + ] } ], "preload": false, "refresh": "auto", "schemaVersion": 40, "tags": [], - "templating": { - "list": [] - }, - "time": { - "from": "now-15m", - "to": "now" - }, + "templating": { "list": [] }, + "time": { "from": "now-15m", "to": "now" }, "timepicker": {}, "timezone": "browser", - "title": "vllm dashboard", - "uid": "ee9i0i4y606psc", - "version": 17, + "title": "vLLM Dashboard", + "uid": "750918234", + "version": 20, "weekStart": "" } diff --git a/src/tests/perftest/request_generator.py b/src/tests/perftest/request_generator.py index 8fb30edd..bbbfa107 100644 --- a/src/tests/perftest/request_generator.py +++ b/src/tests/perftest/request_generator.py @@ -96,7 +96,7 @@ def main(): processes = [] api_key = "YOUR_API_KEY_HERE" - base_url = "http://localhost:8000/" + base_url = "http://localhost:8000/v1" model = "fake_model_name" for _ in range(args.num_workers): diff --git a/src/tests/requirements.txt b/src/tests/requirements.txt index 6e6cb41b..743b58aa 100644 --- a/src/tests/requirements.txt +++ b/src/tests/requirements.txt @@ -1,4 +1,5 @@ fastapi httpx +openai uvicorn vllm diff --git a/src/vllm_router/engine_stats.py b/src/vllm_router/engine_stats.py index 342b36f5..1733b172 100644 --- a/src/vllm_router/engine_stats.py +++ b/src/vllm_router/engine_stats.py @@ -1,7 +1,7 @@ import threading import time from dataclasses import dataclass -from typing import Dict +from typing import Dict, Optional import requests from prometheus_client.parser import text_string_to_metric_families @@ -18,12 +18,12 @@ class EngineStats: # Number of running requests num_running_requests: int = 0 - # Number of queuing requests num_queuing_requests: int = 0 - - # GPU cache hit rate - gpu_cache_hit_rate: float = 0.0 + # GPU prefix cache hit rate (as used in some panels) + 
gpu_prefix_cache_hit_rate: float = 0.0 + # GPU KV usage percentage (new field for dashboard "GPU KV Usage Percentage") + gpu_cache_usage_perc: float = 0.0 @staticmethod def FromVllmScrape(vllm_scrape: str): @@ -41,7 +41,9 @@ def FromVllmScrape(vllm_scrape: str): """ num_running_reqs = 0 num_queuing_reqs = 0 - gpu_cache_hit_rate = 0 + gpu_prefix_cache_hit_rate = 0.0 + gpu_cache_usage_perc = 0.0 + for family in text_string_to_metric_families(vllm_scrape): for sample in family.samples: if sample.name == "vllm:num_requests_running": @@ -49,18 +51,23 @@ def FromVllmScrape(vllm_scrape: str): elif sample.name == "vllm:num_requests_waiting": num_queuing_reqs = sample.value elif sample.name == "vllm:gpu_prefix_cache_hit_rate": - gpu_cache_hit_rate = sample.value + gpu_prefix_cache_hit_rate = sample.value + elif sample.name == "vllm:gpu_cache_usage_perc": + gpu_cache_usage_perc = sample.value return EngineStats( num_running_requests=num_running_reqs, num_queuing_requests=num_queuing_reqs, - gpu_cache_hit_rate=gpu_cache_hit_rate, + gpu_prefix_cache_hit_rate=gpu_prefix_cache_hit_rate, + gpu_cache_usage_perc=gpu_cache_usage_perc, ) class EngineStatsScraper: def __init__(self, scrape_interval: float): """ + Initialize the scraper to periodically fetch metrics from all serving engines. + Args: scrape_interval (float): The interval in seconds to scrape the metrics. @@ -73,18 +80,16 @@ def __init__(self, scrape_interval: float): self.service_discovery = GetServiceDiscovery() self.engine_stats: Dict[str, EngineStats] = {} self.engine_stats_lock = threading.Lock() - self.scrape_interval = scrape_interval self.scrape_thread = threading.Thread(target=self._scrape_worker, daemon=True) self.scrape_thread.start() def _scrape_one_endpoint(self, url: str): - """Scrape the metrics and model information from a single - serving engine + """ + Scrape metrics from a single serving engine. Args: - url (str): The URL of the serving engine - (does not contain endpoint) + url (str): The base URL of the serving engine. """ try: response = requests.get(url + "/metrics") @@ -96,6 +101,14 @@ def _scrape_one_endpoint(self, url: str): return engine_stats def _scrape_metrics(self): + """ + Scrape metrics from all serving engines. + + Scrape metrics from all serving engines by calling + _scrape_one_endpoint on each of them. The metrics are + stored in self.engine_stats. + + """ collected_engine_stats = {} endpoints = self.service_discovery.get_endpoint_info() logger.info(f"Scraping metrics from {len(endpoints)} serving engine(s)") @@ -110,16 +123,29 @@ def _scrape_metrics(self): for old_url in old_urls: if old_url not in collected_engine_stats: del self.engine_stats[old_url] - for url, stats in collected_engine_stats.items(): self.engine_stats[url] = stats def _scrape_worker(self): + """ + Periodically scrape metrics from all serving engines in the background. + + This function will loop forever and sleep for self.scrape_interval + seconds between each scrape. It will call _scrape_metrics to scrape + metrics from all serving engines and store them in self.engine_stats. + + """ while True: self._scrape_metrics() time.sleep(self.scrape_interval) def get_engine_stats(self) -> Dict[str, EngineStats]: + """ + Retrieve a copy of the current engine statistics. + + Returns: + A dictionary mapping engine URLs to their respective EngineStats objects. 
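+
+        Example (illustrative sketch only; assumes the scraper was already
+        initialized, and the URL keys are whatever endpoints were discovered,
+        e.g. "http://localhost:9000" in the static test setup):
+            stats = GetEngineStatsScraper().get_engine_stats()
+            for url, s in stats.items():
+                print(url, s.num_running_requests, s.gpu_cache_usage_perc)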
+ """ with self.engine_stats_lock: return self.engine_stats.copy() @@ -136,8 +162,10 @@ def get_health(self) -> bool: def InitializeEngineStatsScraper(scrape_interval: float) -> EngineStatsScraper: """ - Initialize the EngineStatsScraper object. This function should be - called after the service discovery module has been initialized. + Initialize the EngineStatsScraper. + + Args: + scrape_interval (float): The interval (in seconds) to scrape metrics. Raises: ValueError: if the service discover module is have @@ -149,23 +177,20 @@ def InitializeEngineStatsScraper(scrape_interval: float) -> EngineStatsScraper: global _global_engine_stats_scraper if _global_engine_stats_scraper: raise ValueError("EngineStatsScraper object has already been initialized") - _global_engine_stats_scraper = EngineStatsScraper(scrape_interval) return _global_engine_stats_scraper def GetEngineStatsScraper() -> EngineStatsScraper: """ - Get the EngineStatsScraper object + Retrieve the EngineStatsScraper. Raises: - ValueError: if the EngineStatsScraper object has not been - initialized + ValueError: If not initialized. """ global _global_engine_stats_scraper if not _global_engine_stats_scraper: raise ValueError("EngineStatsScraper object has not been initialized") - return _global_engine_stats_scraper diff --git a/src/vllm_router/request_stats.py b/src/vllm_router/request_stats.py index f42dd7e3..1c724f69 100644 --- a/src/vllm_router/request_stats.py +++ b/src/vllm_router/request_stats.py @@ -1,6 +1,6 @@ from collections import deque from dataclasses import dataclass -from typing import Deque, Dict +from typing import Deque, Dict, Tuple from vllm_router.log import init_logger @@ -13,63 +13,66 @@ class RequestStats: # Number of queries per second qps: float - - # Average time-to-first-token in seconds + # Average time-to-first-token (TTFT) in seconds ttft: float - # Total number of requests during prefilling in_prefill_requests: int - # Total number of requests during decoding in_decoding_requests: int - # Total number of requests finished finished_requests: int - - # How long does this url serves requests - # NOTE (ApostaC): consider moving this to engine stats + # How long the engine has been serving requests (uptime) uptime: int + # Average decoding length (time from first token to completion) + avg_decoding_length: float + # Average overall latency (from request arrival to completion) + avg_latency: float + # Average inter-token latency (if available; default -1 if not computed) + avg_itl: float + # Number of swapped requests (moved from GPU to CPU) + num_swapped_requests: int class MovingAverageMonitor: """ - Monitors the average of the value of in a sliding window + Monitors the average of values in a sliding window. """ def __init__(self, sliding_window_size: float): self.sliding_window_size = sliding_window_size - self.timestamps = deque() - self.values = deque() + self.timestamps: Deque[float] = deque() + self.values: Deque[float] = deque() def update(self, timestamp: float, value: float): """ Update the throughput monitor with a new timestamp + + Args: + timestamp: The timestamp of the data point. + value: The value of the data point. + + This method adds the new data point to the sliding window and + removes any data point that is older than the sliding window size. 
""" self.timestamps.append(timestamp) self.values.append(value) while ( - len(self.timestamps) > 0 + self.timestamps and self.timestamps[0] < timestamp - self.sliding_window_size ): self.timestamps.popleft() self.values.popleft() def get_average(self) -> float: - """ - Get the throughput in the sliding window - """ - return sum(self.values) / len(self.values) + return sum(self.values) / len(self.values) if self.values else -1 def get_sum(self) -> float: - """ - Get the sum of the values in the sliding window - """ return sum(self.values) class RequestStatsMonitor: """ - Monitors the request statistics of all serving engines + Monitors the request statistics of all serving engines. """ # NOTE (ApostaC): Currently, QPS is calculated based on the number of @@ -90,14 +93,23 @@ def __init__(self, sliding_window_size: float): self.ttft_monitors: Dict[str, MovingAverageMonitor] = {} # The time when the request is coming (engine_url, request_id) -> timestamp - self.request_coming_time: Dict[(str, str), float] = {} + self.request_start_time: Dict[Tuple[str, str], float] = {} + # Record time when first token is received: (engine_url, request_id) -> timestamp + self.first_token_time: Dict[Tuple[str, str], float] = {} # Number of requests in different stages (from the start of the router) self.in_prefill_requests: Dict[str, int] = {} self.in_decoding_requests: Dict[str, int] = {} self.finished_requests: Dict[str, int] = {} - self.first_query_time = None + # New monitors for overall latency and decoding length + self.latency_monitors: Dict[str, MovingAverageMonitor] = {} + self.decoding_length_monitors: Dict[str, MovingAverageMonitor] = {} + + # Counter for swapped requests + self.swapped_requests: Dict[str, int] = {} + + self.first_query_time: float = None def on_new_request(self, engine_url: str, request_id: str, timestamp: float): """ @@ -108,7 +120,7 @@ def on_new_request(self, engine_url: str, request_id: str, timestamp: float): request_id: The global request ID timestamp: the timestamp when the request was created """ - self.request_coming_time[(engine_url, request_id)] = timestamp + self.request_start_time[(engine_url, request_id)] = timestamp if engine_url not in self.in_prefill_requests: self.in_prefill_requests[engine_url] = 0 @@ -118,7 +130,6 @@ def on_new_request(self, engine_url: str, request_id: str, timestamp: float): self.qps_monitors[engine_url] = MovingAverageMonitor( self.sliding_window_size ) - self.qps_monitors[engine_url].update(timestamp, 1) if self.first_query_time is None: @@ -133,20 +144,25 @@ def on_request_response(self, engine_url: str, request_id: str, timestamp: float request_id: The global request ID timestamp: The timestamp when the response token was received """ - if (engine_url, request_id) not in self.request_coming_time: + if (engine_url, request_id) not in self.request_start_time: return - coming_time = self.request_coming_time.pop((engine_url, request_id)) + # Record first token time (do not pop so we can compute overall latency later) + self.first_token_time[(engine_url, request_id)] = timestamp if engine_url not in self.in_decoding_requests: self.in_decoding_requests[engine_url] = 0 - self.in_prefill_requests[engine_url] -= 1 + self.in_prefill_requests[engine_url] = max( + 0, self.in_prefill_requests.get(engine_url, 1) - 1 + ) self.in_decoding_requests[engine_url] += 1 if engine_url not in self.ttft_monitors: self.ttft_monitors[engine_url] = MovingAverageMonitor( self.sliding_window_size ) - self.ttft_monitors[engine_url].update(timestamp, timestamp - 
coming_time) + # Update TTFT as time from request start to first token + ttft = timestamp - self.request_start_time[(engine_url, request_id)] + self.ttft_monitors[engine_url].update(timestamp, ttft) def on_request_complete(self, engine_url: str, request_id: str, timestamp: float): """ @@ -159,13 +175,26 @@ def on_request_complete(self, engine_url: str, request_id: str, timestamp: float """ if engine_url not in self.finished_requests: self.finished_requests[engine_url] = 0 - self.in_decoding_requests[engine_url] -= 1 + self.in_decoding_requests[engine_url] = max( + 0, self.in_decoding_requests.get(engine_url, 1) - 1 + ) self.finished_requests[engine_url] += 1 - def get_request_stats( - self, - current_time: float, - ) -> Dict[str, RequestStats]: + def on_request_swapped(self, engine_url: str, request_id: str, timestamp: float): + # This function should be called if a request is determined to be swapped from GPU to CPU. + """ + Tell the monitor that a request has been swapped from GPU to CPU. + + Args: + engine_url: The URL of the serving engine + request_id: The global request ID + timestamp: The timestamp when the request was swapped + """ + if engine_url not in self.swapped_requests: + self.swapped_requests[engine_url] = 0 + self.swapped_requests[engine_url] += 1 + + def get_request_stats(self, current_time: float) -> Dict[str, RequestStats]: """ Get the request statistics for each serving engine @@ -178,14 +207,10 @@ def get_request_stats( The TTFT and inter token latency will be -1 if there is no requests finished in the sliding window. """ - # Calculate the request statistics ret = {} - - # Get all urls: urls = set(self.in_prefill_requests.keys()).union( set(self.in_decoding_requests.keys()) ) - for engine_url in urls: if engine_url not in self.qps_monitors: qps = -1 @@ -197,19 +222,42 @@ def get_request_stats( else: ttft = self.ttft_monitors[engine_url].get_average() - in_prefill_requests = self.in_prefill_requests.get(engine_url, 0) - in_decoding_requests = self.in_decoding_requests.get(engine_url, 0) - finished_requests = self.finished_requests.get(engine_url, 0) + in_prefill = self.in_prefill_requests.get(engine_url, 0) + in_decoding = self.in_decoding_requests.get(engine_url, 0) + finished = self.finished_requests.get(engine_url, 0) + + if engine_url in self.decoding_length_monitors: + avg_dec_len = self.decoding_length_monitors[engine_url].get_average() + else: + avg_dec_len = -1 + + if engine_url in self.latency_monitors: + avg_lat = self.latency_monitors[engine_url].get_average() + else: + avg_lat = -1 + + # For avg_itl, if not computed, default to -1. 
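+            # (Illustrative note, not implemented here: once per-request output
+            #  token counts are tracked, avg ITL could be estimated as decoding
+            #  time divided by the number of generated tokens.)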
+ avg_itl_val = -1 + + if engine_url in self.swapped_requests: + swapped = self.swapped_requests[engine_url] + else: + swapped = 0 ret[engine_url] = RequestStats( qps=qps, ttft=ttft, - in_prefill_requests=in_prefill_requests, - in_decoding_requests=in_decoding_requests, - finished_requests=finished_requests, - uptime=current_time - self.first_query_time, + in_prefill_requests=in_prefill, + in_decoding_requests=in_decoding, + finished_requests=finished, + uptime=( + current_time - self.first_query_time if self.first_query_time else 0 + ), + avg_decoding_length=avg_dec_len, + avg_latency=avg_lat, + avg_itl=avg_itl_val, + num_swapped_requests=swapped, ) - return ret @@ -227,7 +275,6 @@ def InitializeRequestStatsMonitor(sliding_window_size: float): global _global_request_stats_monitor if _global_request_stats_monitor is not None: raise ValueError("The global request statistics monitor has been initialized") - _global_request_stats_monitor = RequestStatsMonitor(sliding_window_size) return _global_request_stats_monitor @@ -247,5 +294,4 @@ def GetRequestStatsMonitor(): raise ValueError( "The global request statistics monitor has not been initialized" ) - return _global_request_stats_monitor diff --git a/src/vllm_router/requirements.txt b/src/vllm_router/requirements.txt index 301f2877..7151639f 100644 --- a/src/vllm_router/requirements.txt +++ b/src/vllm_router/requirements.txt @@ -1,7 +1,9 @@ +aiofiles==24.1.0 fastapi==0.115.8 httpx==0.28.1 kubernetes==32.0.0 numpy==1.26.4 prometheus_client==0.21.1 +python-multipart==0.0.20 uhashring==2.3 uvicorn==0.34.0 diff --git a/src/vllm_router/router.py b/src/vllm_router/router.py index bfc9a3ea..ea209383 100644 --- a/src/vllm_router/router.py +++ b/src/vllm_router/router.py @@ -9,6 +9,7 @@ import uvicorn from fastapi import FastAPI, Request, UploadFile from fastapi.responses import JSONResponse, Response, StreamingResponse +from prometheus_client import CONTENT_TYPE_LATEST, Gauge, generate_latest from vllm_router.batch import BatchProcessor, initialize_batch_processor from vllm_router.engine_stats import GetEngineStatsScraper, InitializeEngineStatsScraper @@ -44,20 +45,67 @@ async def lifespan(app: FastAPI): app = FastAPI(lifespan=lifespan) -# TODO: better request id system +# --- Prometheus Gauges --- +# Existing metrics +num_requests_running = Gauge( + "vllm:num_requests_running", "Number of running requests", ["server"] +) +num_requests_waiting = Gauge( + "vllm:num_requests_waiting", "Number of waiting requests", ["server"] +) +current_qps = Gauge("vllm:current_qps", "Current Queries Per Second", ["server"]) +avg_decoding_length = Gauge( + "vllm:avg_decoding_length", "Average Decoding Length", ["server"] +) +num_prefill_requests = Gauge( + "vllm:num_prefill_requests", "Number of Prefill Requests", ["server"] +) +num_decoding_requests = Gauge( + "vllm:num_decoding_requests", "Number of Decoding Requests", ["server"] +) +# New metrics per dashboard update +healthy_pods_total = Gauge( + "vllm:healthy_pods_total", "Number of healthy vLLM pods", ["server"] +) +avg_latency = Gauge( + "vllm:avg_latency", "Average end-to-end request latency", ["server"] +) +avg_itl = Gauge("vllm:avg_itl", "Average Inter-Token Latency", ["server"]) +num_requests_swapped = Gauge( + "vllm:num_requests_swapped", "Number of swapped requests", ["server"] +) + +# --- Request Processing & Routing --- +# TODO: better request id system async def process_request( method, header, body, backend_url, request_id, endpoint, debug_request=None ): """ - Async generator to stream data from the 
backend server to the client. + Process a request by sending it to the chosen backend. + + Args: + method: The HTTP method to use when sending the request to the backend. + header: The headers to send with the request to the backend. + body: The content of the request to send to the backend. + backend_url: The URL of the backend to send the request to. + request_id: A unique identifier for the request. + endpoint: The endpoint to send the request to on the backend. + debug_request: The original request object from the client, used for + optional debug logging. + + Yields: + The response headers and status code, followed by the response content. + + Raises: + HTTPError: If the backend returns a 4xx or 5xx status code. """ first_token = False total_len = 0 - # Pass response headers to the client start_time = time.time() GetRequestStatsMonitor().on_new_request(backend_url, request_id, start_time) + logger.info(f"Started request {request_id} for backend {backend_url}") client = httpx_client_wrapper() async with client.stream( @@ -67,9 +115,9 @@ async def process_request( content=body, timeout=None, ) as backend_response: + # Yield headers and status code first. yield backend_response.headers, backend_response.status_code - - # Stream response content + # Stream response content. async for chunk in backend_response.aiter_bytes(): total_len += len(chunk) if not first_token: @@ -80,22 +128,32 @@ async def process_request( yield chunk GetRequestStatsMonitor().on_request_complete(backend_url, request_id, time.time()) - - # if debug_request: - # logger.debug(f"Finished the request with request id: {debug_request.headers.get('x-request-id', None)} at {time.time()}") + logger.info(f"Completed request {request_id} for backend {backend_url}") + # Optional debug logging can be enabled here. + # logger.debug(f"Finished the request with id: {debug_request.headers.get('x-request-id', None)} at {time.time()}") async def route_general_request(request: Request, endpoint: str): """ - Route the incoming request to the backend server and stream the response - back to the client. + Route the incoming request to the backend server and stream the response back to the client. + + This function extracts the requested model from the request body and retrieves the + corresponding endpoints. It uses routing logic to determine the best server URL to handle + the request, then streams the request to that server. If the requested model is not available, + it returns an error response. + + Args: + request (Request): The incoming HTTP request. + endpoint (str): The endpoint to which the request should be routed. + + Returns: + StreamingResponse: A response object that streams data from the backend server to the client. 
""" + in_router_time = time.time() request_id = str(uuid.uuid4()) - - # TODO (ApostaC): merge two awaits into one request_body = await request.body() - request_json = await request.json() + request_json = await request.json() # TODO (ApostaC): merge two awaits into one requested_model = request_json.get("model", None) if requested_model is None: return JSONResponse( @@ -106,21 +164,19 @@ async def route_general_request(request: Request, endpoint: str): endpoints = GetServiceDiscovery().get_endpoint_info() engine_stats = GetEngineStatsScraper().get_engine_stats() request_stats = GetRequestStatsMonitor().get_request_stats(time.time()) - endpoints = list(filter(lambda x: x.model_name == requested_model, endpoints)) - if len(endpoints) == 0: + if not endpoints: return JSONResponse( status_code=400, content={"error": f"Model {requested_model} not found."} ) + logger.debug(f"Routing request {request_id} for model: {requested_model}") server_url = GetRoutingLogic().route_request( endpoints, engine_stats, request_stats, request ) - curr_time = time.time() logger.info( - f"Routing request {request_id} to {server_url} at {curr_time}, " - f"process time = {curr_time - in_router_time:.4f}" + f"Routing request {request_id} to {server_url} at {curr_time}, process time = {curr_time - in_router_time:.4f}" ) stream_generator = process_request( request.method, @@ -130,9 +186,7 @@ async def route_general_request(request: Request, endpoint: str): request_id, endpoint=endpoint, ) - headers, status_code = await anext(stream_generator) - return StreamingResponse( stream_generator, status_code=status_code, @@ -141,26 +195,30 @@ async def route_general_request(request: Request, endpoint: str): ) +# --- File Endpoints --- @app.post("/v1/files") async def route_files(request: Request): - """Handle file upload requests that include a purpose and file data.""" - form = await request.form() + """ + Handle file upload requests and save the files to the configured storage. - # Validate required fields - if "purpose" not in form: - # Unlike openai, we do not support fine-tuning, so we do not need to - # check for 'purpose`.` - purpose = "unknown" - else: - purpose = form["purpose"] + Args: + request (Request): The incoming HTTP request. + + Returns: + JSONResponse: A JSON response containing the file metadata. + + Raises: + JSONResponse: A JSON response with a 400 status code if the request is invalid, + or a 500 status code if an error occurs during file saving. 
+ """ + form = await request.form() + purpose = form.get("purpose", "unknown") if "file" not in form: return JSONResponse( status_code=400, content={"error": "Missing required parameter 'file'"} ) - file_obj: UploadFile = form["file"] file_content = await file_obj.read() - try: storage: Storage = app.state.batch_storage file_info = await storage.save_file( @@ -292,6 +350,99 @@ async def route_cancel_batch(batch_id: str): ) +@app.post("/v1/batches") +async def route_batches(request: Request): + """Handle batch requests that process files with specified endpoints.""" + try: + request_json = await request.json() + + # Validate required fields + if "input_file_id" not in request_json: + return JSONResponse( + status_code=400, + content={"error": "Missing required parameter 'input_file_id'"}, + ) + if "endpoint" not in request_json: + return JSONResponse( + status_code=400, + content={"error": "Missing required parameter 'endpoint'"}, + ) + + # Verify file exists + storage: Storage = app.state.batch_storage + file_id = request_json["input_file_id"] + try: + await storage.get_file(file_id) + except FileNotFoundError: + return JSONResponse( + status_code=404, content={"error": f"File {file_id} not found"} + ) + + batch_processor: BatchProcessor = app.state.batch_processor + batch = await batch_processor.create_batch( + input_file_id=file_id, + endpoint=request_json["endpoint"], + completion_window=request_json.get("completion_window", "5s"), + metadata=request_json.get("metadata", None), + ) + + # Return metadata as attribute, not a callable. + return JSONResponse(content=batch.to_dict()) + + except Exception as e: + return JSONResponse( + status_code=500, + content={"error": f"Failed to process batch request: {str(e)}"}, + ) + + +@app.get("/v1/batches/{batch_id}") +async def route_get_batch(batch_id: str): + try: + batch_processor: BatchProcessor = app.state.batch_processor + batch = await batch_processor.retrieve_batch(batch_id) + return JSONResponse(content=batch.to_dict()) + except FileNotFoundError: + return JSONResponse( + status_code=404, content={"error": f"Batch {batch_id} not found"} + ) + + +@app.get("/v1/batches") +async def route_list_batches(limit: int = 20, after: str = None): + try: + batch_processor: BatchProcessor = app.state.batch_processor + batches = await batch_processor.list_batches(limit=limit, after=after) + + # Convert batches to response format + batch_data = [batch.to_dict() for batch in batches] + + response = { + "object": "list", + "data": batch_data, + "first_id": batch_data[0]["id"] if batch_data else None, + "last_id": batch_data[-1]["id"] if batch_data else None, + "has_more": len(batch_data) + == limit, # If we got limit items, there may be more + } + + return JSONResponse(content=response) + except FileNotFoundError: + return JSONResponse(status_code=404, content={"error": "No batches found"}) + + +@app.delete("/v1/batches/{batch_id}") +async def route_cancel_batch(batch_id: str): + try: + batch_processor: BatchProcessor = app.state.batch_processor + batch = await batch_processor.cancel_batch(batch_id) + return JSONResponse(content=batch.to_dict()) + except FileNotFoundError: + return JSONResponse( + status_code=404, content={"error": f"Batch {batch_id} not found"} + ) + + @app.post("/v1/chat/completions") async def route_chat_completition(request: Request): return await route_general_request(request, "/v1/chat/completions") @@ -304,12 +455,23 @@ async def route_completition(request: Request): @app.get("/version") async def show_version(): - ver = {"version": 
STACK_VERSION} - return JSONResponse(content=ver) + return JSONResponse(content={"version": STACK_VERSION}) @app.get("/v1/models") async def show_models(): + """ + Returns a list of all models available in the stack. + + Args: + None + + Returns: + JSONResponse: A JSON response containing the list of models. + + Raises: + Exception: If there is an error in retrieving the endpoint information. + """ endpoints = GetServiceDiscovery().get_endpoint_info() existing_models = set() model_cards = [] @@ -324,14 +486,26 @@ async def show_models(): ) model_cards.append(model_card) existing_models.add(endpoint.model_name) - model_list = ModelList(data=model_cards) return JSONResponse(content=model_list.model_dump()) @app.get("/health") async def health() -> Response: - """Health check. check the health of the threads""" + """ + Endpoint to check the health status of various components. + + This function verifies the health of the service discovery module and + the engine stats scraper. If either component is down, it returns a + 503 response with the appropriate status message. If both components + are healthy, it returns a 200 OK response. + + Returns: + Response: A JSONResponse with status code 503 if a component is + down, or a plain Response with status code 200 if all components + are healthy. + """ + if not GetServiceDiscovery().get_health(): return JSONResponse( content={"status": "Service discovery module is down."}, status_code=503 @@ -343,6 +517,51 @@ async def health() -> Response: return Response(status_code=200) +# --- Prometheus Metrics Endpoint --- +@app.get("/metrics") +async def metrics(): + # Retrieve request stats from the monitor. + """ + Endpoint to expose Prometheus metrics for the vLLM router. + + This function gathers request statistics, engine metrics, and health status + of the service endpoints to update Prometheus gauges. It exports metrics + such as queries per second (QPS), average decoding length, number of prefill + and decoding requests, average latency, average inter-token latency, number + of swapped requests, and the number of healthy pods for each server. The + metrics are used to monitor the performance and health of the vLLM router + services. + + Returns: + Response: A HTTP response containing the latest Prometheus metrics in + the appropriate content type. + """ + + stats = GetRequestStatsMonitor().get_request_stats(time.time()) + for server, stat in stats.items(): + current_qps.labels(server=server).set(stat.qps) + # Assuming stat contains the following attributes: + avg_decoding_length.labels(server=server).set(stat.avg_decoding_length) + num_prefill_requests.labels(server=server).set(stat.in_prefill_requests) + num_decoding_requests.labels(server=server).set(stat.in_decoding_requests) + num_requests_running.labels(server=server).set( + stat.in_prefill_requests + stat.in_decoding_requests + ) + avg_latency.labels(server=server).set(stat.avg_latency) + avg_itl.labels(server=server).set(stat.avg_itl) + num_requests_swapped.labels(server=server).set(stat.num_swapped_requests) + # For healthy pods, we use a hypothetical function from service discovery. + healthy = {} + endpoints = GetServiceDiscovery().get_endpoint_info() + for ep in endpoints: + # Assume each endpoint object has an attribute 'healthy' (1 if healthy, 0 otherwise). 
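+        # (Illustrative: the exported sample for a healthy backend would look
+        #  roughly like vllm:healthy_pods_total{server="http://localhost:9000"} 1.0)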
+ healthy[ep.url] = 1 if getattr(ep, "healthy", True) else 0 + for server, value in healthy.items(): + healthy_pods_total.labels(server=server).set(value) + return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST) + + +# --- Argument Parsing and Initialization --- def validate_args(args): if args.service_discovery == "static": if args.static_backends is None: @@ -353,26 +572,16 @@ def validate_args(args): raise ValueError( "Static models must be provided when using static service discovery." ) - - if args.service_discovery == "static" and args.static_backends is None: - raise ValueError( - "Static backends must be provided when using static service discovery." - ) - if args.service_discovery == "k8s" and args.k8s_port is None: raise ValueError("K8s port must be provided when using K8s service discovery.") - if args.routing_logic == "session" and args.session_key is None: raise ValueError( "Session key must be provided when using session routing logic." ) - if args.log_stats and args.log_stats_interval <= 0: raise ValueError("Log stats interval must be greater than 0.") - if args.engine_stats_interval <= 0: raise ValueError("Engine stats interval must be greater than 0.") - if args.request_stats_window <= 0: raise ValueError("Request stats window must be greater than 0.") @@ -385,8 +594,6 @@ def parse_args(): parser.add_argument( "--port", type=int, default=8001, help="The port to run the server on." ) - - # Service discovery parser.add_argument( "--service-discovery", required=True, @@ -397,14 +604,13 @@ def parse_args(): "--static-backends", type=str, default=None, - help="The urls of static backends, separated by comma." - "E.g., http://localhost:8000,http://localhost:8001", + help="The URLs of static backends, separated by commas. E.g., http://localhost:8000,http://localhost:8001", ) parser.add_argument( "--static-models", type=str, default=None, - help="The models of static backends, separated by comma. E.g., model1,model2", + help="The models of static backends, separated by commas. E.g., model1,model2", ) parser.add_argument( "--k8s-port", @@ -424,8 +630,6 @@ def parse_args(): default="", help="The label selector to filter vLLM pods when using K8s service discovery.", ) - - # Routing logic parser.add_argument( "--routing-logic", type=str, @@ -479,14 +683,11 @@ def parse_args(): "--request-stats-window", type=int, default=60, - help="The sliding window seconds to compute request statistics.", + help="The sliding window in seconds to compute request statistics.", ) - - # Logging parser.add_argument( "--log-stats", action="store_true", help="Log statistics periodically." ) - parser.add_argument( "--log-stats-interval", type=int, @@ -505,7 +706,7 @@ def parse_static_urls(args): if validate_url(url): backend_urls.append(url) else: - logger.warning(f"Skipping invalid url: {url}") + logger.warning(f"Skipping invalid URL: {url}") return backend_urls @@ -515,6 +716,15 @@ def parse_static_model_names(args): def InitializeAll(args): + """ + Initialize all the components of the router with the given arguments. 
+ + Args: + args: the parsed command-line arguments + + Raises: + ValueError: if the service discovery type is invalid + """ if args.service_discovery == "static": InitializeServiceDiscovery( ServiceDiscoveryType.STATIC, @@ -530,7 +740,6 @@ def InitializeAll(args): ) else: raise ValueError(f"Invalid service discovery type: {args.service_discovery}") - InitializeEngineStatsScraper(args.engine_stats_interval) InitializeRequestStatsMonitor(args.request_stats_window) @@ -547,6 +756,20 @@ def InitializeAll(args): def log_stats(interval: int = 10): + """ + Periodically logs the engine and request statistics for each service endpoint. + + This function retrieves the current service endpoints and their corresponding + engine and request statistics, and logs them at a specified interval. The + statistics include the number of running and queued requests, GPU cache hit + rate, queries per second (QPS), average latency, average inter-token latency + (ITL), and more. These statistics are also updated in the Prometheus metrics. + + Args: + interval (int): The interval in seconds at which statistics are logged. + Default is 10 seconds. + """ + while True: time.sleep(interval) logstr = "\n" + "=" * 50 + "\n" @@ -555,17 +778,41 @@ def log_stats(interval: int = 10): request_stats = GetRequestStatsMonitor().get_request_stats(time.time()) for endpoint in endpoints: url = endpoint.url + logstr += f"Model: {endpoint.model_name}\n" logstr += f"Server: {url}\n" if url in engine_stats: - logstr += f" Engine stats: {engine_stats[url]}\n" + es = engine_stats[url] + logstr += ( + f" Engine Stats: Running Requests: {es.num_running_requests}, " + f"Queued Requests: {es.num_queuing_requests}, " + f"GPU Cache Hit Rate: {es.gpu_prefix_cache_hit_rate:.2f}\n" + ) else: - logstr += " Engine stats: No stats available\n" - + logstr += " Engine Stats: No stats available\n" if url in request_stats: - logstr += f" Request Stats: {request_stats[url]}\n" + rs = request_stats[url] + logstr += ( + f" Request Stats: QPS: {rs.qps:.2f}, " + f"Avg Latency: {rs.avg_latency}, " + f"Avg ITL: {rs.avg_itl}, " + f"Prefill Requests: {rs.in_prefill_requests}, " + f"Decoding Requests: {rs.in_decoding_requests}, " + f"Swapped Requests: {rs.num_swapped_requests}, " + f"Finished: {rs.finished_requests}, " + f"Uptime: {rs.uptime:.2f} sec\n" + ) + current_qps.labels(server=url).set(rs.qps) + avg_decoding_length.labels(server=url).set(rs.avg_decoding_length) + num_prefill_requests.labels(server=url).set(rs.in_prefill_requests) + num_decoding_requests.labels(server=url).set(rs.in_decoding_requests) + num_requests_running.labels(server=url).set( + rs.in_prefill_requests + rs.in_decoding_requests + ) + avg_latency.labels(server=url).set(rs.avg_latency) + avg_itl.labels(server=url).set(rs.avg_itl) + num_requests_swapped.labels(server=url).set(rs.num_swapped_requests) else: - logstr += " Request Stats: No stats available\n" - + logstr += " Request Stats: No stats available\n" logstr += "-" * 50 + "\n" logstr += "=" * 50 + "\n" logger.info(logstr) @@ -573,9 +820,7 @@ def log_stats(interval: int = 10): def main(): args = parse_args() - InitializeAll(args) - if args.log_stats: threading.Thread( target=log_stats, args=(args.log_stats_interval,), daemon=True diff --git a/src/vllm_router/run-router.sh b/src/vllm_router/run-router.sh old mode 100644 new mode 100755 index cde00c46..b1b6ddbb --- a/src/vllm_router/run-router.sh +++ b/src/vllm_router/run-router.sh @@ -4,15 +4,30 @@ if [[ $# -ne 1 ]]; then exit 1 fi -python3 router.py --port "$1" \ - 
--service-discovery k8s \
-    --k8s-label-selector release=test \
-    --k8s-namespace default \
-    --routing-logic session \
-    --session-key "x-user-id" \
+# Use this command when testing with k8s service discovery
+# python3 -m vllm_router.router --port "$1" \
+#    --service-discovery k8s \
+#    --k8s-label-selector release=test \
+#    --k8s-namespace default \
+#    --routing-logic session \
+#    --session-key "x-user-id" \
+#    --engine-stats-interval 10 \
+#    --log-stats
+
+# Use this command when testing with static service discovery
+python3 -m vllm_router.router --port "$1" \
+    --service-discovery static \
+    --static-backends "http://localhost:9000" \
+    --static-models "fake_model_name" \
+    --log-stats \
+    --log-stats-interval 10 \
     --engine-stats-interval 10 \
-    --log-stats
+    --request-stats-window 10 \
+    --routing-logic session \
+    --session-key "x-user-id"
+# Use this command when testing with roundrobin routing logic
 #python3 router.py --port "$1" \
 #    --service-discovery k8s \
 #    --k8s-label-selector release=test \
diff --git a/utils/install-minikube-cluster.sh b/utils/install-minikube-cluster.sh
index 5918961c..6017c28d 100755
--- a/utils/install-minikube-cluster.sh
+++ b/utils/install-minikube-cluster.sh
@@ -1,51 +1,91 @@
 #!/bin/bash
 set -e
 
+# Allow users to override the paths for the NVIDIA tools.
+: "${NVIDIA_SMI_PATH:=nvidia-smi}"
+: "${NVIDIA_CTK_PATH:=nvidia-ctk}"
+
+# --- Debug and Environment Setup ---
+echo "Current PATH: $PATH"
+echo "Operating System: $(uname -a)"
+
+# --- Helper Functions ---
+# Check if minikube is installed.
 minikube_exists() {
   command -v minikube >/dev/null 2>&1
 }
 
-# Get script directory for relative paths
+# Get the script directory to reference local scripts reliably.
 SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
 
-# Install kubectl and helm
+# --- Install Prerequisites ---
+echo "Installing kubectl and helm..."
 bash "$SCRIPT_DIR/install-kubectl.sh"
 bash "$SCRIPT_DIR/install-helm.sh"
 
-# Install minikube
+# Install minikube if it isn’t already installed.
 if minikube_exists; then
-  echo "Minikube already installed"
+  echo "Minikube already installed."
 else
+  echo "Minikube not found. Installing minikube..."
   curl -LO https://github.com/kubernetes/minikube/releases/latest/download/minikube-linux-amd64
   sudo install minikube-linux-amd64 /usr/local/bin/minikube && rm minikube-linux-amd64
 fi
 
-# Configure BPF if available
+# --- Configure BPF (if available) ---
 if [ -f /proc/sys/net/core/bpf_jit_harden ]; then
+  echo "Configuring BPF: Setting net.core.bpf_jit_harden=0"
   echo "net.core.bpf_jit_harden=0" | sudo tee -a /etc/sysctl.conf
   sudo sysctl -p
 else
   echo "BPF JIT hardening configuration not available, skipping..."
 fi
 
-# Check if NVIDIA GPU is available
-if command -v nvidia-smi &> /dev/null; then
-  # Install nvidia-container-toolkit
-  sudo nvidia-ctk runtime configure --runtime=docker && sudo systemctl restart docker
+# --- NVIDIA GPU Setup ---
+GPU_AVAILABLE=false
+if command -v "$NVIDIA_SMI_PATH" >/dev/null 2>&1; then
+  echo "NVIDIA GPU detected via nvidia-smi at: $(command -v "$NVIDIA_SMI_PATH")"
+  if command -v "$NVIDIA_CTK_PATH" >/dev/null 2>&1; then
+    echo "nvidia-ctk found at: $(command -v "$NVIDIA_CTK_PATH")"
+    GPU_AVAILABLE=true
+  else
+    echo "nvidia-ctk not found. Please install the NVIDIA Container Toolkit to enable GPU support."
+  fi
+else
+  echo "No NVIDIA GPU detected. Will start minikube without GPU support."
+fi - # Start cluster with GPU support - minikube start --driver docker --container-runtime docker --gpus all --force --addons=nvidia-device-plugin +if [ "$GPU_AVAILABLE" = true ]; then + # Configure Docker for GPU support. + echo "Configuring Docker runtime for GPU support..." + if sudo "$NVIDIA_CTK_PATH" runtime configure --runtime=docker; then + echo "Restarting Docker to apply changes..." + sudo systemctl restart docker + echo "Docker runtime configured successfully." + else + echo "Error: Failed to configure Docker runtime using the NVIDIA Container Toolkit." + exit 1 + fi - # Install gpu-operator + # Start minikube with GPU support. + echo "Starting minikube with GPU support..." + sudo minikube start --driver=docker --container-runtime=docker --gpus=all --force --addons=nvidia-device-plugin + + # Update kubeconfig context. + echo "Updating kubeconfig context..." + sudo minikube update-context + + # Install the GPU Operator via Helm. + echo "Adding NVIDIA helm repo and updating..." sudo helm repo add nvidia https://helm.ngc.nvidia.com/nvidia && sudo helm repo update - sudo helm install --wait --generate-name \ - -n gpu-operator --create-namespace \ - nvidia/gpu-operator \ - --version=v24.9.1 + echo "Installing GPU Operator..." + sudo helm install --wait --generate-name -n gpu-operator --create-namespace nvidia/gpu-operator --version=v24.9.1 else - echo "No NVIDIA GPU detected, starting minikube without GPU support..." - # Fix permission issues + # No GPU: Start minikube without GPU support. + echo "Starting minikube without GPU support..." + # Fix potential permission issues. sudo sysctl fs.protected_regular=0 - # Start cluster without GPU - minikube start --driver docker --force + minikube start --driver=docker --force fi + +echo "Minikube cluster installation complete."
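
For a quick local sanity check of the router changes above, a minimal smoke test might look like the sketch below. This is illustrative only: it assumes the `vllm_router` package is importable in your environment, that the router is started on port 8001 via `run-router.sh`, and that a backend matching the static configuration in that script is reachable at `http://localhost:9000`.

```bash
# Start the router with the static service-discovery settings from run-router.sh.
bash src/vllm_router/run-router.sh 8001

# From another shell: health check, model listing, and the new Prometheus metrics endpoint.
curl http://localhost:8001/health
curl http://localhost:8001/v1/models
curl http://localhost:8001/metrics   # Prometheus text-format gauges, labeled per backend server
```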