Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SMQ-2740 - Create events streams per service per action type #2744

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 25 additions & 13 deletions channels/events/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,19 @@
"github.com/go-chi/chi/v5/middleware"
)

const streamID = "supermq.channels"
const (
supermqPrefix = "supermq."
createStream = supermqPrefix + channelCreate
updateStream = supermqPrefix + channelUpdate
changeStatusStream = supermqPrefix + channelChangeStatus
removeStream = supermqPrefix + channelRemove
viewStream = supermqPrefix + channelView
listStream = supermqPrefix + channelList
connectStream = supermqPrefix + channelConnect
disconnectStream = supermqPrefix + channelDisconnect
setParentStream = supermqPrefix + channelSetParent
removeParentStream = supermqPrefix + channelRemoveParent
)

var _ channels.Service = (*eventStore)(nil)

Expand All @@ -29,7 +41,7 @@
// NewEventStoreMiddleware returns wrapper around clients service that sends
// events to event store.
func NewEventStoreMiddleware(ctx context.Context, svc channels.Service, url string) (channels.Service, error) {
publisher, err := store.NewPublisher(ctx, url, streamID)
publisher, err := store.NewPublisher(ctx, url)

Check warning on line 44 in channels/events/streams.go

View check run for this annotation

Codecov / codecov/patch

channels/events/streams.go#L44

Added line #L44 was not covered by tests
if err != nil {
return nil, err
}
Expand All @@ -55,7 +67,7 @@
Session: session,
requestID: middleware.GetReqID(ctx),
}
if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, createStream, event); err != nil {

Check warning on line 70 in channels/events/streams.go

View check run for this annotation

Codecov / codecov/patch

channels/events/streams.go#L70

Added line #L70 was not covered by tests
return chs, rps, err
}
}
Expand Down Expand Up @@ -89,7 +101,7 @@
requestID: middleware.GetReqID(ctx),
}

if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, updateStream, event); err != nil {

Check warning on line 104 in channels/events/streams.go

View check run for this annotation

Codecov / codecov/patch

channels/events/streams.go#L104

Added line #L104 was not covered by tests
return ch, err
}

Expand All @@ -107,7 +119,7 @@
Session: session,
requestID: middleware.GetReqID(ctx),
}
if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, viewStream, event); err != nil {

Check warning on line 122 in channels/events/streams.go

View check run for this annotation

Codecov / codecov/patch

channels/events/streams.go#L122

Added line #L122 was not covered by tests
return chann, err
}

Expand All @@ -124,7 +136,7 @@
Session: session,
requestID: middleware.GetReqID(ctx),
}
if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, listStream, event); err != nil {

Check warning on line 139 in channels/events/streams.go

View check run for this annotation

Codecov / codecov/patch

channels/events/streams.go#L139

Added line #L139 was not covered by tests
return cp, err
}

Expand All @@ -142,7 +154,7 @@
Session: session,
requestID: middleware.GetReqID(ctx),
}
if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, listStream, event); err != nil {

Check warning on line 157 in channels/events/streams.go

View check run for this annotation

Codecov / codecov/patch

channels/events/streams.go#L157

Added line #L157 was not covered by tests
return cp, err
}

Expand Down Expand Up @@ -176,7 +188,7 @@
Session: session,
requestID: middleware.GetReqID(ctx),
}
if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, changeStatusStream, event); err != nil {

Check warning on line 191 in channels/events/streams.go

View check run for this annotation

Codecov / codecov/patch

channels/events/streams.go#L191

Added line #L191 was not covered by tests
return ch, err
}

Expand All @@ -194,7 +206,7 @@
requestID: middleware.GetReqID(ctx),
}

if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, removeStream, event); err != nil {

Check warning on line 209 in channels/events/streams.go

View check run for this annotation

Codecov / codecov/patch

channels/events/streams.go#L209

Added line #L209 was not covered by tests
return err
}

Expand All @@ -214,7 +226,7 @@
requestID: middleware.GetReqID(ctx),
}

if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, connectStream, event); err != nil {

Check warning on line 229 in channels/events/streams.go

View check run for this annotation

Codecov / codecov/patch

channels/events/streams.go#L229

Added line #L229 was not covered by tests
return err
}

