Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support PII detection in http request #235

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
202 changes: 68 additions & 134 deletions src/vllm_router/experimental/feature_gates.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,19 @@
"""Feature gates for experimental features."""

import json
import logging
import os
from enum import Enum
from typing import Dict, Optional, Set

from vllm_router.utils import SingletonMeta

logger = logging.getLogger(__name__)

# Feature gate names
SEMANTIC_CACHE = "SemanticCache"
PII_DETECTION = "PIIDetection"  # Gate name for PII detection in HTTP requests


class FeatureStage(Enum):
"""
Expand Down Expand Up @@ -38,147 +47,68 @@ def __init__(
self.default_enabled = default_enabled


class FeatureGates:
"""
Manages experimental features through feature gates.
Similar to Kubernetes feature gates, this allows explicit enabling/disabling of features.
"""

_instance = None
class FeatureGates(metaclass=SingletonMeta):
"""Manages feature gates for experimental features."""

def __init__(self):
# Dictionary of all available features
self.available_features: Dict[str, Feature] = {}

# Set of enabled features
self.enabled_features: Set[str] = set()

# Register all known features
self._register_known_features()

def _register_known_features(self):
"""Register all known features with their default states."""
self.register_feature(
Feature(
name="SemanticCache",
description="Semantic caching of LLM requests and responses",
stage=FeatureStage.ALPHA,
default_enabled=False,
)
)
"""Initialize feature gates."""
self._enabled_features: Set[str] = set()

def enable(self, feature: str) -> None:
    """Enable a feature.

    Adds ``feature`` to the set of enabled features and logs the change.
    Because the storage is a set, enabling a feature that is already
    enabled leaves the state unchanged (the log line is still emitted).

    Args:
        feature: Name of the feature gate to enable.
    """
    self._enabled_features.add(feature)
    logger.info(f"Enabled feature: {feature}")

def disable(self, feature: str) -> None:
    """Disable a feature.

    Removes ``feature`` from the set of enabled features and logs the
    change. ``discard`` is used, so disabling a feature that is not
    currently enabled is not an error.

    Args:
        feature: Name of the feature gate to disable.
    """
    self._enabled_features.discard(feature)
    logger.info(f"Disabled feature: {feature}")

def is_enabled(self, feature: str) -> bool:
    """Check if a feature is enabled.

    Args:
        feature: Name of the feature gate to look up.

    Returns:
        True if ``feature`` is in the set of enabled features, False
        otherwise (including for names that were never enabled).
    """
    return feature in self._enabled_features

def configure(self, config: Dict[str, bool]) -> None:
    """Configure multiple features at once.

    Each entry enables the feature when its value is true and disables it
    otherwise. Values are normally booleans, but string values are also
    accepted because callers build the mapping by splitting
    ``"name=value"`` text: a plain truthiness test would treat the
    non-empty string ``"false"`` as true and enable the feature.

    Args:
        config: Mapping of feature name to desired state. String values
            are matched case-insensitively: ``"true"``, ``"yes"``, ``"1"``
            enable; ``"false"``, ``"no"``, ``"0"`` disable. Any other
            string is logged as a warning and the entry is skipped.
            Surrounding whitespace in names and string values is ignored.
    """
    enabling_words = ("true", "yes", "1")
    disabling_words = ("false", "no", "0")

    for feature, enabled in config.items():
        if isinstance(enabled, str):
            # Interpret the text instead of relying on string truthiness.
            word = enabled.strip().lower()
            if word in enabling_words:
                enabled = True
            elif word in disabling_words:
                enabled = False
            else:
                logger.warning(f"Invalid feature gate value: {enabled}")
                continue

        # Names taken from "a=true, b=false" style text may carry padding.
        name = feature.strip() if isinstance(feature, str) else feature

        if enabled:
            self.enable(name)
        else:
            self.disable(name)

def register_feature(self, feature: Feature):
    """
    Register a new feature.

    The feature is stored in ``available_features`` under its name,
    replacing any earlier registration with the same name. If the feature
    is marked ``default_enabled`` it is also added to ``enabled_features``.

    Args:
        feature: The feature to register
    """
    self.available_features[feature.name] = feature
    if feature.default_enabled:
        self.enabled_features.add(feature.name)

def enable_feature(self, feature_name: str) -> bool:
    """
    Enable a feature by name.

    Only names present in ``available_features`` can be enabled; an
    unknown name is logged as a warning and leaves the state unchanged.

    Args:
        feature_name: The name of the feature to enable

    Returns:
        True if the feature was enabled, False if it doesn't exist
    """
    if feature_name in self.available_features:
        self.enabled_features.add(feature_name)
        logger.info(f"Feature '{feature_name}' enabled")
        return True
    logger.warning(f"Attempted to enable unknown feature '{feature_name}'")
    return False

def disable_feature(self, feature_name: str) -> bool:
    """
    Disable a feature by name.

    Only names present in ``available_features`` are acted on; an unknown
    name is logged as a warning. Disabling a known feature that is not
    currently enabled still counts as success.

    Args:
        feature_name: The name of the feature to disable

    Returns:
        True if the feature was disabled, False if it doesn't exist
    """
    if feature_name in self.available_features:
        self.enabled_features.discard(feature_name)
        logger.info(f"Feature '{feature_name}' disabled")
        return True
    logger.warning(f"Attempted to disable unknown feature '{feature_name}'")
    return False

def is_enabled(self, feature_name: str) -> bool:
    """
    Check if a feature is enabled.

    Args:
        feature_name: The name of the feature to check

    Returns:
        True if the feature is enabled, False otherwise (unknown names
        are simply reported as not enabled)
    """
    return feature_name in self.enabled_features

def parse_feature_gates(self, feature_gates_str: str):
"""
Parse a comma-separated list of feature gates.

