
relay sync1.1 #961 (Open)
wants to merge 22 commits into main

Conversation

brianolson (Contributor):

relay radical rehacking for sync1.1 induction firehose:

  • drops all carstore code for keeping an archive of all repos
  • drops all indexer code for running getRepo on all those repos
  • drops a bunch of other stuff that fell out as deprecated and unneeded
  • adds sync1.1 protocol features for checking #commit.prevData and #sync
  • internally vendors a bunch of code that we should re-share back out to common code

formerly known as #951

@bnewbold (Collaborator) left a comment:

got through some review. I still want to look at:

  • the actual generated postgresql schemas. getting those right seems important. the gorm models are still spread through many files, seems like they could all be in cmd/relay/models/models.go?
  • account state and lifecycle
  • rate-limiting

it feels like there is still a lot of low-hanging fruit: code that could just be ripped out, eg all the deprecated event types. there also seem to be a lot of config arguments which are not used.

I was surprised the disk persister is still used, instead of the pebble persister used in rainbow.


In atproto, a Relay subscribes to multiple PDS hosts and outputs a combined "firehose" event stream. Downstream services can subscribe to this single firehose and get all relevant events for the entire network, or a specific sub-graph of the network. The Relay maintains a mirror of repo data from all accounts on the upstream PDS instances, and verifies repo data structure integrity and identity signatures. It is agnostic to applications, and does not validate data against atproto Lexicon schemas.

This Relay implementation is designed to subscribe to the entire global network. The current state of the codebase is informally expected to scale to around 20 million accounts in the network, and thousands of repo events per second (peak).
Collaborator:

chunks of this README are out of date. maybe simpler to just leave it at a couple of lines in this PR, and I can update it in a follow-up?

Contributor (Author):

I re-read it just now and I think most of it is still valid and I deleted the obviously obsolete stuff.
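For context on the firehose the README excerpt above describes, here is a minimal sketch of a downstream consumer; the relay host is a placeholder, and a real consumer would decode the DAG-CBOR frames (for example with the indigo events package) rather than just counting bytes.

package main

import (
	"log"

	"github.com/gorilla/websocket"
)

func main() {
	// one subscription covers events for every account the relay mirrors;
	// relay.example.com is a placeholder host
	con, _, err := websocket.DefaultDialer.Dial(
		"wss://relay.example.com/xrpc/com.atproto.sync.subscribeRepos", nil)
	if err != nil {
		log.Fatal(err)
	}
	defer con.Close()

	for {
		// each frame carries a header plus a body such as #commit, #sync, or #identity
		_, frame, err := con.ReadMessage()
		if err != nil {
			log.Fatal(err)
		}
		log.Printf("firehose frame: %d bytes", len(frame))
	}
}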


for {
	select {
	case <-t.C:
		if err := con.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second*10)); err != nil {
			log.Warn("failed to ping", "err", err)
			failcount++
Collaborator:

same feedback as before: should this get cleared to 0 if the ping is successful?

(in this for loop)
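For reference, the suggested reset would look roughly like this (a sketch only; failcount, t, and con come from the snippet above), so that only consecutive failures count toward the limit:

case <-t.C:
	if err := con.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second*10)); err != nil {
		log.Warn("failed to ping", "err", err)
		failcount++
	} else {
		// clear the counter after a successful ping
		failcount = 0
	}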

@brianolson (Contributor, Author) commented on Mar 4, 2025:

this is not the active code, see cmd/relay/events/consumer.go

Collaborator:

if this doesn't impact the indigo:cmd/relay code, can you pull it out of this PR?

models/models.go (Outdated)
@@ -104,7 +104,7 @@ type FollowRecord struct {
 type PDS struct {
 	gorm.Model

-	Host string
+	Host string `gorm:"unique"`
Collaborator:

this only impacts the old bigsky, not the new relay, right? seems like we should minimize any changes to bigsky in this PR.

Contributor (Author):

yeah, I should probably roll back most of the changes outside of cmd/relay

@@ -0,0 +1,52 @@
name: container-relay-aws
Collaborator:

should do a container-relay-ghcr CI action as well; we will remove the bigsky GHCR soon so it won't be a net increase in container builds

&cli.StringFlag{
	Name:  "db-url",
	Usage: "database connection string for BGS database",
	Value: "sqlite://./data/bigsky/bgs.sqlite",
Collaborator:

is there still sqlite support? I thought that got dropped

Contributor (Author):

it's not working right now because some transactions changed, but it's maybe not far off from being fixed?

}

func (bgs *BGS) handleSync(ctx context.Context, host *models.PDS, evt *comatproto.SyncSubscribeRepos_Sync) error {
	// TODO: actually do something with #sync event
Collaborator:

we should at least be updating the metadata for the account (DID) here, right?

Collaborator:

I think the rough expected behavior is:

  • parse out the commit object
  • verify the message matches the fields in the commit object
  • fetch user metadata
    • if account isn't active, drop event
    • if rev is old/bad, drop event
  • update user metadata with sync info
  • emit the event
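A sketch of that flow, just to make the steps concrete; the helper names (parseCommitBlock, lookupUserByDid, revIsNewer, updatePrevState, emitSyncEvent) and the field accesses are placeholders for illustration, not code from this PR:

func (bgs *BGS) handleSync(ctx context.Context, host *models.PDS, evt *comatproto.SyncSubscribeRepos_Sync) error {
	// parse out the commit object carried in the event's CAR slice
	commit, commitCID, err := parseCommitBlock(evt.Blocks) // placeholder helper
	if err != nil {
		return fmt.Errorf("reading #sync commit block: %w", err)
	}

	// verify the message matches the fields in the commit object
	if commit.Did != evt.Did || commit.Rev != evt.Rev {
		return fmt.Errorf("#sync envelope does not match commit object")
	}

	// fetch user metadata
	u, err := bgs.lookupUserByDid(ctx, evt.Did) // placeholder helper
	if err != nil {
		return err
	}
	if !u.IsActive() {
		return nil // account isn't active: drop event
	}
	if !revIsNewer(commit.Rev, u.PrevRev) {
		return nil // rev is old/bad: drop event
	}

	// update user metadata (previous data CID and rev) with the sync info
	if err := bgs.updatePrevState(ctx, u.ID, commitCID, commit.Rev); err != nil {
		return err
	}

	// emit the event on the outbound firehose
	return bgs.emitSyncEvent(ctx, evt) // placeholder helper
}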

return nil
}

if ustatus == events.AccountStatusDeactivated {
Collaborator:

also account status throttled

}
}

if u.GetTombstoned() {
Collaborator:

I'm pretty sure we just don't / shouldn't have a concept of "tombstoned" accounts in the relay. If the DID is tombstoned, that is basically account status deleted

Contributor (Author):

yeah, old code; I basically haven't had time to update the account status flow yet. that part seemed to be in flux up until last week

@bnewbold (Collaborator) left a comment:

it looks like adding per-account rate-limits would currently mean a few slidingwindow.Limiter instances in-process for each of tens of millions of accounts (and growing). do you think that will scale, for us and for others (eg, with less RAM)? if they could expire out of memory, then the total number of counters could be more like "DAU" than "MAU".

the limits we have discussed are total bytes of data, and number of record ops, per account (DID). and then also the number of "expensive" events (like a #sync which is not a no-op; or an #identity event), or "broken" events (MST does not invert, for example).
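One way the "expire out of memory" idea could look, as a rough sketch (fixed windows instead of sliding, illustrative names, only the standard sync and time packages assumed; not code from this PR): counters are keyed by DID, track both event count and bytes, and get evicted once their window expires, so the resident set tracks recently-active accounts rather than every account ever seen.

type didCounter struct {
	windowStart time.Time
	events      int64
	bytes       int64
}

type perDIDLimiter struct {
	mu        sync.Mutex
	window    time.Duration
	maxEvents int64
	maxBytes  int64
	counters  map[string]*didCounter
}

// allow records one event of the given size for a DID and reports whether the
// account is still within its per-window limits.
func (l *perDIDLimiter) allow(did string, size int64) bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	now := time.Now()
	c := l.counters[did]
	if c == nil || now.Sub(c.windowStart) > l.window {
		c = &didCounter{windowStart: now}
		l.counters[did] = c
	}
	c.events++
	c.bytes += size
	return c.events <= l.maxEvents && c.bytes <= l.maxBytes
}

// evict drops counters whose window has already expired; their counts no
// longer apply, so dropping them is lossless. Called periodically, it keeps
// the map size closer to "DAU" than "MAU".
func (l *perDIDLimiter) evict() {
	l.mu.Lock()
	defer l.mu.Unlock()
	cutoff := time.Now().Add(-l.window)
	for did, c := range l.counters {
		if c.windowStart.Before(cutoff) {
			delete(l.counters, did)
		}
	}
}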


overall account lifecycle state tracking:

  • should store "local" account state as a string, not boolean flags
  • if local state is "active", then default to upstream state (which may or may not be "active"); otherwise, use local state
  • could store an "until" timestamp in account status, for temporary throttled situations
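That could end up looking roughly like this (field and status names are illustrative, not the PR's schema; assumes the standard time package):

type accountStatus struct {
	localStatus    string     // "active", "throttled", "takendown", ...
	upstreamStatus string     // last status reported by the upstream PDS
	localUntil     *time.Time // optional expiry for temporary states like "throttled"
}

// effective returns the status the relay should act on: an unexpired,
// non-"active" local override wins; otherwise defer to the upstream status.
func (a accountStatus) effective(now time.Time) string {
	if a.localStatus != "active" && (a.localUntil == nil || now.Before(*a.localUntil)) {
		return a.localStatus
	}
	return a.upstreamStatus
}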


type DomainBan struct {
	gorm.Model
	Domain string
Collaborator:

unique constraint on the domain?
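For reference, the suggested constraint could mirror the gorm:"unique" tag used on PDS.Host earlier in this PR (a sketch of the change, not the committed code):

type DomainBan struct {
	gorm.Model
	Domain string `gorm:"unique"`
}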

return u.UpstreamStatus
}

type UserPreviousState struct {
Collaborator:

can this just be merged onto the existing Users table? I guess this table has higher churn and it might be good to keep that separate

Contributor (Author):

yeah, that's my motivation. I think Postgres works better when the high-churn data lives in separate, smaller rows, apart from the relatively low-churn data.


type UserPreviousState struct {
	Uid models.Uid   `gorm:"column:uid;primaryKey"`
	Cid models.DbCID `gorm:"column:cid"`
Collaborator:

I think a better column name would be good; data_cid? commit_data or commit_data_cid?
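With one of the suggested names, the tag would become something like this (the column name is still a suggestion, not the final schema):

type UserPreviousState struct {
	Uid models.Uid   `gorm:"column:uid;primaryKey"`
	Cid models.DbCID `gorm:"column:commit_data_cid"`
}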


// createExternalUser is a mess and poorly defined
// did is the user
// host is the PDS we received this from, not necessarily the canonical PDS in the DID document
// TODO: rename? This also updates users, and 'external' is an old phrasing
Collaborator:

seems like a good time to rename this, can update comments around what it does.

return nil, fmt.Errorf("cannot create user on pds with banned domain")
}

if peering.ID == 0 {
Collaborator:

I assume this is old code, maybe from the days when BGS had indexing and would "spider" to discover accounts? or if there is a bulk catch-up happening?

would be good if you could clarify what the expected behavior is here ("maybe this never happens").

Contributor (Author):

I did a bunch of reorg and commenting on this function as a whole and I think it's better now

}

err = bgs.db.Transaction(func(tx *gorm.DB) error {
res := tx.Model(&models.PDS{}).Where("id = ? AND repo_count < repo_limit", peering.ID).Update("repo_count", gorm.Expr("repo_count + 1"))
Collaborator:

I don't really understand the repo_count < repo_limit clause here... is this how account limits are enforced? the increment fails if the clause filters out the relevant row?

my intuition is that the account should be created (inserted), but with local account state "throttled", if the overall PDS is over quota, instead of being silently dropped; the latter is hard for external folks to debug.
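For clarity, the conditional UPDATE enforces the cap by checking rows affected: the WHERE clause only matches while the PDS is under its limit, so zero rows affected means the cap was hit. A sketch (the sentinel error name is hypothetical) of surfacing that instead of dropping silently:

err = bgs.db.Transaction(func(tx *gorm.DB) error {
	res := tx.Model(&models.PDS{}).
		Where("id = ? AND repo_count < repo_limit", peering.ID).
		Update("repo_count", gorm.Expr("repo_count + 1"))
	if res.Error != nil {
		return res.Error
	}
	if res.RowsAffected == 0 {
		// PDS is at its cap: nothing was incremented. Surfacing this (or
		// creating the account as "throttled") is easier to debug than a
		// silent drop.
		return ErrNewUsersThrottled // hypothetical sentinel error
	}
	// ... create the new user row within the same transaction ...
	return nil
})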

Contributor (Author):

there's a potential waste-our-storage attack from an evil PDS sending one event each from a zillion different DIDs; but aside from that I like your approach. We could try for both, with a basic limit and a hard limit? DIDs beyond the basic limit get #throttled, DIDs beyond the hard limit get ignored?
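The two-threshold idea sketched in Go (field names like HardRepoLimit and the Status string are illustrative, not existing fields):

switch {
case pds.RepoCount >= pds.HardRepoLimit:
	// hard limit: refuse to even create the account, bounding storage growth
	return nil, fmt.Errorf("PDS %s is over its hard account limit", pds.Host)
case pds.RepoCount >= pds.RepoLimit:
	// basic limit: create the account, but locally mark it throttled so its
	// events are not emitted until the operator raises the limit
	newUser.Status = "throttled"
}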

cbor "github.com/ipfs/go-ipld-cbor"
mh "github.com/multiformats/go-multihash"
)

func CborStore(bs blockstore.Blockstore) *cbor.BasicIpldStore {
func CborStore(bs cbor.IpldBlockstore) *cbor.BasicIpldStore {
Collaborator:

I think you should split out these changes to function signatures (here, and indigo:mst and indigo:repo) to a separate PR, if they aren't touched by the indigo:cmd/relay code.
