diff --git a/observability/README.md b/observability/README.md
index 0eb88782..2a40461f 100644
--- a/observability/README.md
+++ b/observability/README.md
@@ -15,6 +15,7 @@ Make sure to have:
 - Or follow our [tutorial](tutorials/00-install-kubernetes-env.md)
 
 After that you can run:
+
 ```bash
 sudo bash install.sh
 ```
diff --git a/src/tests/requirements.txt b/src/tests/requirements.txt
index 07fcefca..743b58aa 100644
--- a/src/tests/requirements.txt
+++ b/src/tests/requirements.txt
@@ -1,5 +1,5 @@
 fastapi
 httpx
-uvicorn
 openai
+uvicorn
 vllm
diff --git a/src/vllm_router/router.py b/src/vllm_router/router.py
index 7539d527..5d35464f 100644
--- a/src/vllm_router/router.py
+++ b/src/vllm_router/router.py
@@ -8,7 +8,7 @@
 import uvicorn
 from fastapi import FastAPI, Request, UploadFile
 from fastapi.responses import JSONResponse, Response, StreamingResponse
-from prometheus_client import Gauge, generate_latest, CONTENT_TYPE_LATEST
+from prometheus_client import CONTENT_TYPE_LATEST, Gauge, generate_latest
 
 from vllm_router.engine_stats import GetEngineStatsScraper, InitializeEngineStatsScraper
 from vllm_router.files import initialize_storage
@@ -47,13 +47,22 @@ async def lifespan(app: FastAPI):
     "vllm:num_requests_running", "Number of running requests", ["server"]
 )
 current_qps = Gauge("vllm:current_qps", "Current Queries Per Second", ["server"])
-avg_decoding_length = Gauge("vllm:avg_decoding_length", "Average Decoding Length", ["server"])
-num_prefill_requests = Gauge("vllm:num_prefill_requests", "Number of Prefill Requests", ["server"])
-num_decoding_requests = Gauge("vllm:num_decoding_requests", "Number of Decoding Requests", ["server"])
+avg_decoding_length = Gauge(
+    "vllm:avg_decoding_length", "Average Decoding Length", ["server"]
+)
+num_prefill_requests = Gauge(
+    "vllm:num_prefill_requests", "Number of Prefill Requests", ["server"]
+)
+num_decoding_requests = Gauge(
+    "vllm:num_decoding_requests", "Number of Decoding Requests", ["server"]
+)
 
-# --- Request Processing & Routing ---
+
+# --- Request Processing & Routing ---
 # TODO: better request id system
-async def process_request(method, header, body, backend_url, request_id, endpoint, debug_request=None):
+async def process_request(
+    method, header, body, backend_url, request_id, endpoint, debug_request=None
+):
     """
     Async generator to stream data from the backend server to the client.
     """
@@ -81,7 +90,9 @@ async def process_request(method, header, body, backend_url, request_id, endpoin
             total_len += len(chunk)
             if not first_token:
                 first_token = True
-                GetRequestStatsMonitor().on_request_response(backend_url, request_id, time.time())
+                GetRequestStatsMonitor().on_request_response(
+                    backend_url, request_id, time.time()
+                )
             yield chunk
 
     GetRequestStatsMonitor().on_request_complete(backend_url, request_id, time.time())
@@ -89,6 +100,7 @@ async def process_request(method, header, body, backend_url, request_id, endpoin
     # Optional debug logging can be enabled here.
     # logger.debug(f"Finished the request with id: {debug_request.headers.get('x-request-id', None)} at {time.time()}")
 
+
 async def route_general_request(request: Request, endpoint: str):
     """
     Route the incoming request to the backend server and stream the response back to the client.
@@ -145,6 +157,7 @@ async def route_general_request(request: Request, endpoint: str):
         headers={key: value for key, value in headers.items()},
     )
 
+
 # --- File Endpoints ---
 @app.post("/files")
 async def route_files(request: Request):
@@ -171,6 +184,7 @@ async def route_files(request: Request):
             status_code=500, content={"error": f"Failed to save file: {str(e)}"}
         )
 
+
 @app.get("/files/{file_id}")
 async def route_get_file(file_id: str):
     try:
@@ -181,6 +195,7 @@ async def route_get_file(file_id: str):
             status_code=404, content={"error": f"File {file_id} not found"}
         )
 
+
 @app.get("/files/{file_id}/content")
 async def route_get_file_content(file_id: str):
     try:
@@ -192,20 +207,24 @@ async def route_get_file_content(file_id: str):
             status_code=404, content={"error": f"File {file_id} not found"}
         )
 
+
 # --- API Endpoints ---
 @app.post("/chat/completions")
 async def route_chat_completition(request: Request):
     return await route_general_request(request, "/v1/chat/completions")
 
+
 @app.post("/completions")
 async def route_completition(request: Request):
     return await route_general_request(request, "/v1/completions")
 
+
 @app.get("/version")
 async def show_version():
     ver = {"version": STACK_VERSION}
     return JSONResponse(content=ver)
 
+
 @app.get("/models")
 async def show_models():
     endpoints = GetServiceDiscovery().get_endpoint_info()
@@ -226,6 +245,7 @@ async def show_models():
     model_list = ModelList(data=model_cards)
     return JSONResponse(content=model_list.model_dump())
 
+
 @app.get("/health")
 async def health() -> Response:
     """Health check: verifies that service discovery and engine stats scraping are operational."""
@@ -239,6 +259,7 @@ async def health() -> Response:
         )
     return Response(status_code=200)
 
+
 # --- Prometheus Metrics Endpoint (v2 observation/tracking) ---
 @app.get("/metrics")
 async def metrics():
@@ -249,9 +270,12 @@ async def metrics():
         avg_decoding_length.labels(server=server).set(stat.ttft)
         num_prefill_requests.labels(server=server).set(stat.in_prefill_requests)
         num_decoding_requests.labels(server=server).set(stat.in_decoding_requests)
-        vnum_requests_running.labels(server=server).set(stat.in_prefill_requests + stat.in_decoding_requests)
+        vnum_requests_running.labels(server=server).set(
+            stat.in_prefill_requests + stat.in_decoding_requests
+        )
     return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)
 
+
 # --- Argument Parsing and Initialization ---
 def validate_args(args):
     if args.service_discovery == "static":