Expand All @@ -234,7 +246,7 @@
requestID: middleware.GetReqID(ctx),
}

if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, disconnectStream, event); err != nil {

Check warning on line 249 in channels/events/streams.go

View check run for this annotation

Codecov / codecov/patch

channels/events/streams.go#L249

Added line #L249 was not covered by tests
return err
}

Expand All @@ -253,7 +265,7 @@
requestID: middleware.GetReqID(ctx),
}

if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, setParentStream, event); err != nil {

Check warning on line 268 in channels/events/streams.go

View check run for this annotation

Codecov / codecov/patch

channels/events/streams.go#L268

Added line #L268 was not covered by tests
return err
}

Expand All @@ -271,7 +283,7 @@
requestID: middleware.GetReqID(ctx),
}

if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, removeParentStream, event); err != nil {

Check warning on line 286 in channels/events/streams.go

View check run for this annotation

Codecov / codecov/patch

channels/events/streams.go#L286

Added line #L286 was not covered by tests
return err
}

Expand Down
35 changes: 24 additions & 11 deletions clients/events/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,20 @@
"github.com/go-chi/chi/v5/middleware"
)

const streamID = "supermq.clients"
const (
supermqPrefix = "supermq."
createStream = supermqPrefix + clientCreate
updateStream = supermqPrefix + clientUpdate
changeStatusStream = supermqPrefix + clientChangeStatus
removeStream = supermqPrefix + clientRemove
viewStream = supermqPrefix + clientView
viewPermsStream = supermqPrefix + clientViewPerms
listStream = supermqPrefix + clientList
identifyStream = supermqPrefix + clientIdentify
authorizeStream = supermqPrefix + clientAuthorize
setParentStream = supermqPrefix + clientSetParent
removeParentStream = supermqPrefix + clientRemoveParent
)

var _ clients.Service = (*eventStore)(nil)

Expand All @@ -28,7 +41,7 @@
// NewEventStoreMiddleware returns wrapper around clients service that sends
// events to event store.
func NewEventStoreMiddleware(ctx context.Context, svc clients.Service, url string) (clients.Service, error) {
publisher, err := store.NewPublisher(ctx, url, streamID)
publisher, err := store.NewPublisher(ctx, url)

Check warning on line 44 in clients/events/streams.go

View check run for this annotation

Codecov / codecov/patch

clients/events/streams.go#L44

Added line #L44 was not covered by tests
if err != nil {
return nil, err
}
Expand All @@ -54,7 +67,7 @@
Session: session,
requestID: middleware.GetReqID(ctx),
}
if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, createStream, event); err != nil {

Check warning on line 70 in clients/events/streams.go

View check run for this annotation

Codecov / codecov/patch

clients/events/streams.go#L70

Added line #L70 was not covered by tests
return clis, rps, err
}
}
Expand Down Expand Up @@ -97,7 +110,7 @@
requestID: middleware.GetReqID(ctx),
}

if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, updateStream, event); err != nil {

Check warning on line 113 in clients/events/streams.go

View check run for this annotation

Codecov / codecov/patch

clients/events/streams.go#L113

Added line #L113 was not covered by tests
return client, err
}

Expand All @@ -115,7 +128,7 @@
Session: session,
requestID: middleware.GetReqID(ctx),
}
if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, viewStream, event); err != nil {

Check warning on line 131 in clients/events/streams.go

View check run for this annotation

Codecov / codecov/patch

clients/events/streams.go#L131

Added line #L131 was not covered by tests
return cli, err
}

Expand All @@ -132,7 +145,7 @@
session,
middleware.GetReqID(ctx),
}
if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, listStream, event); err != nil {

Check warning on line 148 in clients/events/streams.go

View check run for this annotation

Codecov / codecov/patch

clients/events/streams.go#L148

Added line #L148 was not covered by tests
return cp, err
}

Expand All @@ -150,7 +163,7 @@
session,
middleware.GetReqID(ctx),
}
if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, listStream, event); err != nil {

Check warning on line 166 in clients/events/streams.go

View check run for this annotation

Codecov / codecov/patch

clients/events/streams.go#L166

Added line #L166 was not covered by tests
return cp, err
}