Format: feature1=true,feature2=false

Args:
feature_gates_str: The feature gates string to parse
"""
if not feature_gates_str:
return

for gate in feature_gates_str.split(","):
if "=" not in gate:
logger.warning(f"Invalid feature gate format: {gate}")
continue

name, value = gate.split("=", 1)
name = name.strip()
value = value.strip().lower()
def initialize_feature_gates(config: Optional[str] = None) -> None:
"""
Initialize feature gates from a configuration string.

if value in ("true", "yes", "1"):
self.enable_feature(name)
elif value in ("false", "no", "0"):
self.disable_feature(name)
else:
logger.warning(f"Invalid feature gate value: {value}")
Args:
config: Configuration string in the format "feature1=true,feature2=false"
"""
feature_gates = get_feature_gates()

def list_features(self) -> Dict[str, Dict]:
"""
List all available features and their status.
if not config:
return

Returns:
A dictionary of feature information
"""
result = {}
for name, feature in self.available_features.items():
result[name] = {
"description": feature.description,
"stage": feature.stage.value,
"enabled": name in self.enabled_features,
}
return result
try:
# Parse config string
features = {}
for item in config.split(","):
if "=" not in item:
continue
name, value = item.split("=", 1)
features[name.strip()] = value.strip().lower() == "true"

# Configure feature gates
feature_gates.configure(features)

def get_feature_gates() -> FeatureGates:
"""
Get the singleton instance of FeatureGates.
except Exception as e:
logger.error(f"Failed to initialize feature gates: {e}")
raise

