Skip to content

Commit

Permalink
[Feat] dynamic configuration support for router (#207)
Browse files Browse the repository at this point in the history
* [Add] dynamic router config support

Signed-off-by: ApostaC <[email protected]>

* [Fix] small errors and add readme

Signed-off-by: ApostaC <[email protected]>

* [Add] proper close and reconfigure for different components

Signed-off-by: ApostaC <[email protected]>

* [Add] getting dynamic config from the /health endpoint

Signed-off-by: ApostaC <[email protected]>

---------

Signed-off-by: ApostaC <[email protected]>
  • Loading branch information
ApostaC authored Mar 1, 2025
1 parent 09d5c10 commit c1eb84b
Show file tree
Hide file tree
Showing 7 changed files with 508 additions and 71 deletions.
53 changes: 53 additions & 0 deletions src/vllm_router/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ The router can be configured using command-line arguments. Below are the availab

- `--log-stats`: Log statistics every 30 seconds.

### Dynamic Config Options

- `--dynamic-config-json`: The path to the json file containing the dynamic configuration.

## Build docker image

```bash
Expand All @@ -69,3 +73,52 @@ vllm-router --port 8000 \
--log-stats \
--routing-logic roundrobin
```

## Dynamic Router Config

The router can be configured dynamically using a json file when passing the `--dynamic-config-json` option.
The router will watch the json file for changes and update the configuration accordingly (every 10 seconds).

Currently, the dynamic config supports the following fields:

**Required fields:**

- `service_discovery`: The service discovery type. Options are `static` or `k8s`.
- `routing_logic`: The routing logic to use. Options are `roundrobin` or `session`.

**Optional fields:**

- (When using `static` service discovery) `static_backends`: The URLs of static serving engines, separated by commas (e.g., `http://localhost:9001,http://localhost:9002,http://localhost:9003`).
- (When using `static` service discovery) `static_models`: The models running in the static serving engines, separated by commas (e.g., `model1,model2`).
- (When using `k8s` service discovery) `k8s_port`: The port of vLLM processes when using K8s service discovery. Default is `8000`.
- (When using `k8s` service discovery) `k8s_namespace`: The namespace of vLLM pods when using K8s service discovery. Default is `default`.
- (When using `k8s` service discovery) `k8s_label_selector`: The label selector to filter vLLM pods when using K8s service discovery.
- `session_key`: The key (in the header) to identify a session when using session-based routing.

Here is an example dynamic config file:

```json
{
"service_discovery": "static",
"routing_logic": "roundrobin",
"static_backends": "http://localhost:9001,http://localhost:9002,http://localhost:9003",
"static_models": "facebook/opt-125m,meta-llama/Llama-3.1-8B-Instruct,facebook/opt-125m"
}
```

### Get current dynamic config

If the dynamic config is enabled, the router will reflect the current dynamic config in the `/health` endpoint.

```bash
curl http://<router_host>:<router_port>/health
```

The response will be a JSON object with the current dynamic config.

```json
{
"status": "healthy",
"dynamic_config": <current_dynamic_config (JSON object)>
}
```
228 changes: 228 additions & 0 deletions src/vllm_router/dynamic_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
import json
import threading
import time
from dataclasses import dataclass
from typing import Optional

from fastapi import FastAPI

from vllm_router.log import init_logger
from vllm_router.routing_logic import ReconfigureRoutingLogic
from vllm_router.service_discovery import (
ReconfigureServiceDiscovery,
ServiceDiscoveryType,
)
from vllm_router.utils import SingletonMeta, parse_static_model_names, parse_static_urls

logger = init_logger(__name__)


@dataclass
class DynamicRouterConfig:
"""
Re-configurable configurations for the VLLM router.
"""

# Required configurations
service_discovery: str
routing_logic: str

# Optional configurations
# Service discovery configurations
static_backends: Optional[str] = None
static_models: Optional[str] = None
k8s_port: Optional[int] = None
k8s_namespace: Optional[str] = None
k8s_label_selector: Optional[str] = None

# Routing logic configurations
session_key: Optional[str] = None

# Batch API configurations
# TODO (ApostaC): Support dynamic reconfiguration of batch API
# enable_batch_api: bool
# file_storage_class: str
# file_storage_path: str
# batch_processor: str

# Stats configurations
# TODO (ApostaC): Support dynamic reconfiguration of stats monitor
# engine_stats_interval: int
# request_stats_window: int
# log_stats: bool
# log_stats_interval: int

@staticmethod
def from_args(args) -> "DynamicRouterConfig":
return DynamicRouterConfig(
service_discovery=args.service_discovery,
static_backends=args.static_backends,
static_models=args.static_models,
k8s_port=args.k8s_port,
k8s_namespace=args.k8s_namespace,
k8s_label_selector=args.k8s_label_selector,
# Routing logic configurations
routing_logic=args.routing_logic,
session_key=args.session_key,
)

@staticmethod
def from_json(json_path: str) -> "DynamicRouterConfig":
with open(json_path, "r") as f:
config = json.load(f)
return DynamicRouterConfig(**config)

def to_json_str(self) -> str:
return json.dumps(self, default=lambda o: o.__dict__, sort_keys=True, indent=4)


class DynamicConfigWatcher(metaclass=SingletonMeta):
"""
Watches a config json file for changes and updates the DynamicRouterConfig accordingly.
"""

def __init__(
self,
config_json: str,
watch_interval: int,
init_config: DynamicRouterConfig,
app: FastAPI,
):
"""
Initializes the ConfigMapWatcher with the given ConfigMap name and namespace.
Args:
config_json: the path to the json file containing the dynamic configuration
watch_interval: the interval in seconds at which to watch the for changes
app: the fastapi app to reconfigure
"""
self.config_json = config_json
self.watch_interval = watch_interval
self.current_config = init_config
self.app = app

# Watcher thread
self.running = True
self.watcher_thread = threading.Thread(target=self._watch_worker)
self.watcher_thread.start()
assert hasattr(self.app, "state")

def get_current_config(self) -> DynamicRouterConfig:
return self.current_config

def reconfigure_service_discovery(self, config: DynamicRouterConfig):
"""
Reconfigures the router with the given config.
"""
if config.service_discovery == "static":
ReconfigureServiceDiscovery(
ServiceDiscoveryType.STATIC,
urls=parse_static_urls(config.static_backends),
models=parse_static_model_names(config.static_models),
)
elif config.service_discovery == "k8s":
ReconfigureServiceDiscovery(
ServiceDiscoveryType.K8S,
namespace=config.k8s_namespace,
port=config.k8s_port,
label_selector=config.k8s_label_selector,
)
else:
raise ValueError(
f"Invalid service discovery type: {config.service_discovery}"
)

logger.info(f"DynamicConfigWatcher: Service discovery reconfiguration complete")

def reconfigure_routing_logic(self, config: DynamicRouterConfig):
"""
Reconfigures the router with the given config.
"""
routing_logic = ReconfigureRoutingLogic(
config.routing_logic, session_key=config.session_key
)
self.app.state.router = routing_logic
logger.info(f"DynamicConfigWatcher: Routing logic reconfiguration complete")

def reconfigure_batch_api(self, config: DynamicRouterConfig):
"""
Reconfigures the router with the given config.
"""
# TODO (ApostaC): Implement reconfigure_batch_api
pass

def reconfigure_stats(self, config: DynamicRouterConfig):
"""
Reconfigures the router with the given config.
"""
# TODO (ApostaC): Implement reconfigure_stats
pass

def reconfigure_all(self, config: DynamicRouterConfig):
"""
Reconfigures the router with the given config.
"""
self.reconfigure_service_discovery(config)
self.reconfigure_routing_logic(config)
self.reconfigure_batch_api(config)
self.reconfigure_stats(config)

def _sleep_or_break(self, check_interval: float = 1):
"""
Sleep for self.watch_interval seconds if self.running is True.
Otherwise, break the loop.
"""
for _ in range(int(self.watch_interval / check_interval)):
if not self.running:
break
time.sleep(check_interval)

def _watch_worker(self):
"""
Watches the config file for changes and updates the DynamicRouterConfig accordingly.
On every watch_interval, it will try loading the config file and compare the changes.
If the config file has changed, it will reconfigure the system with the new config.
"""
while self.running:
try:
config = DynamicRouterConfig.from_json(self.config_json)
if config != self.current_config:
logger.info(
f"DynamicConfigWatcher: Config changed, reconfiguring..."
)
self.reconfigure_all(config)
logger.info(
f"DynamicConfigWatcher: Config reconfiguration complete"
)
self.current_config = config
except Exception as e:
logger.warning(f"DynamicConfigWatcher: Error loading config file: {e}")

self._sleep_or_break()

def close(self):
"""
Closes the watcher thread.
"""
self.running = False
self.watcher_thread.join()
logger.info("DynamicConfigWatcher: Closed")


def InitializeDynamicConfigWatcher(
config_json: str,
watch_interval: int,
init_config: DynamicRouterConfig,
app: FastAPI,
):
"""
Initializes the DynamicConfigWatcher with the given config json and watch interval.
"""
return DynamicConfigWatcher(config_json, watch_interval, init_config, app)


def GetDynamicConfigWatcher() -> DynamicConfigWatcher:
"""
Returns the DynamicConfigWatcher singleton.
"""
return DynamicConfigWatcher(_create=False)
42 changes: 26 additions & 16 deletions src/vllm_router/engine_stats.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,18 @@
import threading
import time
from dataclasses import dataclass
from typing import Dict, Optional
from typing import Dict, Optional, Tuple

import requests
from prometheus_client.parser import text_string_to_metric_families

from vllm_router.log import init_logger
from vllm_router.service_discovery import GetServiceDiscovery
from vllm_router.utils import SingletonMeta

logger = init_logger(__name__)


class SingletonMeta(type):
_instances = {}

def __call__(cls, *args, **kwargs):
if cls not in cls._instances:
instance = super().__call__(*args, **kwargs)
cls._instances[cls] = instance
return cls._instances[cls]


@dataclass
class EngineStats:
# Number of running requests
Expand Down Expand Up @@ -92,10 +83,12 @@ def __init__(self, scrape_interval: float):
raise ValueError(
"EngineStatsScraper must be initialized with scrape_interval"
)
self.service_discovery = GetServiceDiscovery() # (remains unchanged)
self.engine_stats: Dict[str, EngineStats] = {}
self.engine_stats_lock = threading.Lock()
self.scrape_interval = scrape_interval

# scrape thread
self.running = True
self.scrape_thread = threading.Thread(target=self._scrape_worker, daemon=True)
self.scrape_thread.start()
self._initialized = True
Expand All @@ -108,7 +101,7 @@ def _scrape_one_endpoint(self, url: str):
url (str): The URL of the serving engine (does not contain endpoint)
"""
try:
response = requests.get(url + "/metrics")
response = requests.get(url + "/metrics", timeout=self.scrape_interval)
response.raise_for_status()
engine_stats = EngineStats.FromVllmScrape(response.text)
except Exception as e:
Expand All @@ -126,7 +119,7 @@ def _scrape_metrics(self):
"""
collected_engine_stats = {}
endpoints = self.service_discovery.get_endpoint_info()
endpoints = GetServiceDiscovery().get_endpoint_info()
logger.info(f"Scraping metrics from {len(endpoints)} serving engine(s)")
for info in endpoints:
url = info.url
Expand All @@ -142,6 +135,16 @@ def _scrape_metrics(self):
for url, stats in collected_engine_stats.items():
self.engine_stats[url] = stats

def _sleep_or_break(self, check_interval: float = 1):
"""
Sleep for self.scrape_interval seconds if self.running is True.
Otherwise, break the loop.
"""
for _ in range(int(self.scrape_interval / check_interval)):
if not self.running:
break
time.sleep(check_interval)

def _scrape_worker(self):
"""
Periodically scrape metrics from all serving engines in the background.
Expand All @@ -151,9 +154,9 @@ def _scrape_worker(self):
metrics from all serving engines and store them in self.engine_stats.
"""
while True:
while self.running:
self._scrape_metrics()
time.sleep(self.scrape_interval)
self._sleep_or_break()

def get_engine_stats(self) -> Dict[str, EngineStats]:
"""
Expand All @@ -175,6 +178,13 @@ def get_health(self) -> bool:
"""
return self.scrape_thread.is_alive()

def close(self):
"""
Stop the background thread and cleanup resources.
"""
self.running = False
self.scrape_thread.join()


def InitializeEngineStatsScraper(scrape_interval: float) -> EngineStatsScraper:
return EngineStatsScraper(scrape_interval)
Expand Down
Loading

0 comments on commit c1eb84b

Please sign in to comment.