Skip to content

Commit

Permalink
feat: support PII detection in http request
Browse files Browse the repository at this point in the history
Signed-off-by: Huamin Chen <[email protected]>
  • Loading branch information
rootfs committed Mar 5, 2025
1 parent 26fda5e commit 749a4cf
Show file tree
Hide file tree
Showing 11 changed files with 779 additions and 134 deletions.
202 changes: 68 additions & 134 deletions src/vllm_router/experimental/feature_gates.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,19 @@
"""Feature gates for experimental features."""

import json
import logging
import os
from enum import Enum
from typing import Dict, Optional, Set

from vllm_router.utils import SingletonMeta

logger = logging.getLogger(__name__)

# Feature gate names
SEMANTIC_CACHE = "SemanticCache"
PII_DETECTION = "PIIDetection" # Add PII detection feature gate


class FeatureStage(Enum):
"""
Expand Down Expand Up @@ -38,147 +47,68 @@ def __init__(
self.default_enabled = default_enabled


class FeatureGates:
"""
Manages experimental features through feature gates.
Similar to Kubernetes feature gates, this allows explicit enabling/disabling of features.
"""

_instance = None
class FeatureGates(metaclass=SingletonMeta):
"""Manages feature gates for experimental features."""

def __init__(self):
# Dictionary of all available features
self.available_features: Dict[str, Feature] = {}

# Set of enabled features
self.enabled_features: Set[str] = set()

# Register all known features
self._register_known_features()

def _register_known_features(self):
"""Register all known features with their default states."""
self.register_feature(
Feature(
name="SemanticCache",
description="Semantic caching of LLM requests and responses",
stage=FeatureStage.ALPHA,
default_enabled=False,
)
)
"""Initialize feature gates."""
self._enabled_features: Set[str] = set()

def enable(self, feature: str) -> None:
"""Enable a feature."""
self._enabled_features.add(feature)
logger.info(f"Enabled feature: {feature}")

def disable(self, feature: str) -> None:
"""Disable a feature."""
self._enabled_features.discard(feature)
logger.info(f"Disabled feature: {feature}")

def is_enabled(self, feature: str) -> bool:
"""Check if a feature is enabled."""
return feature in self._enabled_features

def configure(self, config: Dict[str, bool]) -> None:
"""Configure multiple features at once."""
for feature, enabled in config.items():
if enabled:
self.enable(feature)
else:
self.disable(feature)

def register_feature(self, feature: Feature):
"""
Register a new feature.
Args:
feature: The feature to register
"""
self.available_features[feature.name] = feature
if feature.default_enabled:
self.enabled_features.add(feature.name)

def enable_feature(self, feature_name: str) -> bool:
"""
Enable a feature by name.
Args:
feature_name: The name of the feature to enable
Returns:
True if the feature was enabled, False if it doesn't exist
"""
if feature_name in self.available_features:
self.enabled_features.add(feature_name)
logger.info(f"Feature '{feature_name}' enabled")
return True
logger.warning(f"Attempted to enable unknown feature '{feature_name}'")
return False

def disable_feature(self, feature_name: str) -> bool:
"""
Disable a feature by name.
Args:
feature_name: The name of the feature to disable
Returns:
True if the feature was disabled, False if it doesn't exist
"""
if feature_name in self.available_features:
self.enabled_features.discard(feature_name)
logger.info(f"Feature '{feature_name}' disabled")
return True
logger.warning(f"Attempted to disable unknown feature '{feature_name}'")
return False

def is_enabled(self, feature_name: str) -> bool:
"""
Check if a feature is enabled.
Args:
feature_name: The name of the feature to check
Returns:
True if the feature is enabled, False otherwise
"""
return feature_name in self.enabled_features

def parse_feature_gates(self, feature_gates_str: str):
"""
Parse a comma-separated list of feature gates.
Format: feature1=true,feature2=false
Args:
feature_gates_str: The feature gates string to parse
"""
if not feature_gates_str:
return

for gate in feature_gates_str.split(","):
if "=" not in gate:
logger.warning(f"Invalid feature gate format: {gate}")
continue

name, value = gate.split("=", 1)
name = name.strip()
value = value.strip().lower()
def initialize_feature_gates(config: Optional[str] = None) -> None:
"""
Initialize feature gates from a configuration string.
if value in ("true", "yes", "1"):
self.enable_feature(name)
elif value in ("false", "no", "0"):
self.disable_feature(name)
else:
logger.warning(f"Invalid feature gate value: {value}")
Args:
config: Configuration string in the format "feature1=true,feature2=false"
"""
feature_gates = get_feature_gates()

def list_features(self) -> Dict[str, Dict]:
"""
List all available features and their status.
if not config:
return

Returns:
A dictionary of feature information
"""
result = {}
for name, feature in self.available_features.items():
result[name] = {
"description": feature.description,
"stage": feature.stage.value,
"enabled": name in self.enabled_features,
}
return result
try:
# Parse config string
features = {}
for item in config.split(","):
if "=" not in item:
continue
name, value = item.split("=", 1)
features[name.strip()] = value.strip().lower() == "true"

# Configure feature gates
feature_gates.configure(features)

def get_feature_gates() -> FeatureGates:
"""
Get the singleton instance of FeatureGates.
except Exception as e:
logger.error(f"Failed to initialize feature gates: {e}")
raise

