[CI/Build][Router] Make semantic caching optional (#218)
* Make semantic caching optional

Signed-off-by: Shaoting Feng <[email protected]>

* Avoid checking the semantic cache model

Signed-off-by: Shaoting Feng <[email protected]>

---------

Signed-off-by: Shaoting Feng <[email protected]>
Shaoting-Feng authored Mar 3, 2025
1 parent fecae77 commit fcd75b4
Showing 4 changed files with 113 additions and 86 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/functionality-helm-chart.yml
@@ -32,7 +32,7 @@ jobs:
         DOCKER_BUILDKIT: 1
       run: |
         cd ${{ github.workspace }}
-        sudo docker build -t localhost:5000/git-act-router -f docker/Dockerfile .
+        sudo docker build --build-arg INSTALL_SENTENCE_TRANSFORMERS=false -t localhost:5000/git-act-router -f docker/Dockerfile .
         sudo docker push localhost:5000/git-act-router
         sudo sysctl fs.protected_regular=0
         sudo minikube image load localhost:5000/git-act-router
3 changes: 3 additions & 0 deletions docker/Dockerfile
@@ -15,6 +15,9 @@ COPY .git/ .git/
 # Copy the rest of the application code
 COPY src/ src/
 
+ARG INSTALL_SENTENCE_TRANSFORMERS=true
+ENV INSTALL_SENTENCE_TRANSFORMERS=${INSTALL_SENTENCE_TRANSFORMERS}
+
 # Install dependencies (use cache, and delete after install, to speed up the build)
 RUN pip install --upgrade --no-cache-dir pip setuptools_scm && \
     pip install --no-cache-dir .
38 changes: 24 additions & 14 deletions setup.py
@@ -1,26 +1,36 @@
+import os
+
 from setuptools import find_packages, setup
 
+install_sentence_transformers = (
+    os.getenv("INSTALL_SENTENCE_TRANSFORMERS", "true") == "true"
+)
+
+install_requires = [
+    "numpy==1.26.4",
+    "fastapi==0.115.8",
+    "httpx==0.28.1",
+    "uvicorn==0.34.0",
+    "kubernetes==32.0.0",
+    "prometheus_client==0.21.1",
+    "uhashring==2.3",
+    "aiofiles==24.1.0",
+    "python-multipart==0.0.20",
+    "faiss-cpu==1.10.0",
+    "huggingface-hub==0.25.2",  # downgrade to 0.25.2 to avoid breaking changes
+]
+
+if install_sentence_transformers:
+    install_requires.append("sentence-transformers==2.2.2")
+
 setup(
     name="vllm-router",
     use_scm_version=True,
     setup_requires=["setuptools_scm"],
     packages=find_packages(where="src"),
     package_dir={"": "src"},
-    # Should be the same as src/router/requirements.txt
-    install_requires=[
-        "numpy==1.26.4",
-        "fastapi==0.115.8",
-        "httpx==0.28.1",
-        "uvicorn==0.34.0",
-        "kubernetes==32.0.0",
-        "prometheus_client==0.21.1",
-        "uhashring==2.3",
-        "aiofiles==24.1.0",
-        "python-multipart==0.0.20",
-        "sentence-transformers==2.2.2",
-        "faiss-cpu==1.10.0",
-        "huggingface-hub==0.25.2",  # downgrade to 0.25.2 to avoid breaking changes
-    ],
+    install_requires=install_requires,
     entry_points={
         "console_scripts": [
             "vllm-router=vllm_router.router:main",
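With this change, installing with INSTALL_SENTENCE_TRANSFORMERS=false leaves sentence-transformers out of install_requires, so routers that never enable semantic caching avoid pulling in the embedding stack. A minimal sketch for checking at runtime which variant was installed (the helper name is illustrative, not part of the package; importlib.util.find_spec is stdlib):

    import importlib.util

    def semantic_cache_deps_installed() -> bool:
        # The sentence-transformers distribution is imported as "sentence_transformers"
        return importlib.util.find_spec("sentence_transformers") is not None

    if __name__ == "__main__":
        print("optional semantic-cache deps:", semantic_cache_deps_installed())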
156 changes: 85 additions & 71 deletions src/vllm_router/router.py
@@ -26,23 +26,29 @@
     initialize_feature_gates,
 )
 
-# Semantic cache integration
-from vllm_router.experimental.semantic_cache import (
-    GetSemanticCache,
-    InitializeSemanticCache,
-    enable_semantic_cache,
-    is_semantic_cache_enabled,
-)
-from vllm_router.experimental.semantic_cache_integration import (
-    add_semantic_cache_args,
-    check_semantic_cache,
-    semantic_cache_hit_ratio,
-    semantic_cache_hits,
-    semantic_cache_latency,
-    semantic_cache_misses,
-    semantic_cache_size,
-    store_in_semantic_cache,
-)
+try:
+    # Semantic cache integration
+    from vllm_router.experimental.semantic_cache import (
+        GetSemanticCache,
+        InitializeSemanticCache,
+        enable_semantic_cache,
+        is_semantic_cache_enabled,
+    )
+    from vllm_router.experimental.semantic_cache_integration import (
+        add_semantic_cache_args,
+        check_semantic_cache,
+        semantic_cache_hit_ratio,
+        semantic_cache_hits,
+        semantic_cache_latency,
+        semantic_cache_misses,
+        semantic_cache_size,
+        store_in_semantic_cache,
+    )
+
+    semantic_cache_available = True
+except ImportError:
+    semantic_cache_available = False
 
 from vllm_router.files import Storage, initialize_storage
 from vllm_router.httpx_client import HTTPXClientWrapper
 from vllm_router.protocols import ModelCard, ModelList
@@ -195,14 +201,15 @@ async def process_request(
             backend_url, request_id, time.time()
         )
 
-    # if debug_request:
-    #     logger.debug(f"Finished the request with request id: {debug_request.headers.get('x-request-id', None)} at {time.time()}")
-    # Store in semantic cache if applicable
-    # Use the full response for non-streaming requests, or the last chunk for streaming
-    cache_chunk = bytes(full_response) if full_response is not None else chunk
-    await store_in_semantic_cache(
-        endpoint=endpoint, method=method, body=body, chunk=cache_chunk
-    )
+    if semantic_cache_available:
+        # if debug_request:
+        #     logger.debug(f"Finished the request with request id: {debug_request.headers.get('x-request-id', None)} at {time.time()}")
+        # Store in semantic cache if applicable
+        # Use the full response for non-streaming requests, or the last chunk for streaming
+        cache_chunk = bytes(full_response) if full_response is not None else chunk
+        await store_in_semantic_cache(
+            endpoint=endpoint, method=method, body=body, chunk=cache_chunk
+        )
 
 
 async def route_general_request(request: Request, endpoint: str):
@@ -521,13 +528,14 @@ async def route_cancel_batch(batch_id: str):
 
 @app.post("/v1/chat/completions")
 async def route_chat_completition(request: Request):
-    # Check if the request can be served from the semantic cache
-    logger.debug("Received chat completion request, checking semantic cache")
-    cache_response = await check_semantic_cache(request=request)
-
-    if cache_response:
-        logger.info("Serving response from semantic cache")
-        return cache_response
+    if semantic_cache_available:
+        # Check if the request can be served from the semantic cache
+        logger.debug("Received chat completion request, checking semantic cache")
+        cache_response = await check_semantic_cache(request=request)
+
+        if cache_response:
+            logger.info("Serving response from semantic cache")
+            return cache_response
 
     logger.debug("No cache hit, forwarding request to backend")
     return await route_general_request(request, "/v1/chat/completions")
@@ -832,8 +840,9 @@ def parse_args():
         help="Show version and exit",
     )
 
-    # Add semantic cache arguments
-    add_semantic_cache_args(parser)
+    if semantic_cache_available:
+        # Add semantic cache arguments
+        add_semantic_cache_args(parser)
 
     # Add feature gates argument
     parser.add_argument(
@@ -901,53 +910,58 @@ def InitializeAll(args):
     initialize_feature_gates(args.feature_gates)
     # Check if the SemanticCache feature gate is enabled
    feature_gates = get_feature_gates()
-    if feature_gates.is_enabled("SemanticCache"):
-        # The feature gate is enabled, explicitly enable the semantic cache
-        enable_semantic_cache()
-
-        # Verify that the semantic cache was successfully enabled
-        if not is_semantic_cache_enabled():
-            logger.error("Failed to enable semantic cache feature")
-
-        logger.info("SemanticCache feature gate is enabled")
-
-        # Initialize the semantic cache with the model if specified
-        if args.semantic_cache_model:
-            logger.info(
-                f"Initializing semantic cache with model: {args.semantic_cache_model}"
-            )
-            logger.info(
-                f"Semantic cache directory: {args.semantic_cache_dir or 'default'}"
-            )
-            logger.info(f"Semantic cache threshold: {args.semantic_cache_threshold}")
-
-            cache = InitializeSemanticCache(
-                embedding_model=args.semantic_cache_model,
-                cache_dir=args.semantic_cache_dir,
-                default_similarity_threshold=args.semantic_cache_threshold,
-            )
-
-            # Update cache size metric
-            if cache and hasattr(cache, "db") and hasattr(cache.db, "index"):
-                semantic_cache_size.labels(server="router").set(cache.db.index.ntotal)
-                logger.info(
-                    f"Semantic cache initialized with {cache.db.index.ntotal} entries"
-                )
-
-            logger.info(
-                f"Semantic cache initialized with model {args.semantic_cache_model}"
-            )
-        else:
-            logger.warning(
-                "SemanticCache feature gate is enabled but no embedding model specified. "
-                "The semantic cache will not be functional without an embedding model. "
-                "Use --semantic-cache-model to specify an embedding model."
-            )
-    elif args.semantic_cache_model:
-        logger.warning(
-            "Semantic cache model specified but SemanticCache feature gate is not enabled. "
-            "Enable the feature gate with --feature-gates=SemanticCache=true"
-        )
+    if semantic_cache_available:
+        if feature_gates.is_enabled("SemanticCache"):
+            # The feature gate is enabled, explicitly enable the semantic cache
+            enable_semantic_cache()
+
+            # Verify that the semantic cache was successfully enabled
+            if not is_semantic_cache_enabled():
+                logger.error("Failed to enable semantic cache feature")
+
+            logger.info("SemanticCache feature gate is enabled")
+
+            # Initialize the semantic cache with the model if specified
+            if args.semantic_cache_model:
+                logger.info(
+                    f"Initializing semantic cache with model: {args.semantic_cache_model}"
+                )
+                logger.info(
+                    f"Semantic cache directory: {args.semantic_cache_dir or 'default'}"
+                )
+                logger.info(
+                    f"Semantic cache threshold: {args.semantic_cache_threshold}"
+                )
+
+                cache = InitializeSemanticCache(
+                    embedding_model=args.semantic_cache_model,
+                    cache_dir=args.semantic_cache_dir,
+                    default_similarity_threshold=args.semantic_cache_threshold,
+                )
+
+                # Update cache size metric
+                if cache and hasattr(cache, "db") and hasattr(cache.db, "index"):
+                    semantic_cache_size.labels(server="router").set(
+                        cache.db.index.ntotal
+                    )
+                    logger.info(
+                        f"Semantic cache initialized with {cache.db.index.ntotal} entries"
+                    )
+
+                logger.info(
+                    f"Semantic cache initialized with model {args.semantic_cache_model}"
+                )
+            else:
+                logger.warning(
+                    "SemanticCache feature gate is enabled but no embedding model specified. "
+                    "The semantic cache will not be functional without an embedding model. "
+                    "Use --semantic-cache-model to specify an embedding model."
+                )
+        elif args.semantic_cache_model:
+            logger.warning(
+                "Semantic cache model specified but SemanticCache feature gate is not enabled. "
+                "Enable the feature gate with --feature-gates=SemanticCache=true"
+            )
 
     # --- Hybrid addition: attach singletons to FastAPI state ---
     app.state.engine_stats_scraper = GetEngineStatsScraper()
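With the guarded import in place, the router degrades gracefully: when the optional modules cannot be imported, semantic_cache_available is False and every cache call site above is skipped. A rough sketch of forcing that fallback path in a fresh interpreter, for local verification (assumes vllm_router and its remaining dependencies are importable; a None entry in sys.modules makes the import raise ModuleNotFoundError, a subclass of ImportError):

    import importlib
    import sys

    # Hide the optional modules before vllm_router.router is first imported
    sys.modules["vllm_router.experimental.semantic_cache"] = None
    sys.modules["vllm_router.experimental.semantic_cache_integration"] = None

    router = importlib.import_module("vllm_router.router")
    assert router.semantic_cache_available is False

This mirrors an image built with INSTALL_SENTENCE_TRANSFORMERS=false, where the same except ImportError branch is taken because sentence-transformers is absent.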
