Skip to content

Commit

Permalink
fix(audit): allow to send events with complex payload to kafka with avro (#3827)
Browse files Browse the repository at this point in the history

There is no easy way to provide all possible schemas for event payloads.
As a workaround, if the payload map contains a value of a complex type, that value will be converted into a JSON string when avro encoding is enabled.
  • Loading branch information
erka authored Jan 20, 2025
1 parent ac008ce commit b994a43
Show file tree
Hide file tree
Showing 5 changed files with 183 additions and 4 deletions.
31 changes: 31 additions & 0 deletions examples/audit/kafka/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Audit Event Logging / Kafka Example

This example shows how you can run Flipt with audit event logging enabled using the `Kafka` audit sink.

This works by setting the environment variables:

```bash
FLIPT_AUDIT_SINKS_KAFKA_ENABLED=true
FLIPT_AUDIT_SINKS_KAFKA_TOPIC=flipt-audit-events
FLIPT_AUDIT_SINKS_KAFKA_ENCODING=avro
FLIPT_AUDIT_SINKS_KAFKA_INSECURE_SKIP_TLS=true
FLIPT_AUDIT_SINKS_KAFKA_BOOTSTRAP_SERVERS=redpanda
FLIPT_AUDIT_SINKS_KAFKA_SCHEMA_REGISTRY_URL=http://redpanda:8081
```

The auditable events currently are `created`, `updated`, and `deleted` operations on `flags`, `variants`, `segments`, `constraints`, `rules`, `distributions`, `namespaces`, and `tokens`. If you do any of these operations through the API, Flipt will emit an audit event log to the specified location.

## Requirements

To run this example application you'll need:

* [Docker](https://docs.docker.com/install/)
* [docker compose](https://docs.docker.com/compose/install/)

## Running the Example

1. Run `docker compose up` from this directory
1. Open the Flipt UI (default: [http://localhost:8080](http://localhost:8080))
1. Create some sample data: Flags/Segments/etc.
1. Open the Redpanda UI (default: [http://localhost:8888/topics/flipt-audit-events](http://localhost:8888/topics/flipt-audit-events))
1. You should see a topic of audit events.
109 changes: 109 additions & 0 deletions examples/audit/kafka/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
version: "3"

name: flipt-audit-kafka
services:
flipt:
image: flipt/flipt:latest
command: ["./flipt", "--force-migrate"]
ports:
- "8080:8080"
environment:
- FLIPT_LOG_LEVEL=debug
- FLIPT_AUDIT_SINKS_LOG_ENABLED=true
- FLIPT_AUDIT_SINKS_KAFKA_ENABLED=true
- FLIPT_AUDIT_SINKS_KAFKA_TOPIC=flipt-audit-events
- FLIPT_AUDIT_SINKS_KAFKA_ENCODING=avro
- FLIPT_AUDIT_SINKS_KAFKA_INSECURE_SKIP_TLS=true
- FLIPT_AUDIT_SINKS_KAFKA_BOOTSTRAP_SERVERS=redpanda
- FLIPT_AUDIT_SINKS_KAFKA_SCHEMA_REGISTRY_URL=http://redpanda:8081
- FLIPT_META_TELEMETRY_ENABLED=false
networks:
- flipt_network
depends_on:
redpanda:
condition: service_healthy
redpanda:
image: docker.redpanda.com/redpandadata/redpanda:v24.3.3
command:
- redpanda start
- --kafka-addr internal://0.0.0.0:9092,external://0.0.0.0:19092
- --advertise-kafka-addr internal://redpanda:9092,external://localhost:19092
- --pandaproxy-addr internal://0.0.0.0:8082,external://0.0.0.0:18082
- --advertise-pandaproxy-addr internal://redpanda:8082,external://localhost:18082
- --schema-registry-addr internal://0.0.0.0:8081,external://0.0.0.0:18081
- --rpc-addr redpanda:33145
- --advertise-rpc-addr redpanda:33145
- --mode dev-container
- --smp 1
ports:
- 18081:18081
- 18082:18082
- 19092:19092
- 19644:9644
networks:
- flipt_network
healthcheck:
test:
["CMD-SHELL", "rpk cluster health | grep -E 'Healthy:.+true' || exit 1"]
interval: 15s
timeout: 3s
retries: 5
start_period: 5s
console:
image: docker.redpanda.com/redpandadata/console:v2.8.2
entrypoint: /bin/sh
command: -c "echo \"$$CONSOLE_CONFIG_FILE\" > /tmp/config.yml; /app/console"
environment:
CONFIG_FILEPATH: /tmp/config.yml
CONSOLE_CONFIG_FILE: |
kafka:
brokers: ["redpanda:9092"]
schemaRegistry:
enabled: true
urls: ["http://redpanda:8081"]
redpanda:
adminApi:
enabled: true
urls: ["http://redpanda:9644"]
connect:
enabled: true
clusters:
- name: local-connect-cluster
url: http://connect:8083
ports:
- 8888:8080
networks:
- flipt_network
depends_on:
- redpanda
connect:
image: docker.redpanda.com/redpandadata/connectors:v1.0.23
hostname: connect
container_name: connect
networks:
- flipt_network
depends_on:
- redpanda
ports:
- "8083:8083"
environment:
CONNECT_CONFIGURATION: |
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
group.id=connectors-cluster
offset.storage.topic=_internal_connectors_offsets
config.storage.topic=_internal_connectors_configs
status.storage.topic=_internal_connectors_status
config.storage.replication.factor=-1
offset.storage.replication.factor=-1
status.storage.replication.factor=-1
offset.flush.interval.ms=1000
producer.linger.ms=50
producer.batch.size=131072
CONNECT_BOOTSTRAP_SERVERS: redpanda:9092
CONNECT_GC_LOG_ENABLED: "false"
CONNECT_HEAP_OPTS: -Xms512M -Xmx512M
CONNECT_LOG_LEVEL: info

networks:
flipt_network:
19 changes: 17 additions & 2 deletions internal/server/audit/kafka/avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package kafka

import (
"bytes"
"encoding/json"
"errors"
"reflect"

"github.com/hamba/avro/v2"
"github.com/twmb/franz-go/pkg/sr"
Expand All @@ -29,12 +31,25 @@ func (a *avroEncoder) Schema() sr.Schema {
func (a *avroEncoder) Encode(v any) ([]byte, error) {
if event, ok := v.(audit.Event); ok {
var e audit.Event
var err error
event.CopyInto(&e)
e.Payload, err = event.PayloadToMap()
payload, err := event.PayloadToMap()
if err != nil {
return nil, err
}

for k, v := range payload {
t := reflect.TypeOf(v)
switch t.Kind() {
case reflect.Map, reflect.Slice, reflect.Array, reflect.Struct:
data, err := json.Marshal(v)
if err != nil {
return nil, err
}
payload[k] = string(data)
}
}

e.Payload = payload
buf := &bytes.Buffer{}
encoder := avro.NewEncoderForSchema(a.schema, buf)
err = encoder.Encode(e)
Expand Down
27 changes: 25 additions & 2 deletions internal/server/audit/kafka/encoding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,26 @@ func TestEncoding(t *testing.T) {
}),
},
{
"rollout",
"rollout-threshold",
audit.NewRollout(&flipt.Rollout{
Description: "this description",
NamespaceKey: "default",
Rule: &flipt.Rollout_Threshold{
Threshold: &flipt.RolloutThreshold{},
},
}),
},
{
"rollout-segment",
audit.NewRollout(&flipt.Rollout{
Description: "this description",
NamespaceKey: "default",
Rule: &flipt.Rollout_Segment{
Segment: &flipt.RolloutSegment{
SegmentOperator: flipt.SegmentOperator_AND_SEGMENT_OPERATOR,
SegmentKeys: []string{"segment-key", "some"},
},
},
}),
},
{
Expand All @@ -49,6 +65,14 @@ func TestEncoding(t *testing.T) {
"nil",
nil,
},
{
"segment",
audit.NewSegment(&flipt.Segment{
NamespaceKey: "default",
Key: "segment-key",
Constraints: []*flipt.Constraint{{Id: "constraint-id"}},
}),
},
}

for _, tt := range tests {
Expand All @@ -70,5 +94,4 @@ func TestEncoding(t *testing.T) {
})
}
}

}
1 change: 1 addition & 0 deletions internal/server/audit/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func NewSink(ctx context.Context, logger *zap.Logger, cfg config.KafkaSinkConfig
logLevel = zap.InfoLevel
}
opts := []kgo.Opt{
kgo.AllowAutoTopicCreation(),
kgo.SeedBrokers(cfg.BootstrapServers...),
kgo.DefaultProduceTopic(cfg.Topic),
kgo.WithLogger(kzap.New(logger, kzap.AtomicLevel(zap.NewAtomicLevelAt(logLevel)))),
Expand Down

0 comments on commit b994a43

Please sign in to comment.