Skip to content

Commit

Permalink
support rueidis
Browse files Browse the repository at this point in the history
  • Loading branch information
keisku committed Mar 6, 2025
1 parent 18fec70 commit fa83b4b
Show file tree
Hide file tree
Showing 15 changed files with 816 additions and 10 deletions.
35 changes: 35 additions & 0 deletions contrib/redis/rueidis/example_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016 Datadog, Inc.

package rueidis_test

import (
"context"
"log"

"github.com/redis/rueidis"
rueidistrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/redis/rueidis"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)

// To start tracing Redis, simply create a new client using the library and continue
// using as you normally would.
func Example() {
tracer.Start()
defer tracer.Stop()

c, err := rueidistrace.NewClient(rueidis.ClientOption{
InitAddress: []string{"localhost:6379"},
})
if err != nil {
log.Fatal(err)
return
}

if err := c.Do(context.Background(), c.B().Set().Key("key").Value("value").Build()).Error(); err != nil {
log.Fatal(err)
return
}
}
41 changes: 41 additions & 0 deletions contrib/redis/rueidis/option.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016 Datadog, Inc.

package rueidis

import (
"gopkg.in/DataDog/dd-trace-go.v1/internal"
"gopkg.in/DataDog/dd-trace-go.v1/internal/namingschema"
)

type config struct {
rawCommand bool
serviceName string
}

// Option represents an option that can be used to create or wrap a client.
type Option func(*config)

func defaultConfig() *config {
return &config{
// Do not include the raw command by default since it could contain sensitive data.
rawCommand: internal.BoolEnv("DD_TRACE_REDIS_RAW_COMMAND", false),
serviceName: namingschema.ServiceName(defaultServiceName),
}
}

// WithRawCommand can be used to set a tag `redis.raw_command` in the created spans (disabled by default).
func WithRawCommand(rawCommand bool) Option {
return func(cfg *config) {
cfg.rawCommand = rawCommand
}
}

// WithServiceName sets the given service name for the client.
func WithServiceName(name string) Option {
return func(cfg *config) {
cfg.serviceName = name
}
}
21 changes: 21 additions & 0 deletions contrib/redis/rueidis/orchestrion.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Unless explicitly stated otherwise all files in this repository are licensed
# under the Apache License Version 2.0.
# This product includes software developed at Datadog (https://www.datadoghq.com/).
# Copyright 2023-present Datadog, Inc.
---
# yaml-language-server: $schema=https://datadoghq.dev/orchestrion/schema.json
meta:
name: gopkg.in/DataDog/dd-trace-go.v1/contrib/redis/rueidis
description: Redis client for Go.

aspects:
- id: NewClient
join-point:
one-of:
- function-call: github.com/redis/rueidis.NewClient
advice:
- wrap-expression:
imports:
rueidistrace: gopkg.in/DataDog/dd-trace-go.v1/contrib/redis/rueidis
template: |-
rueidistrace.NewClient({{ index .AST.Args 0 }})
281 changes: 281 additions & 0 deletions contrib/redis/rueidis/rueidis.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,281 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016 Datadog, Inc.

// Package rueidis provides tracing functions for tracing the redis/rueidis package (https://github.com/redis/rueidis).
package rueidis

import (
"context"
"net"
"strconv"
"strings"
"time"

"github.com/redis/rueidis"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
"gopkg.in/DataDog/dd-trace-go.v1/internal/telemetry"
)

const (
componentName = "redis/rueidis"
defaultServiceName = "redis.client"
)

func init() {
telemetry.LoadIntegration(componentName)
tracer.MarkIntegrationImported("github.com/redis/rueidis")
}

var (
_ rueidis.Client = (*client)(nil)
_ rueidis.DedicatedClient = (*dedicatedClient)(nil)
)

// NewClient returns a new rueidis.Client enhanced with tracing.
func NewClient(clientOption rueidis.ClientOption, opts ...Option) (rueidis.Client, error) {
rueidisClient, err := rueidis.NewClient(clientOption)
if err != nil {
return nil, err
}
cfg := defaultConfig()
for _, fn := range opts {
fn(cfg)
}
tClient := &client{
client: rueidisClient,
cfg: cfg,
dbIndex: strconv.FormatInt(int64(clientOption.SelectDB), 10),
user: clientOption.Username,
}
if len(clientOption.InitAddress) > 0 {
host, port, err := net.SplitHostPort(clientOption.InitAddress[0])
if err == nil {
tClient.host = host
tClient.port = port
}
}
return tClient, nil
}

type client struct {
client rueidis.Client
cfg *config
host string
port string
dbIndex string
user string
}

type command struct {
statement string
raw string
}

func (c *client) startSpan(ctx context.Context, cmd command) (tracer.Span, context.Context) {
opts := []tracer.StartSpanOption{
tracer.ServiceName(c.cfg.serviceName),
tracer.ResourceName(cmd.statement),
tracer.SpanType(ext.SpanTypeRedis),
tracer.Tag(ext.TargetHost, c.host),
tracer.Tag(ext.TargetPort, c.port),
tracer.Tag(ext.Component, componentName),
tracer.Tag(ext.SpanKind, ext.SpanKindClient),
tracer.Tag(ext.DBSystem, ext.DBSystemRedis),
tracer.Tag(ext.TargetDB, c.dbIndex),
}
if c.cfg.rawCommand {
opts = append(opts, tracer.Tag(ext.RedisRawCommand, cmd.raw))
}
if c.host != "" {
opts = append(opts, tracer.Tag(ext.TargetHost, c.host))
}
if c.port != "" {
opts = append(opts, tracer.Tag(ext.TargetPort, c.port))
}
if c.user != "" {
opts = append(opts, tracer.Tag(ext.DBUser, c.user))
}
return tracer.StartSpanFromContext(ctx, "redis.command", opts...)
}

func (c *client) finishSpan(span tracer.Span, err error) {
var opts []tracer.FinishOption
if err != nil && !rueidis.IsRedisNil(err) {
opts = append(opts, tracer.WithError(err))
}
span.Finish(opts...)
}

func (c *client) B() rueidis.Builder {
return c.client.B()
}

func (c *client) Do(ctx context.Context, cmd rueidis.Completed) rueidis.RedisResult {
span, ctx := c.startSpan(ctx, processCommand(&cmd))
resp := c.client.Do(ctx, cmd)
setClientCacheTags(span, resp)
span.Finish(tracer.WithError(resp.Error()))
return resp
}

func (c *client) DoMulti(ctx context.Context, multi ...rueidis.Completed) []rueidis.RedisResult {
span, ctx := c.startSpan(ctx, processCommandMulti(multi))
resp := c.client.DoMulti(ctx, multi...)
c.finishSpan(span, firstError(resp))
return resp
}

func (c *client) Receive(ctx context.Context, subscribe rueidis.Completed, fn func(msg rueidis.PubSubMessage)) error {
span, ctx := c.startSpan(ctx, processCommand(&subscribe))
err := c.client.Receive(ctx, subscribe, fn)
c.finishSpan(span, err)
return err
}

func (c *client) Close() {
c.client.Close()
}

func (c *client) DoCache(ctx context.Context, cmd rueidis.Cacheable, ttl time.Duration) rueidis.RedisResult {
span, ctx := c.startSpan(ctx, processCommand(&cmd))
resp := c.client.DoCache(ctx, cmd, ttl)
setClientCacheTags(span, resp)
c.finishSpan(span, resp.Error())
return resp
}

func (c *client) DoMultiCache(ctx context.Context, multi ...rueidis.CacheableTTL) []rueidis.RedisResult {
span, ctx := c.startSpan(ctx, processCommandMultiCache(multi))
resp := c.client.DoMultiCache(ctx, multi...)
c.finishSpan(span, firstError(resp))
return resp
}

func (c *client) DoStream(ctx context.Context, cmd rueidis.Completed) rueidis.RedisResultStream {
span, ctx := c.startSpan(ctx, processCommand(&cmd))
resp := c.client.DoStream(ctx, cmd)
c.finishSpan(span, resp.Error())
return resp
}

func (c *client) DoMultiStream(ctx context.Context, multi ...rueidis.Completed) rueidis.MultiRedisResultStream {
span, ctx := c.startSpan(ctx, processCommandMulti(multi))
resp := c.client.DoMultiStream(ctx, multi...)
c.finishSpan(span, resp.Error())
return resp
}

func (c *client) Dedicated(fn func(rueidis.DedicatedClient) error) (err error) {
return c.client.Dedicated(func(dc rueidis.DedicatedClient) error {
return fn(&dedicatedClient{
client: c,
dedicatedClient: dc,
})
})
}

func (c *client) Dedicate() (client rueidis.DedicatedClient, cancel func()) {
dedicated, cancel := c.client.Dedicate()
return &dedicatedClient{
client: c,
dedicatedClient: dedicated,
}, cancel
}

func (c *client) Nodes() map[string]rueidis.Client {
nodes := c.client.Nodes()
for addr, redisClient := range nodes {
host, port, _ := net.SplitHostPort(addr)
nodes[addr] = &client{
client: redisClient,
cfg: c.cfg,
host: host,
port: port,
dbIndex: c.dbIndex,
user: c.user,
}
}
return nodes
}

type dedicatedClient struct {
*client
dedicatedClient rueidis.DedicatedClient
}

func (c *dedicatedClient) SetPubSubHooks(hooks rueidis.PubSubHooks) <-chan error {
return c.dedicatedClient.SetPubSubHooks(hooks)
}

type commander interface {
Commands() []string
}

func processCommand(cmd commander) command {
cmds := cmd.Commands()
if len(cmds) == 0 {
return command{}
}
statement := cmds[0]
raw := strings.Join(cmds, " ")
return command{
statement: statement,
raw: raw,
}
}

func processCommandMulti(multi []rueidis.Completed) command {
var cmds []command
for _, cmd := range multi {
cmds = append(cmds, processCommand(&cmd))
}
return multiCommand(cmds)
}

func processCommandMultiCache(multi []rueidis.CacheableTTL) command {
var cmds []command
for _, cmd := range multi {
cmds = append(cmds, processCommand(&cmd.Cmd))
}
return multiCommand(cmds)
}

func multiCommand(cmds []command) command {
// limit to the 5 first
if len(cmds) > 5 {
cmds = cmds[:5]
}
statement := strings.Builder{}
raw := strings.Builder{}
for i, cmd := range cmds {
statement.WriteString(cmd.statement)
raw.WriteString(cmd.raw)
if i != len(cmds)-1 {
statement.WriteString(" ")
raw.WriteString(" ")
}
}
return command{
statement: statement.String(),
raw: raw.String(),
}
}

func firstError(s []rueidis.RedisResult) error {
for _, result := range s {
if err := result.Error(); err != nil && !rueidis.IsRedisNil(err) {
return err
}
}
return nil
}

func setClientCacheTags(s tracer.Span, result rueidis.RedisResult) {
s.SetTag(ext.RedisClientCacheHit, result.IsCacheHit())
s.SetTag(ext.RedisClientCacheTTL, result.CacheTTL())
s.SetTag(ext.RedisClientCachePTTL, result.CachePTTL())
s.SetTag(ext.RedisClientCachePXAT, result.CachePXAT())
}
Loading

0 comments on commit fa83b4b

Please sign in to comment.