Returns:
The FeatureGates instance
"""
if FeatureGates._instance is None:
FeatureGates._instance = FeatureGates()
return FeatureGates._instance

def get_feature_gates() -> FeatureGates:
    """Get the feature gates singleton.

    Returns:
        The result of calling ``FeatureGates()``. The class is declared
        with ``metaclass=SingletonMeta``, so repeated calls are presumably
        answered with one shared instance (SingletonMeta is defined in
        ``vllm_router.utils`` and should be checked to confirm).
    """
    return FeatureGates()


def initialize_feature_gates(feature_gates_str: Optional[str] = None):
Expand All @@ -193,14 +123,18 @@ def initialize_feature_gates(feature_gates_str: Optional[str] = None):
# Parse environment variable if it exists
env_feature_gates = os.environ.get("VLLM_FEATURE_GATES")
if env_feature_gates:
feature_gates.parse_feature_gates(env_feature_gates)
feature_gates.configure(
dict(map(lambda x: x.split("="), env_feature_gates.split(",")))
)

# Parse command-line argument if provided
if feature_gates_str:
feature_gates.parse_feature_gates(feature_gates_str)
feature_gates.configure(
dict(map(lambda x: x.split("="), feature_gates_str.split(",")))
)

# Log enabled features
enabled_features = [name for name in feature_gates.enabled_features]
enabled_features = [name for name in feature_gates._enabled_features]
if enabled_features:
logger.info(f"Enabled experimental features: {', '.join(enabled_features)}")
else:
Expand Down
68 changes: 68 additions & 0 deletions src/vllm_router/experimental/pii/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
"""PII detection module for vLLM router."""

import logging
from typing import Optional

from .analyzers.base import PIIAnalyzer
from .analyzers.factory import create_analyzer
from .config import PIIConfig
from .middleware import check_pii
from .types import PIIAction, PIITarget, PIIType

logger = logging.getLogger(__name__)

# Global analyzer instance.
# None until initialize_pii_detection() succeeds; reset to None by
# shutdown_pii_detection().
_analyzer: Optional[PIIAnalyzer] = None


async def initialize_pii_detection(
    analyzer_type: str = "presidio", config: Optional[dict] = None
) -> None:
    """
    Initialize PII detection with the specified analyzer.

    Builds an analyzer through ``create_analyzer`` and stores it in the
    module-level ``_analyzer``. An analyzer that was already installed is
    overwritten without being shut down.

    Args:
        analyzer_type: Type of analyzer to use
        config: Optional configuration for the analyzer

    Raises:
        Exception: Any error raised while creating the analyzer is logged
            and re-raised unchanged; ``_analyzer`` is left as it was.
    """
    global _analyzer

    try:
        _analyzer = await create_analyzer(analyzer_type, config)
        logger.info(f"Initialized PII detection with {analyzer_type} analyzer")
    except Exception as e:
        logger.error(f"Failed to initialize PII detection: {e}")
        raise


async def shutdown_pii_detection() -> None:
    """Shutdown PII detection.

    If an analyzer is installed, awaits its ``shutdown()``, clears the
    module-level ``_analyzer`` and logs the shutdown. Does nothing when no
    analyzer is installed. If ``shutdown()`` raises, the exception
    propagates and ``_analyzer`` is left in place.
    """
    global _analyzer

    # Identity check, matching is_pii_detection_enabled(): an installed
    # analyzer must be shut down even if it happens to evaluate as falsy.
    if _analyzer is not None:
        await _analyzer.shutdown()
        _analyzer = None
        logger.info("Shut down PII detection")


def get_pii_analyzer() -> Optional[PIIAnalyzer]:
    """Get the current PII analyzer instance.

    Returns:
        The module-level analyzer, or None when PII detection has not been
        initialized or has been shut down.
    """
    return _analyzer


def is_pii_detection_enabled() -> bool:
    """Check if PII detection is enabled.

    Returns:
        True when a module-level analyzer is currently installed, False
        otherwise.
    """
    return _analyzer is not None


# Public API of the PII detection package: shared types and config, the
# check_pii middleware entry point, and the lifecycle helpers defined above.
__all__ = [
"PIIAction",
"PIITarget",
"PIIType",
"PIIConfig",
"check_pii",
"initialize_pii_detection",
"shutdown_pii_detection",
"get_pii_analyzer",
"is_pii_detection_enabled",
]
13 changes: 13 additions & 0 deletions src/vllm_router/experimental/pii/analyzers/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
"""PII analyzers package."""

from .base import PIIAnalysisResult, PIIAnalyzer, PIILocation
from .factory import create_analyzer
from .presidio import PresidioAnalyzer

# Names re-exported from the base, factory and presidio submodules.
__all__ = [
"PIIAnalyzer",
"PIIAnalysisResult",
"PIILocation",
"create_analyzer",
"PresidioAnalyzer",
]
65 changes: 65 additions & 0 deletions src/vllm_router/experimental/pii/analyzers/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
"""Base interface for PII analyzers."""

import abc
from dataclasses import dataclass
from typing import Dict, List, Optional, Set

from ..types import PIIType


@dataclass
class PIILocation:
    """Location of PII in text.

    One instance describes a single detected span together with the kind
    of PII it represents and how confident the detector was.
    """

    start: int  # Start index in text
    end: int  # End index in text
    pii_type: PIIType  # Type of PII found
    value: str  # The actual PII text found
    score: float  # Confidence score of the detection


@dataclass
class PIIAnalysisResult:
    """Result of PII analysis.

    ``pii_locations`` is optional and defaults to None, so a result can
    report which types were found without listing individual spans.
    """

    has_pii: bool  # Whether PII was found
    detected_types: Set[PIIType]  # Types of PII found
    pii_locations: Optional[List[PIILocation]] = None  # Locations of PII in text


class PIIAnalyzer(abc.ABC):
    """Base class for PII analyzers.

    Concrete analyzers must implement the three coroutines ``analyze``,
    ``initialize`` and ``shutdown``; the class cannot be instantiated
    until all of them are overridden.
    """

    def __init__(self, config: Optional[Dict] = None):
        """Initialize the analyzer with optional config.

        Args:
            config: Analyzer-specific settings. ``None`` or an empty
                mapping results in ``self.config`` being a new empty dict.
        """
        self.config = config or {}

    @abc.abstractmethod
    async def analyze(
        self,
        text: str,
        pii_types: Optional[Set[PIIType]] = None,
        score_threshold: float = 0.5,
    ) -> PIIAnalysisResult:
        """
        Analyze text for PII.

        Args:
            text: Text to analyze
            pii_types: Types of PII to look for. If None, look for all types.
            score_threshold: Minimum confidence score to consider a match

        Returns:
            PIIAnalysisResult containing analysis results
        """
        raise NotImplementedError

    @abc.abstractmethod
    async def initialize(self) -> None:
        """Initialize the analyzer. Called once before first use."""
        raise NotImplementedError

    @abc.abstractmethod
    async def shutdown(self) -> None:
        """Shutdown the analyzer. Called when service is shutting down."""
        raise NotImplementedError
Loading