Expand Down Expand Up @@ -184,7 +197,7 @@
Session: session,
requestID: middleware.GetReqID(ctx),
}
if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, changeStatusStream, event); err != nil {

Check warning on line 200 in clients/events/streams.go

View check run for this annotation

Codecov / codecov/patch

clients/events/streams.go#L200

Added line #L200 was not covered by tests
return cli, err
}

Expand All @@ -202,7 +215,7 @@
requestID: middleware.GetReqID(ctx),
}

if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, removeStream, event); err != nil {

Check warning on line 218 in clients/events/streams.go

View check run for this annotation

Codecov / codecov/patch

clients/events/streams.go#L218

Added line #L218 was not covered by tests
return err
}

Expand All @@ -221,7 +234,7 @@
requestID: middleware.GetReqID(ctx),
}

if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, setParentStream, event); err != nil {

Check warning on line 237 in clients/events/streams.go

View check run for this annotation

Codecov / codecov/patch

clients/events/streams.go#L237

Added line #L237 was not covered by tests
return err
}

Expand All @@ -239,7 +252,7 @@
requestID: middleware.GetReqID(ctx),
}

if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, removeParentStream, event); err != nil {

Check warning on line 255 in clients/events/streams.go

View check run for this annotation

Codecov / codecov/patch

clients/events/streams.go#L255

Added line #L255 was not covered by tests
return err
}

Expand Down
45 changes: 30 additions & 15 deletions domains/events/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,22 @@
"github.com/go-chi/chi/v5/middleware"
)

const streamID = "supermq.domains"
const (
supermqPrefix = "supermq."
createStream = supermqPrefix + domainCreate
retrieveStream = supermqPrefix + domainRetrieve
retrieveStatusStream = supermqPrefix + domainRetrieveStatus
updateStream = supermqPrefix + domainUpdate
changeStatusStream = supermqPrefix + domainPrefix + "change_status"
listStream = supermqPrefix + domainList
userDeleteStream = supermqPrefix + domainUserDelete
sendInvitationStream = supermqPrefix + invitationSend
acceptInvitationStream = supermqPrefix + invitationAccept
rejectInvitationStream = supermqPrefix + invitationReject
listInvitationsStream = supermqPrefix + invitationList
retrieveInvitationStream = supermqPrefix + invitationRetrieve
deleteInvitationStream = supermqPrefix + invitationDelete
)

var _ domains.Service = (*eventStore)(nil)

Expand All @@ -28,7 +43,7 @@
// NewEventStoreMiddleware returns wrapper around auth service that sends
// events to event store.
func NewEventStoreMiddleware(ctx context.Context, svc domains.Service, url string) (domains.Service, error) {
publisher, err := store.NewPublisher(ctx, url, streamID)
publisher, err := store.NewPublisher(ctx, url)

Check warning on line 46 in domains/events/streams.go

View check run for this annotation

Codecov / codecov/patch

domains/events/streams.go#L46

Added line #L46 was not covered by tests
if err != nil {
return nil, err
}
Expand All @@ -55,7 +70,7 @@
requestID: middleware.GetReqID(ctx),
}

if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, createStream, event); err != nil {

Check warning on line 73 in domains/events/streams.go

View check run for this annotation

Codecov / codecov/patch

domains/events/streams.go#L73

Added line #L73 was not covered by tests
return domain, rps, err
}

Expand All @@ -74,7 +89,7 @@
middleware.GetReqID(ctx),
}

if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, retrieveStream, event); err != nil {

Check warning on line 92 in domains/events/streams.go

View check run for this annotation

Codecov / codecov/patch

domains/events/streams.go#L92

Added line #L92 was not covered by tests
return domain, err
}

Expand All @@ -93,7 +108,7 @@
requestID: middleware.GetReqID(ctx),
}

if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, updateStream, event); err != nil {

Check warning on line 111 in domains/events/streams.go

View check run for this annotation

Codecov / codecov/patch

domains/events/streams.go#L111

Added line #L111 was not covered by tests
return domain, err
}

