Add /models endpoint to gateway #802

Open · wants to merge 2 commits into base: main
27 changes: 27 additions & 0 deletions cmd/metadata/main.go
@@ -17,14 +17,41 @@ limitations under the License.
package main

import (
	"flag"
	"fmt"

	"github.com/vllm-project/aibrix/pkg/cache"
	"github.com/vllm-project/aibrix/pkg/metadata"
	"github.com/vllm-project/aibrix/pkg/utils"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/klog/v2"
)

func main() {
	redisClient := utils.GetRedisClient()

	fmt.Println("starting cache")
Review comment on the line above: would be better to use klog
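A minimal sketch of the suggested change, reusing the klog import already present in this file:

	klog.Info("starting cache")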

	stopCh := make(chan struct{})
	defer close(stopCh)
	var config *rest.Config
	var err error

	// ref: https://github.com/kubernetes-sigs/controller-runtime/issues/878#issuecomment-1002204308
	kubeConfig := flag.Lookup("kubeconfig").Value.String()
	if kubeConfig == "" {
		klog.Info("using in-cluster configuration")
		config, err = rest.InClusterConfig()
	} else {
		klog.Infof("using configuration from '%s'", kubeConfig)
		config, err = clientcmd.BuildConfigFromFlags("", kubeConfig)
	}

	if err != nil {
		panic(err)
	}
	cache.NewCache(config, stopCh, redisClient)

	klog.Info("Starting listening on port 8090")
	srv := metadata.NewHTTPServer(":8090", redisClient)
	klog.Fatal(srv.ListenAndServe())
55 changes: 53 additions & 2 deletions config/gateway/gateway-plugin/gateway-plugin.yaml
@@ -62,6 +62,36 @@ spec:
              fieldPath: metadata.namespace
      serviceAccountName: aibrix-gateway-plugins
---
# this is a dummy route for incoming requests to list models registered to the aibrix control plane
# TODO (varun): check if this dummy route can be removed in future
apiVersion: gateway.networking.k8s.io/v1
kind: HTTPRoute
metadata:
  name: reserved-router-models-endpoint
  namespace: aibrix-system
spec:
  parentRefs:
  - name: aibrix-eg
  rules:
  - matches:
    - path:
        type: PathPrefix
        value: /models
    backendRefs:
    - name: aibrix-metadata-service
      port: 8090
---
apiVersion: gateway.envoyproxy.io/v1alpha1
kind: EnvoyExtensionPolicy
metadata:
  name: skip-ext-proc
  namespace: aibrix-system
spec:
  targetRef:
    group: gateway.networking.k8s.io
    kind: HTTPRoute
    name: aibrix-reserved-router-models-endpoint
---
# this is a dummy route for incoming requests; a request is routed to an HTTPRoute
# using the model name OR routed based on the target for that model service
@@ -78,7 +108,28 @@ spec:
  - matches:
    - path:
        type: PathPrefix
-       value: /
+       value: /v1
    backendRefs:
    - name: aibrix-gateway-plugins
      port: 50052
---
apiVersion: gateway.envoyproxy.io/v1alpha1
kind: EnvoyExtensionPolicy
metadata:
  name: gateway-plugins-extension-policy
  namespace: aibrix-system
spec:
  targetRef:
    group: gateway.networking.k8s.io
    kind: HTTPRoute
    name: aibrix-reserved-router
  extProc:
  - backendRefs:
    - name: aibrix-gateway-plugins
      port: 50052
    processingMode:
      request:
        body: Buffered
      response:
        body: Streamed
    messageTimeout: 5s
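For illustration only, a minimal Go client sketch that exercises the new /models route through the gateway; the address http://localhost:8888 is a placeholder, not a value taken from this PR:

package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Placeholder gateway address; substitute the actual Envoy gateway endpoint.
	resp, err := http.Get("http://localhost:8888/models")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		panic(err)
	}
	// The metadata service replies with a JSON array of model names, e.g. ["llama2-7b"].
	fmt.Println(string(body))
}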
23 changes: 1 addition & 22 deletions config/gateway/gateway.yaml
@@ -32,27 +32,6 @@ spec:
    bufferLimit: 1048576
---
-apiVersion: gateway.envoyproxy.io/v1alpha1
-kind: EnvoyExtensionPolicy
-metadata:
-  name: gateway-plugins-extension-policy
-  namespace: aibrix-system
-spec:
-  targetRef:
-    group: gateway.networking.k8s.io
-    kind: Gateway
-    name: aibrix-eg
-  extProc:
-  - backendRefs:
-    - name: aibrix-gateway-plugins
-      port: 50052
-    processingMode:
-      request:
-        body: Buffered
-      response:
-        body: Streamed
-    messageTimeout: 5s
---
apiVersion: gateway.envoyproxy.io/v1alpha1
kind: EnvoyPatchPolicy
metadata:
  name: epp
@@ -72,7 +51,7 @@ spec:
            value:
              name: original_route
              match:
-               prefix: "/"
+               prefix: "/v1"
                headers:
                - name: "routing-strategy"
                  string_match:
4 changes: 4 additions & 0 deletions config/metadata/metadata.yaml
@@ -48,6 +48,10 @@ spec:
          value: aibrix-redis-master
        - name: REDIS_PORT
          value: "6379"
        # TODO: the cache is shared across the whole control plane, so add feature flags to enable metric pull;
        # for now set the refresh interval to 1 hour for the metadata service
        - name: AIBRIX_POD_METRIC_REFRESH_INTERVAL_MS
          value: "3600000"
        - name: POD_NAME
          valueFrom:
            fieldRef:
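Illustrative only, not code from this PR: how a millisecond value like the one above maps to a Go duration (3600000 ms is one hour); the fallback value is hypothetical and the real loader in aibrix may differ.

package main

import (
	"fmt"
	"os"
	"strconv"
	"time"
)

func main() {
	// Hypothetical reader for the refresh interval environment variable.
	ms, err := strconv.Atoi(os.Getenv("AIBRIX_POD_METRIC_REFRESH_INTERVAL_MS"))
	if err != nil || ms <= 0 {
		ms = 50 // hypothetical fallback, not a documented default
	}
	interval := time.Duration(ms) * time.Millisecond
	fmt.Println(interval) // with the value above: 1h0m0s
}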
2 changes: 2 additions & 0 deletions docs/source/getting_started/quickstart.rst
@@ -65,6 +65,8 @@ Depending on where you deployed the AIBrix, you can use either of the following


.. code-block:: bash

    # list models
    curl -v http://${ENDPOINT}/models

    # completion api
    curl -v http://${ENDPOINT}/v1/completions \
Expand Down
12 changes: 12 additions & 0 deletions pkg/cache/cache.go
@@ -542,6 +542,18 @@ func (c *Cache) GetPodsForModel(modelName string) (map[string]*v1.Pod, error) {
	return podsMap, nil
}

func (c *Cache) GetModels() []string {
	c.mu.RLock()
	defer c.mu.RUnlock()

	models := []string{}
	for model := range c.ModelToPodMapping {
		models = append(models, model)
	}

	return models
}

func (c *Cache) GetModelsForPod(podName string) (map[string]struct{}, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()
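A short usage sketch of the new accessor, assuming the cache has already been initialized elsewhere; cache.GetCache is the same helper the metadata handlers use later in this PR:

package main

import (
	"k8s.io/klog/v2"

	"github.com/vllm-project/aibrix/pkg/cache"
)

func main() {
	// Sketch: read the shared cache and log every registered model name.
	c, err := cache.GetCache()
	if err != nil {
		klog.Fatal(err)
	}
	for _, model := range c.GetModels() {
		klog.Infof("registered model: %s", model)
	}
}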
4 changes: 4 additions & 0 deletions pkg/controller/modelrouter/modelrouter_controller.go
@@ -152,6 +152,10 @@ func (m *ModelRouter) createHTTPRoute(namespace string, labels map[string]string
						Value: modelName,
					},
				},
				Path: &gatewayv1.HTTPPathMatch{
					Type:  ptr.To(gatewayv1.PathMatchPathPrefix),
					Value: ptr.To("/v1"),
				},
			},
		},
		BackendRefs: []gatewayv1.HTTPBackendRef{
21 changes: 21 additions & 0 deletions pkg/metadata/users.go → pkg/metadata/handlers.go
@@ -17,36 +17,57 @@ limitations under the License.
package metadata

import (
	"encoding/json"
	"errors"
	"fmt"
	"net/http"

	"github.com/gorilla/mux"
	"github.com/redis/go-redis/v9"
	"github.com/vllm-project/aibrix/pkg/cache"
	"github.com/vllm-project/aibrix/pkg/utils"
	"k8s.io/klog/v2"
)

type httpServer struct {
	redisClient *redis.Client
	cache       *cache.Cache
}

func NewHTTPServer(addr string, redis *redis.Client) *http.Server {
	c, err := cache.GetCache()
	if err != nil {
		panic(err)
	}

	server := &httpServer{
		redisClient: redis,
		cache:       c,
	}
	r := mux.NewRouter()
	r.HandleFunc("/CreateUser", server.createUser).Methods("POST")
	r.HandleFunc("/ReadUser", server.readUser).Methods("POST")
	r.HandleFunc("/UpdateUser", server.updateUser).Methods("POST")
	r.HandleFunc("/DeleteUser", server.deleteUser).Methods("POST")
	r.HandleFunc("/models", server.models).Methods("GET")

	return &http.Server{
		Addr:    addr,
		Handler: r,
	}
}

// models returns the base models and LoRA adapters registered to the aibrix control plane
func (s *httpServer) models(w http.ResponseWriter, r *http.Request) {
	models := s.cache.GetModels()
	jsonBytes, err := json.Marshal(models)
	if err != nil {
		http.Error(w, "error in processing model list", http.StatusInternalServerError)
		return
	}
	fmt.Fprintf(w, "%s", string(jsonBytes))
}
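A hedged variant of the handler above that also labels the response as JSON; modelsJSON is a hypothetical name and this is only a sketch, not part of the PR:

// Sketch: same lookup as models, but with an explicit Content-Type header.
func (s *httpServer) modelsJSON(w http.ResponseWriter, r *http.Request) {
	jsonBytes, err := json.Marshal(s.cache.GetModels())
	if err != nil {
		http.Error(w, "error in processing model list", http.StatusInternalServerError)
		return
	}
	w.Header().Set("Content-Type", "application/json")
	_, _ = w.Write(jsonBytes)
}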

func (s *httpServer) createUser(w http.ResponseWriter, r *http.Request) {
	var u utils.User
