relay sync1.1 #961
base: main
Conversation
sync 1.1 induction firehose
got through some review. I still want to look at:
- the actual generated postgresql schemas. getting those right seems important. the gorm models are still spread through many files; seems like they could all be in cmd/relay/models/models.go?
- account state and lifecycle
- rate-limiting

it feels like there is still a lot of low-hanging fruit for code that could just be ripped out? eg, all the deprecated event types. there seemed to be a lot of config arguments which are not used.

I was surprised the disk persister is still used, instead of the pebble persister used in rainbow.
cmd/relay/README.md (Outdated)

    In atproto, a Relay subscribes to multiple PDS hosts and outputs a combined "firehose" event stream. Downstream services can subscribe to this single firehose and get all relevant events for the entire network, or a specific sub-graph of the network. The Relay maintains a mirror of repo data from all accounts on the upstream PDS instances, and verifies repo data structure integrity and identity signatures. It is agnostic to applications, and does not validate data against atproto Lexicon schemas.

    This Relay implementation is designed to subscribe to the entire global network. The current state of the codebase is informally expected to scale to around 20 million accounts in the network, and thousands of repo events per second (peak).
chunks of this README are out of date. maybe simpler to just pare it down to a couple of lines in this PR, and I can update it in a follow-up?
I re-read it just now; I think most of it is still valid, and I deleted the obviously obsolete stuff.
events/consumer.go (Outdated)

    for {
        select {
        case <-t.C:
            if err := con.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second*10)); err != nil {
                log.Warn("failed to ping", "err", err)
                failcount++
same feedback as before: should this get cleared to 0 if the ping is successful? (in this for loop; something like the sketch below)
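a hedged sketch of what I mean, reusing the loop's existing variables (`maxPingFails` is a hypothetical threshold name, not something in the code):

```go
case <-t.C:
	if err := con.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second*10)); err != nil {
		log.Warn("failed to ping", "err", err)
		failcount++
		if failcount >= maxPingFails { // hypothetical limit
			return fmt.Errorf("too many consecutive failed pings")
		}
	} else {
		failcount = 0 // ping succeeded: clear the consecutive-failure counter
	}
```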
this is not the active code, see cmd/relay/events/consumer.go
if this doesn't impact the indigo:cmd/relay code, can you pull it out of this PR?
models/models.go (Outdated)

    @@ -104,7 +104,7 @@ type FollowRecord struct {
    type PDS struct {
        gorm.Model

    -   Host string
    +   Host string `gorm:"unique"`
this only impacts the old bigsky, not the new relay, right? seems like we should minimize any changes to bigsky in this PR.
yeah, I should probably roll back most of the changes outside of cmd/relay
    @@ -0,0 +1,52 @@
    name: container-relay-aws
should do a container-relay-ghcr CI action as well; we will remove the bigsky GHCR soon, so it won't be a net increase in container builds
    &cli.StringFlag{
        Name:  "db-url",
        Usage: "database connection string for BGS database",
        Value: "sqlite://./data/bigsky/bgs.sqlite",
is there still sqlite support? I thought that got dropped
it's not working right now because some transactions changed, but it's maybe not far off from being fixed?
cmd/relay/bgs/bgs.go (Outdated)

    }

    func (bgs *BGS) handleSync(ctx context.Context, host *models.PDS, evt *comatproto.SyncSubscribeRepos_Sync) error {
        // TODO: actually do something with #sync event
we should at least be updating the metadata for the account (DID) here, right?
I think the rough expected behavior is (a rough sketch follows the list):
- parse out the commit object
- verify the message matches the fields in the commit object
- fetch user metadata
- if account isn't active, drop event
- if rev is old/bad, drop event
- update user metadata with sync info
- emit the event
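a minimal sketch of that flow; the callback signatures and `syncEvt`/`accountState` types are stand-ins for illustration, not the actual indigo APIs:

```go
import (
	"context"
	"fmt"
)

// Stand-in types for illustration; not the real indigo models.
type syncEvt struct {
	DID    string
	Rev    string
	Blocks []byte // CAR slice containing the signed commit object
}

type accountState struct {
	Active  bool
	LastRev string
}

func handleSyncSketch(
	ctx context.Context,
	evt *syncEvt,
	parseCommit func(blocks []byte) (rev string, err error),
	getAccount func(ctx context.Context, did string) (*accountState, error),
	updateAccount func(ctx context.Context, did, rev string) error,
	emit func(ctx context.Context, evt *syncEvt) error,
) error {
	// 1. parse out the commit object from the message's CAR blocks
	rev, err := parseCommit(evt.Blocks)
	if err != nil {
		return fmt.Errorf("parsing #sync commit: %w", err)
	}
	// 2. verify the message matches the fields in the commit object
	if rev != evt.Rev {
		return fmt.Errorf("#sync rev %q does not match commit rev %q", evt.Rev, rev)
	}
	// 3. fetch user metadata
	acct, err := getAccount(ctx, evt.DID)
	if err != nil {
		return err
	}
	// 4. if the account isn't active, drop the event
	if !acct.Active {
		return nil
	}
	// 5. if the rev is old/bad, drop the event (revs are TIDs, so
	// lexicographic comparison matches time ordering)
	if rev <= acct.LastRev {
		return nil
	}
	// 6. update user metadata with the sync info
	if err := updateAccount(ctx, evt.DID, rev); err != nil {
		return err
	}
	// 7. emit the event downstream
	return emit(ctx, evt)
}
```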
        return nil
    }

    if ustatus == events.AccountStatusDeactivated {
also account status "throttled"
cmd/relay/bgs/bgs.go (Outdated)

        }
    }

    if u.GetTombstoned() {
I'm pretty sure we just don't / shouldn't have a concept of "tombstoned" accounts in the relay. If the DID is tombstoned, that is basically account status "deleted".
yeah, old code. I basically haven't had time to update the account status flow yet; that part seemed to be in flux up until last week
it looks like adding per-account rate-limits would currently mean a few slidingwindow.Limiter instances in-process for each of tens of millions of accounts (and growing). do you think that will scale, for us and for others (eg, with less RAM)? if the limiters could expire out of memory, then the total number of counters could be more like "DAU" than "MAU" (sketch below).

the limits we have discussed are total bytes of data, and number of record ops, per account (DID); and then also the number of "expensive" events (like a #sync which is not a no-op, or an #identity event), or "broken" events (MST does not invert, for example).
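a hedged sketch of the expiry idea: lazily create one github.com/RussellLuo/slidingwindow limiter per DID, and evict entries idle past a TTL so resident counters track recently-active accounts rather than all accounts (the `accountLimiters` wrapper and the window numbers are illustrative):

```go
import (
	"sync"
	"time"

	sw "github.com/RussellLuo/slidingwindow"
)

type entry struct {
	lim      *sw.Limiter
	lastSeen time.Time
}

// accountLimiters lazily creates one limiter per DID and evicts idle entries.
type accountLimiters struct {
	mu  sync.Mutex
	ttl time.Duration
	m   map[string]*entry
}

func newAccountLimiters(ttl time.Duration) *accountLimiters {
	return &accountLimiters{ttl: ttl, m: make(map[string]*entry)}
}

// Allow counts one event for the given DID against its sliding window.
func (a *accountLimiters) Allow(did string) bool {
	a.mu.Lock()
	defer a.mu.Unlock()
	e, ok := a.m[did]
	if !ok {
		// eg 100 events per 5-minute window per account (illustrative numbers)
		lim, _ := sw.NewLimiter(5*time.Minute, 100, func() (sw.Window, sw.StopFunc) {
			return sw.NewLocalWindow()
		})
		e = &entry{lim: lim}
		a.m[did] = e
	}
	e.lastSeen = time.Now()
	return e.lim.Allow()
}

// evictIdle drops limiters not seen within the TTL; call it periodically.
func (a *accountLimiters) evictIdle() {
	a.mu.Lock()
	defer a.mu.Unlock()
	cutoff := time.Now().Add(-a.ttl)
	for did, e := range a.m {
		if e.lastSeen.Before(cutoff) {
			delete(a.m, did)
		}
	}
}
```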
overall account lifecycle state tracking (resolution logic sketched below):
- should store "local" account state as a string, not boolean flags
- if local state is "active", then default to upstream state (which may or may not be "active"); otherwise, use local state
- could store an "until" timestamp in account status, for temporary throttled situations
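a minimal sketch of that resolution logic; the struct and field names are assumptions for illustration, not the PR's schema:

```go
import "time"

// Illustrative account-state model: string states plus an optional expiry.
type accountStatus struct {
	LocalStatus    string     // eg "active", "throttled", "taken-down"
	UpstreamStatus string     // status reported by the PDS, eg "active", "deactivated"
	LocalUntil     *time.Time // optional expiry for temporary local states
}

// effectiveStatus resolves which status the relay should act on.
func (s accountStatus) effectiveStatus(now time.Time) string {
	local := s.LocalStatus
	// temporary local states (eg "throttled") expire back to "active"
	if s.LocalUntil != nil && now.After(*s.LocalUntil) {
		local = "active"
	}
	// if the local state is "active", defer to the upstream state;
	// otherwise the local override wins
	if local == "active" {
		return s.UpstreamStatus
	}
	return local
}
```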
cmd/relay/bgs/models.go (Outdated)

    type DomainBan struct {
        gorm.Model
        Domain string
unique constraint on the domain? (eg, the sketch below)
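eg, mirroring the Host change above (a one-line gorm tag; sketch only):

```go
type DomainBan struct {
	gorm.Model
	Domain string `gorm:"unique"` // one ban row per domain
}
```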
cmd/relay/bgs/bgs.go (Outdated)

        return u.UpstreamStatus
    }

    type UserPreviousState struct {
can this just be merged onto the existing Users table? I guess this table has higher churn, and it might be good to keep that separate
yeah, that's my motivation. I think Postgres works better with the high-churn data in separate, smaller rows, apart from the relatively low-churn data.
    type UserPreviousState struct {
        Uid models.Uid   `gorm:"column:uid;primaryKey"`
        Cid models.DbCID `gorm:"column:cid"`
I think a better column name would be good; data_cid? commit_data or commit_data_cid? (eg, below)
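eg, picking one of those names (sketch only):

```go
Cid models.DbCID `gorm:"column:commit_data_cid"`
```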
cmd/relay/bgs/bgs.go (Outdated)

    // createExternalUser is a mess and poorly defined
    // did is the user
    // host is the PDS we received this from, not necessarily the canonical PDS in the DID document
    // TODO: rename? This also updates users, and 'external' is an old phrasing
seems like a good time to rename this, and update the comments around what it does.
cmd/relay/bgs/bgs.go (Outdated)

        return nil, fmt.Errorf("cannot create user on pds with banned domain")
    }

    if peering.ID == 0 {
I assume this is old code, maybe from the days when the BGS had indexing and would "spider" to discover accounts? or from when a bulk catch-up was happening?

would be good if you could clarify what the expected behavior is here ("maybe this never happens").
I did a bunch of reorg and commenting on this function as a whole and I think it's better now
cmd/relay/bgs/bgs.go (Outdated)

    }

    err = bgs.db.Transaction(func(tx *gorm.DB) error {
        res := tx.Model(&models.PDS{}).Where("id = ? AND repo_count < repo_limit", peering.ID).Update("repo_count", gorm.Expr("repo_count + 1"))
I don't really understand the repo_count < repo_limit clause here... is this how account limits are enforced? the increment fails if the clause filters out the relevant row?

my intuition is that the account should be created (inserted), but with local account state "throttled", if the overall PDS is over quota, instead of silently being dropped; the latter is hard for external folks to debug.
there's a potential waste-our-storage attack from an evil PDS sending one event each from a zillion different DIDs; but aside from that I like your approach. we could try for both a soft limit and a hard limit? DIDs beyond the basic limit get #throttled, DIDs beyond the second limit get ignored? (sketch below)
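a hedged sketch of that two-tier check; `softLimit`/`hardLimit` and the state strings are illustrative names, not existing fields:

```go
// decideNewAccount applies a two-tier per-PDS quota when a new DID shows up.
func decideNewAccount(repoCount, softLimit, hardLimit int64) (create bool, localStatus string) {
	switch {
	case repoCount >= hardLimit:
		// beyond the hard limit: ignore entirely, bounding worst-case storage
		return false, ""
	case repoCount >= softLimit:
		// beyond the soft limit: create the account row but mark it throttled,
		// so operators and the PDS can see what happened
		return true, "throttled"
	default:
		return true, "active"
	}
}
```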
cbor "github.com/ipfs/go-ipld-cbor" | ||
mh "github.com/multiformats/go-multihash" | ||
) | ||
|
||
func CborStore(bs blockstore.Blockstore) *cbor.BasicIpldStore { | ||
func CborStore(bs cbor.IpldBlockstore) *cbor.BasicIpldStore { |
I think you should split out these changes to function signatures (here, and in indigo:mst and indigo:repo) into a separate PR, if they aren't touched by the indigo:cmd/relay code.
- BGS.createExternalUser() is now syncPDSAccount()
- metric events_warn_counter{pds,warn}
- "tombstoned" gone from account model
- cleanup, fixes
relay radical rehacking for sync1.1 induction firehose

- drops all carstore code for keeping an archive of all repos
- drops all indexer code for running getRepo on all those repos
- drops a bunch of other stuff that fell out as deprecated and unneeded
- adds sync1.1 protocol features for checking #commit.prevData and #sync (prevData check sketched below)
- internally vendors a bunch of code that we should re-share back out to common code
formerly known as #951
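a hedged sketch of what the prevData check involves, tying into the UserPreviousState row discussed above (`prevState` and its field are stand-ins, not the PR's actual types):

```go
import (
	"fmt"

	"github.com/ipfs/go-cid"
)

// prevState mirrors the per-account previous-commit row (sketch only).
type prevState struct {
	DataCid cid.Cid // repo data CID recorded for the account's previous commit
}

// checkPrevData compares the prevData CID claimed by an incoming #commit
// against the data CID the relay stored for that account's previous commit.
// A mismatch means the relay missed events (or history was rewritten), so the
// account should be re-synced rather than the commit emitted.
func checkPrevData(claimed cid.Cid, prev *prevState) error {
	if prev == nil {
		return nil // no previous state recorded; nothing to cross-check
	}
	if claimed != prev.DataCid {
		return fmt.Errorf("#commit prevData %s != stored %s; account needs re-sync", claimed, prev.DataCid)
	}
	return nil
}
```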