Expand All @@ -114,7 +129,7 @@
requestID: middleware.GetReqID(ctx),
}

if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, changeStatusStream, event); err != nil {

Check warning on line 132 in domains/events/streams.go

View check run for this annotation

Codecov / codecov/patch

domains/events/streams.go#L132

Added line #L132 was not covered by tests
return domain, err
}

Expand All @@ -135,7 +150,7 @@
requestID: middleware.GetReqID(ctx),
}

if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, changeStatusStream, event); err != nil {

Check warning on line 153 in domains/events/streams.go

View check run for this annotation

Codecov / codecov/patch

domains/events/streams.go#L153

Added line #L153 was not covered by tests
return domain, err
}

Expand All @@ -156,7 +171,7 @@
requestID: middleware.GetReqID(ctx),
}

if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, changeStatusStream, event); err != nil {

Check warning on line 174 in domains/events/streams.go

View check run for this annotation

Codecov / codecov/patch

domains/events/streams.go#L174

Added line #L174 was not covered by tests
return domain, err
}

Expand All @@ -178,7 +193,7 @@
requestID: middleware.GetReqID(ctx),
}

if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, listStream, event); err != nil {

Check warning on line 196 in domains/events/streams.go

View check run for this annotation

Codecov / codecov/patch

domains/events/streams.go#L196

Added line #L196 was not covered by tests
return dp, err
}

Expand All @@ -195,7 +210,7 @@
session: session,
}

return es.Publish(ctx, event)
return es.Publish(ctx, sendInvitationStream, event)

Check warning on line 213 in domains/events/streams.go

View check run for this annotation

Codecov / codecov/patch

domains/events/streams.go#L213

Added line #L213 was not covered by tests
}

func (es *eventStore) ViewInvitation(ctx context.Context, session authn.Session, userID, domainID string) (domains.Invitation, error) {
Expand All @@ -212,7 +227,7 @@
session: session,
}

if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, retrieveInvitationStream, event); err != nil {

Check warning on line 230 in domains/events/streams.go

View check run for this annotation

Codecov / codecov/patch

domains/events/streams.go#L230

Added line #L230 was not covered by tests
return invitation, err
}

Expand All @@ -230,7 +245,7 @@
session: session,
}

if err := es.Publish(ctx, event); err != nil {
if err := es.Publish(ctx, listInvitationsStream, event); err != nil {

Check warning on line 248 in domains/events/streams.go

View check run for this annotation

Codecov / codecov/patch

domains/events/streams.go#L248

Added line #L248 was not covered by tests
return ip, err
}

Expand All @@ -247,7 +262,7 @@
session: session,
}

return es.Publish(ctx, event)
return es.Publish(ctx, acceptInvitationStream, event)

Check warning on line 265 in domains/events/streams.go

View check run for this annotation

Codecov / codecov/patch

domains/events/streams.go#L265

Added line #L265 was not covered by tests
}

func (es *eventStore) RejectInvitation(ctx context.Context, session authn.Session, domainID string) error {
Expand All @@ -260,7 +275,7 @@
session: session,
}

return es.Publish(ctx, event)
return es.Publish(ctx, rejectInvitationStream, event)

Check warning on line 278 in domains/events/streams.go

View check run for this annotation

Codecov / codecov/patch

domains/events/streams.go#L278

Added line #L278 was not covered by tests
}

func (es *eventStore) DeleteInvitation(ctx context.Context, session authn.Session, inviteeUserID, domainID string) error {
Expand All @@ -274,5 +289,5 @@
session: session,
}

return es.Publish(ctx, event)
return es.Publish(ctx, deleteInvitationStream, event)

Check warning on line 292 in domains/events/streams.go

View check run for this annotation

Codecov / codecov/patch

domains/events/streams.go#L292

Added line #L292 was not covered by tests
}
2 changes: 1 addition & 1 deletion groups/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/absmach/supermq/pkg/roles"
)

var (
const (
groupPrefix = "group."
groupCreate = groupPrefix + "create"
groupUpdate = groupPrefix + "update"
Expand Down
Loading
Loading