Returns:
The FeatureGates instance
"""
if FeatureGates._instance is None:
FeatureGates._instance = FeatureGates()
return FeatureGates._instance

def get_feature_gates() -> FeatureGates:
"""Get the feature gates singleton."""
return FeatureGates()


def initialize_feature_gates(feature_gates_str: Optional[str] = None):
Expand All @@ -193,14 +123,18 @@ def initialize_feature_gates(feature_gates_str: Optional[str] = None):
# Parse environment variable if it exists
env_feature_gates = os.environ.get("VLLM_FEATURE_GATES")
if env_feature_gates:
feature_gates.parse_feature_gates(env_feature_gates)
feature_gates.configure(
dict(map(lambda x: x.split("="), env_feature_gates.split(",")))
)

# Parse command-line argument if provided
if feature_gates_str:
feature_gates.parse_feature_gates(feature_gates_str)
feature_gates.configure(
dict(map(lambda x: x.split("="), feature_gates_str.split(",")))
)

# Log enabled features
enabled_features = [name for name in feature_gates.enabled_features]
enabled_features = [name for name in feature_gates._enabled_features]
if enabled_features:
logger.info(f"Enabled experimental features: {', '.join(enabled_features)}")
else:
Expand Down
68 changes: 68 additions & 0 deletions src/vllm_router/experimental/pii/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
"""PII detection module for vLLM router."""

import logging
from typing import Optional

from .analyzers.base import PIIAnalyzer
from .analyzers.factory import create_analyzer
from .config import PIIConfig
from .middleware import check_pii
from .types import PIIAction, PIITarget, PIIType

logger = logging.getLogger(__name__)

# Global analyzer instance
_analyzer: Optional[PIIAnalyzer] = None


async def initialize_pii_detection(
analyzer_type: str = "presidio", config: Optional[dict] = None
) -> None:
"""
Initialize PII detection with the specified analyzer.
Args:
analyzer_type: Type of analyzer to use
config: Optional configuration for the analyzer
"""
global _analyzer

try:
_analyzer = await create_analyzer(analyzer_type, config)
logger.info(f"Initialized PII detection with {analyzer_type} analyzer")
except Exception as e:
logger.error(f"Failed to initialize PII detection: {e}")
raise


async def shutdown_pii_detection() -> None:
"""Shutdown PII detection."""
global _analyzer

if _analyzer:
await _analyzer.shutdown()
_analyzer = None
logger.info("Shut down PII detection")


def get_pii_analyzer() -> Optional[PIIAnalyzer]:
"""Get the current PII analyzer instance."""
return _analyzer


def is_pii_detection_enabled() -> bool:
"""Check if PII detection is enabled."""
return _analyzer is not None


__all__ = [
"PIIAction",
"PIITarget",
"PIIType",
"PIIConfig",
"check_pii",
"initialize_pii_detection",
"shutdown_pii_detection",
"get_pii_analyzer",
"is_pii_detection_enabled",
]
13 changes: 13 additions & 0 deletions src/vllm_router/experimental/pii/analyzers/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
"""PII analyzers package."""

from .base import PIIAnalysisResult, PIIAnalyzer, PIILocation
from .factory import create_analyzer
from .presidio import PresidioAnalyzer

__all__ = [
"PIIAnalyzer",
"PIIAnalysisResult",
"PIILocation",
"create_analyzer",
"PresidioAnalyzer",
]
65 changes: 65 additions & 0 deletions src/vllm_router/experimental/pii/analyzers/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
"""Base interface for PII analyzers."""

import abc
from dataclasses import dataclass
from typing import Dict, List, Optional, Set

from ..types import PIIType


@dataclass
class PIILocation:
"""Location of PII in text."""

start: int # Start index in text
end: int # End index in text
pii_type: PIIType # Type of PII found
value: str # The actual PII text found
score: float # Confidence score of the detection


@dataclass
class PIIAnalysisResult:
"""Result of PII analysis."""

has_pii: bool # Whether PII was found
detected_types: Set[PIIType] # Types of PII found
pii_locations: Optional[List[PIILocation]] = None # Locations of PII in text


class PIIAnalyzer(abc.ABC):
"""Base class for PII analyzers."""

def __init__(self, config: Optional[Dict] = None):
"""Initialize the analyzer with optional config."""
self.config = config or {}

@abc.abstractmethod
async def analyze(
self,
text: str,
pii_types: Optional[Set[PIIType]] = None,
score_threshold: float = 0.5,
) -> PIIAnalysisResult:
"""
Analyze text for PII.
Args:
text: Text to analyze
pii_types: Types of PII to look for. If None, look for all types.
score_threshold: Minimum confidence score to consider a match
Returns:
PIIAnalysisResult containing analysis results
"""
raise NotImplementedError

@abc.abstractmethod
async def initialize(self) -> None:
"""Initialize the analyzer. Called once before first use."""
raise NotImplementedError

@abc.abstractmethod
async def shutdown(self) -> None:
"""Shutdown the analyzer. Called when service is shutting down."""
raise NotImplementedError
Loading

0 comments on commit 749a4cf

Please sign in to comment.