Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make PDS table unique on host name #913

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 73 additions & 2 deletions bgs/bgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,14 @@ func DefaultBGSConfig() *BGSConfig {
}

func NewBGS(db *gorm.DB, ix *indexer.Indexer, repoman *repomgr.RepoManager, evtman *events.EventManager, didr did.Resolver, rf *indexer.RepoFetcher, hr api.HandleResolver, config *BGSConfig) (*BGS, error) {

logger := slog.Default().With("system", "bgs")
// TODO: 2025-02-04 hopefully about a month from now this migration will have run everywhere and it can be deleted.
err := db.Transaction(func(tx *gorm.DB) error {
return fixupDupPDSRows(tx, logger)
})
if err != nil {
return nil, err
}
if config == nil {
config = DefaultBGSConfig()
}
Expand Down Expand Up @@ -169,7 +176,7 @@ func NewBGS(db *gorm.DB, ix *indexer.Indexer, repoman *repomgr.RepoManager, evtm

userCache: uc,

log: slog.Default().With("system", "bgs"),
log: logger,
}

ix.CreateExternalUser = bgs.createExternalUser
Expand Down Expand Up @@ -202,6 +209,70 @@ func NewBGS(db *gorm.DB, ix *indexer.Indexer, repoman *repomgr.RepoManager, evtm
return bgs, nil
}

// TODO: 2025-02-04 hopefully about a month from now this migration will have run everywhere and it can be deleted.
// TODO: check for INFO "no pds dup rows found" on all deploys
func fixupDupPDSRows(db *gorm.DB, logger *slog.Logger) error {
rows, err := db.Raw("SELECT id, host FROM pds").Rows()
if err != nil {
logger.Warn("could not list PDS rows; assume blank db", "err", err)
return nil
}
hostCounts := make(map[string][]uint)
maxPDSId := uint(0)
maxHostCount := 0
for rows.Next() {
var pdsId uint
var host string
if err := rows.Scan(&pdsId, &host); err != nil {
return fmt.Errorf("pds sql row err, %w", err)
}
idlist := hostCounts[host]
idlist = append(idlist, pdsId)
count := len(idlist)
if count > maxHostCount {
maxHostCount = count
}
hostCounts[host] = idlist
if pdsId > maxPDSId {
maxPDSId = pdsId
}
}
if maxHostCount <= 1 {
logger.Info("no pds dup rows found")
return nil
}
for host, idlist := range hostCounts {
if len(idlist) > 1 {
logger.Info("dup PDS", "host", host, "count", len(idlist))
minPDSId := idlist[0]
for _, otherid := range idlist[1:] {
if otherid < minPDSId {
minPDSId = otherid
}
}
for _, xPDSId := range idlist {
if xPDSId == minPDSId {
continue
}
logger.Info("dup PDS", "host", host, "from", xPDSId, "to", minPDSId)
err = db.Exec("UPDATE users SET pds = ? WHERE pds = ?", minPDSId, xPDSId).Error
if err != nil {
return fmt.Errorf("failed to update user pds %d -> %d: %w", xPDSId, minPDSId, err)
}
err = db.Exec("UPDATE actor_infos SET pds = ? WHERE pds = ?", minPDSId, xPDSId).Error
if err != nil {
return fmt.Errorf("failed to update actor_infos pds %d -> %d: %w", xPDSId, minPDSId, err)
}
err = db.Exec("DELETE FROM pds WHERE id = ?", xPDSId).Error
Comment on lines +258 to +266
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to do these in a transaction?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

kinda? I put it effectively into relay 'boot code' before threads of work are started. I guess the bad case is multiple bigsky processes pointed at the same PostgreSQL, but I bet it handles that badly in many ways.

if err != nil {
return fmt.Errorf("failed to delete pds %d: %w", xPDSId, err)
}
}
}
}
return nil
}

func (bgs *BGS) StartMetrics(listen string) error {
http.Handle("/metrics", promhttp.Handler())
return http.ListenAndServe(listen, nil)
Expand Down
2 changes: 1 addition & 1 deletion models/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ type FollowRecord struct {
type PDS struct {
gorm.Model

Host string
Host string `gorm:"unique"`
Did string
SSL bool
Cursor int64
Expand Down
Loading