Skip to content

Commit

Permalink
feat: add example queue using river
Browse files Browse the repository at this point in the history
  • Loading branch information
vaayne committed Jul 15, 2024
1 parent 36347cc commit 9f621d4
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 0 deletions.
29 changes: 29 additions & 0 deletions internal/core/queue/email.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package queue

import (
"context"
"vibrain/internal/pkg/logger"

"github.com/riverqueue/river"
)

type EmailWorkerArgs struct {
From string `json:"from"`
To string `json:"to"`
Subject string `json:"subject"`
Body string `json:"body"`
}

func (EmailWorkerArgs) Kind() string {
return "send_email"
}

type EmailWorker struct {
river.WorkerDefaults[EmailWorkerArgs]
}

func (w *EmailWorker) Work(ctx context.Context, job *river.Job[EmailWorkerArgs]) error {
// Send email
logger.FromContext(ctx).Info("Sending email", "from", job.Args.From, "to", job.Args.To, "subject", job.Args.Subject, "body", job.Args.Body)
return nil
}
7 changes: 7 additions & 0 deletions internal/core/queue/jobs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package queue

import "github.com/riverqueue/river"

func NewDefaultPeriodJobs() []*river.PeriodicJob {
return []*river.PeriodicJob{}
}
75 changes: 75 additions & 0 deletions internal/core/queue/queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package queue

import (
"context"
"fmt"

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/riverqueue/river"
"github.com/riverqueue/river/riverdriver/riverpgxv5"
)

type Queue struct {
*river.Client[pgx.Tx]
name string
workers *river.Workers
periodicJobs []*river.PeriodicJob
databaseUrl string
}

type Option func(q *Queue)

func WithQueueName(queueName string) Option {
return func(q *Queue) {
q.name = queueName
}
}

func WithWorkers(workers *river.Workers) Option {
return func(q *Queue) {
q.workers = workers
}
}

func WithPeriodicJobs(jobs []*river.PeriodicJob) Option {
return func(q *Queue) {
q.periodicJobs = jobs
}
}

func New(databaseUrl string, opts ...Option) (*Queue, error) {
ctx := context.Background()

q := &Queue{
name: river.QueueDefault,
workers: NewDefaultWorkers(),
periodicJobs: NewDefaultPeriodJobs(),
databaseUrl: databaseUrl,
}
for _, opt := range opts {
opt(q)
}

dbPool, err := pgxpool.New(ctx, q.databaseUrl)
if err != nil {
return nil, fmt.Errorf("failed to connect to database: %w", err)
}

cfg := &river.Config{
Queues: map[string]river.QueueConfig{
q.name: {MaxWorkers: 100},
},
PeriodicJobs: q.periodicJobs,
Workers: q.workers,
}

riverClient, err := river.NewClient(riverpgxv5.New(dbPool), cfg)
if err != nil {
return nil, fmt.Errorf("failed to create river client: %w", err)
}

q.Client = riverClient

return q, nil
}
11 changes: 11 additions & 0 deletions internal/core/queue/workers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package queue

import (
"github.com/riverqueue/river"
)

func NewDefaultWorkers() *river.Workers {
workers := river.NewWorkers()
river.AddWorker(workers, &EmailWorker{})
return workers
}

0 comments on commit 9f621d4

Please sign in to comment.