Skip to content

Commit

Permalink
now runnable
Browse files Browse the repository at this point in the history
Signed-off-by: KuntaiDu <[email protected]>
  • Loading branch information
KuntaiDu committed Mar 6, 2025
1 parent 8e21fc7 commit 49dd17a
Show file tree
Hide file tree
Showing 14 changed files with 285 additions and 163 deletions.
6 changes: 4 additions & 2 deletions src/vllm_router/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ The router can be configured using command-line arguments. Below are the availab

### Routing Logic Options

- `--routing-logic`: The routing logic to use. Options are `roundrobin` or `session`. This option is required.
- `--routing-affinity`: The routing affinity to use. Options are `roundrobin`, `session`, `longest_prefix`, or `simhash`. This option is required.
- `--session-key`: The key (in the header) to identify a session.

### Monitoring Options
Expand Down Expand Up @@ -71,7 +71,9 @@ vllm-router --port 8000 \
--static-models "facebook/opt-125m,meta-llama/Llama-3.1-8B-Instruct,facebook/opt-125m" \
--engine-stats-interval 10 \
--log-stats \
--routing-logic roundrobin
--routing-affinity longest_prefix \
--endpoint-filters num_queueing_request \
--endpoint-filters-configs {}
```

## Dynamic Router Config
Expand Down
8 changes: 7 additions & 1 deletion src/vllm_router/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,13 @@ def initialize_all(app: FastAPI, args):
args.batch_processor, args.file_storage_path, app.state.batch_storage
)

initialize_routing_logic(args.routing_logic, session_key=args.session_key)
initialize_routing_logic(
args.routing_affinity,
args.session_key,
args.routing_affinity_config,
args.endpoint_filters,
args.endpoint_filters_configs,
)

# Initialize feature gates
initialize_feature_gates(args.feature_gates)
Expand Down
35 changes: 28 additions & 7 deletions src/vllm_router/parsers/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ def validate_args(args):
)
if args.service_discovery == "k8s" and args.k8s_port is None:
raise ValueError("K8s port must be provided when using K8s service discovery.")
if args.routing_logic == "session" and args.session_key is None:
if args.routing_affinity == "session" and args.session_key is None:
raise ValueError(
"Session key must be provided when using session routing logic."
"Session key must be provided when using session routing affinity."
)
if args.log_stats and args.log_stats_interval <= 0:
raise ValueError("Log stats interval must be greater than 0.")
Expand Down Expand Up @@ -96,24 +96,45 @@ def parse_args():
help="The label selector to filter vLLM pods when using K8s service discovery.",
)
parser.add_argument(
"--routing-logic",
"--routing-affinity",
type=str,
required=True,
choices=["roundrobin", "session"],
help="The routing logic to use",
choices=["roundrobin", "session", "longest_prefix", "simhash"],
help="The routing affinity to use.",
)
parser.add_argument(
"--session-key",
type=str,
default=None,
help="The key (in the header) to identify a session.",
help="The key (in the header) to identify a session. This is a shortcut"
" for --routing-affinity-config "
'\'{"session_key": "<session_key>"}\'.',
)
parser.add_argument(
"--routing-config",
"--routing-affinity-config",
type=str,
default="{}",
help="The routing configuration in JSON format.",
)
parser.add_argument(
"--endpoint-filters",
nargs="+",
default=[],
choices=["num_queueing_request"],
help="Endpoint filters to use. Example usage: "
"--endpoint-filters num_queueing_request other_filter_A "
"other_filter_B ...",
)
parser.add_argument(
"--endpoint-filters-configs",
nargs="+",
default=[],
help="The configurations for endpoint filters, in JSON format. "
"Example usage: "
"--endpoint-filters-configs "
"'{\"percentile\": 0.9}' "
"other_filter_config_A other_filter_config_B ...",
)

# Batch API
# TODO(gaocegege): Make these batch api related arguments to a separate config.
Expand Down
132 changes: 89 additions & 43 deletions src/vllm_router/routers/routing_logic.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
import abc
import enum
from typing import Dict, List
import json
from typing import Any, Dict, List

from fastapi import Request
from uhashring import HashRing

from vllm_router.log import init_logger
from vllm_router.service_discovery import EndpointInfo
from vllm_router.services.routing_service.affinity.factory import get_affinity
from vllm_router.services.routing_service.endpoint_filter.factory import (
get_endpoint_filter,
)
from vllm_router.stats.engine_stats import EngineStats
from vllm_router.stats.request_stats import RequestStats
from vllm_router.utils import SingletonABCMeta

from vllm_router.routers.affinity.factory import get_affinity
from vllm_router.routers.endpoint_filter.factory import get_endpoint_filter

logger = init_logger(__name__)


Expand Down Expand Up @@ -44,36 +46,56 @@ def route_request(
class Router(RoutingInterface):

def __init__(
self,
**kwargs: Dict[str, Any],
self,
routing_affinity: str,
routing_affinity_config: Dict[str, Any],
endpoint_filters: List[str],
endpoint_filters_configs: List[Dict[str, Any]],
):

if hasattr(self, "_initialized"):
return

self.reconfigure(**kwargs)
self.reconfigure(
routing_affinity=routing_affinity,
routing_affinity_config=routing_affinity_config,
endpoint_filters=endpoint_filters,
endpoint_filters_configs=endpoint_filters_configs,
)
self.initialized = True

def reconfigure(self, **kwargs: Dict[str, Any]):
def reconfigure(
self,
routing_affinity: str,
routing_affinity_config: str,
endpoint_filters: List[str],
endpoint_filters_configs: List[str],
):

# Initialize the affinity module
self.affinity = None
if "affinity" not in kwargs:
logger.warning("No affinity specified, using simple round-robin logic to select endpoints")
self.affinity = get_affinity("round_robin", {})
else:
self.affinity = get_affinity(**kwargs["affinity"])

routing_affinity_config = json.loads(routing_affinity_config)
self.affinity = get_affinity(routing_affinity, **routing_affinity_config)

# Initialize the endpoint filters
self.endpoint_filters = []
if "endpoint_filters" not in kwargs:
logger.info("No endpoint filters specified.")
else:
for endpoint_filter_kwargs in kwargs["endpoint_filters"]:
self.endpoint_filters.append(get_endpoint_filter(**endpoint_filter_kwargs))

self._initialized = True
assert len(endpoint_filters) == len(endpoint_filters_configs), (
"The number of items in endpoint filters and endpoint filter "
"configs must be the same"
)

for endpoint_filter_name, endpoint_filter_config in zip(
endpoint_filters, endpoint_filters_configs
):
self.endpoint_filters.append(
get_endpoint_filter(
endpoint_filter_name, **json.loads(endpoint_filter_config)
)
)

self._initialized = True

def route_request(
self,
Expand All @@ -84,62 +106,86 @@ def route_request(
request_json: Dict[str, Any],
) -> str:

self.affinity.update_endpoints_stats(endpoints, engine_stats, request_stats)

endpoints = set(endpoint.url for endpoint in endpoints)
assert endpoints, "No endpoints provided for the routing logic."

for endpoint_filter in self.endpoint_filters:
previous_endpoints = endpoints
endpoints = endpoint_filter.get_filtered_endpoints(
endpoints,
request_stats,
engine_stats
endpoints, request_stats, engine_stats
)
if not endpoints:
logger.warning(f"Endpoint filter {endpoint_filter.name} "
f"removed all endpoints from "
f"{previous_endpoints}. Reverting to previous "
f"endpoints and skipping all remaining "
f"endpoint filters.")
logger.warning(
f"Endpoint filter {endpoint_filter.name} "
f"removed all endpoints from "
f"{previous_endpoints}. Reverting to previous "
f"endpoints and skipping all remaining "
f"endpoint filters."
)
endpoints = previous_endpoints
break

# NOTE(Kuntai): Only update the endpoint stats for the candidate
# endpoints instead of all endpoints.
# Another design is to actually update the endpoint stats for all
# endpoints. But I don't see that there is a strong reason to do so.
self.affinity.update_endpoints_stats(endpoints, engine_stats, request_stats)

selected_endpoint = self.affinity.get_high_affinity_endpoint(
request,
request_json,
endpoints
request, request_json, endpoints
)

self.affinity.on_request_routed(
request,
request_json,
selected_endpoint
)
self.affinity.on_request_routed(request, request_json, selected_endpoint)

return selected_endpoint


_router = None


# Instead of managing a global _global_router, we can define the initialization functions as:
def initialize_routing_logic(
**kwargs
routing_affinity: str,
session_key: str,
routing_affinity_config: str,
endpoint_filters: List[str],
endpoint_filters_configs: str,
) -> RoutingInterface:

global _router
assert _router is None, "Routing logic already initialized"
_router = Router(**kwargs)
if routing_affinity == "session":
routing_affinity_config.update({"session_key": session_key})
_router = Router(
routing_affinity=routing_affinity,
routing_affinity_config=routing_affinity_config,
endpoint_filters=endpoint_filters,
endpoint_filters_configs=endpoint_filters_configs,
)
return _router


def reconfigure_routing_logic(
**kwargs
routing_affinity: str,
session_key: str,
routing_affinity_config: str,
endpoint_filters: List[str],
endpoint_filters_configs: str,
) -> RoutingInterface:
_router.reconfigure(**kwargs)
global _router
_router.reconfigure(
routing_affinity=routing_affinity,
routing_affinity_config=routing_affinity_config,
endpoint_filters=endpoint_filters,
endpoint_filters_configs=endpoint_filters_configs,
)
return _router


def get_routing_logic() -> RoutingInterface:
assert _router is not None, ("Routing logic not initialized. "
"Please call initialize_routing_logic() first.")
global _router
assert _router is not None, (
"Routing logic not initialized. "
"Please call initialize_routing_logic() first."
)
return _router
2 changes: 1 addition & 1 deletion src/vllm_router/services/request_service/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ async def route_general_request(request: Request, endpoint: str):

logger.debug(f"Routing request {request_id} for model: {requested_model}")
server_url = request.app.state.router.route_request(
endpoints, engine_stats, request_stats, request
endpoints, engine_stats, request_stats, request, request_json
)
curr_time = time.time()
logger.info(
Expand Down
12 changes: 8 additions & 4 deletions src/vllm_router/services/routing_service/affinity/base.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
"""
Abstract class for best endpoint selector.
Abstract class for best endpoint selector.
"""

import abc
from typing import Set, Dict, Any
from typing import Any, Dict, Set

from fastapi import Request
from vllm_router.types import EngineStats, RequestStats

class BaseAffinityMaintainer(metaclass=abc.ABCMeta):
from vllm_router.stats.engine_stats import EngineStats
from vllm_router.stats.request_stats import RequestStats


class BaseAffinity(metaclass=abc.ABCMeta):
@abc.abstractmethod
def get_high_affinity_endpoint(
self,
Expand Down
39 changes: 23 additions & 16 deletions src/vllm_router/services/routing_service/affinity/factory.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,39 @@

from vllm_router.services.routing_service.affinity.base import BaseAffinity

import json
from logging import getLogger

from vllm_router.services.routing_service.affinity.base import BaseAffinity

logger = getLogger(__name__)

from vllm_router.services.routing_service.affinity.round_robin_affinity import RoundRobinAffinity
from vllm_router.services.routing_service.affinity.session_based_affinity import SessionBasedAffinity
from vllm_router.services.routing_service.affinity.longest_prefix_affinity import LongestPrefixAffinity
from vllm_router.services.routing_service.affinity.simhash_affinity import SimhashAffinity
from vllm_router.services.routing_service.affinity.longest_prefix_affinity import (
LongestPrefixAffinity,
)
from vllm_router.services.routing_service.affinity.round_robin_affinity import (
RoundRobinAffinity,
)
from vllm_router.services.routing_service.affinity.session_affinity import (
SessionAffinity,
)
from vllm_router.services.routing_service.affinity.simhash_affinity import (
SimhashAffinity,
)

affinity_name_to_class = {
"round_robin": RoundRobinAffinity,
"session": SessionBasedAffinity,
"session": SessionAffinity,
"longest_prefix": LongestPrefixAffinity,
"simhash": SimhashAffinity,
}

def get_affinity(affinity_name: str, affinity_config: Dict[str, Any] = {}, **kwargs) -> BaseAffinity:

if affinity_name not in affinity_name_to_class:
raise ValueError(f"Invalid affinity name: {affinity_name}")
def get_affinity(routing_affinity_name: str, **kwargs) -> BaseAffinity:

if routing_affinity_name not in affinity_name_to_class:
raise ValueError(f"Invalid affinity name: {routing_affinity_name}")

assert kwargs == {}, ("There are extra kwargs forwarded to the affinity "
"factory method. This is likely unintended. "
"Received kwargs: %s" % kwargs)
routing_affinity_config = kwargs

logger.info(f"Using affinity type: {affinity_name} with config: {affinity_config}")
return affinity_name_to_class[affinity_name](**affinity_config)
logger.info(
f"Using affinity type: {routing_affinity_name} with config: {routing_affinity_config}"
)
return affinity_name_to_class[routing_affinity_name](**routing_affinity_config)
Loading

0 comments on commit 49dd17a

Please sign in to comment.