From 3a2e9cb201c463db86f058777b8a8ec44db68da7 Mon Sep 17 00:00:00 2001 From: Brian Olson Date: Mon, 20 Jan 2025 04:24:40 -0500 Subject: [PATCH 1/2] make PDS table unique on host name fix old data that had dups on PDS host name --- bgs/bgs.go | 70 ++++++++++++++++++++++++++++++++++++++++++++++-- models/models.go | 2 +- 2 files changed, 69 insertions(+), 3 deletions(-) diff --git a/bgs/bgs.go b/bgs/bgs.go index 9f14492b5..8b95bc652 100644 --- a/bgs/bgs.go +++ b/bgs/bgs.go @@ -140,7 +140,11 @@ func DefaultBGSConfig() *BGSConfig { } func NewBGS(db *gorm.DB, ix *indexer.Indexer, repoman *repomgr.RepoManager, evtman *events.EventManager, didr did.Resolver, rf *indexer.RepoFetcher, hr api.HandleResolver, config *BGSConfig) (*BGS, error) { - + logger := slog.Default().With("system", "bgs") + err := fixupDupPDSRows(db, logger) + if err != nil { + return nil, err + } if config == nil { config = DefaultBGSConfig() } @@ -169,7 +173,7 @@ func NewBGS(db *gorm.DB, ix *indexer.Indexer, repoman *repomgr.RepoManager, evtm userCache: uc, - log: slog.Default().With("system", "bgs"), + log: logger, } ix.CreateExternalUser = bgs.createExternalUser @@ -202,6 +206,68 @@ func NewBGS(db *gorm.DB, ix *indexer.Indexer, repoman *repomgr.RepoManager, evtm return bgs, nil } +func fixupDupPDSRows(db *gorm.DB, logger *slog.Logger) error { + rows, err := db.Raw("SELECT id, host FROM pds").Rows() + if err != nil { + logger.Warn("could not list PDS rows; assume blank db", "err", err) + return nil + } + hostCounts := make(map[string][]uint) + maxPDSId := uint(0) + maxHostCount := 0 + for rows.Next() { + var pdsId uint + var host string + if err := rows.Scan(&pdsId, &host); err != nil { + return fmt.Errorf("pds sql row err, %w", err) + } + idlist := hostCounts[host] + idlist = append(idlist, pdsId) + count := len(idlist) + if count > maxHostCount { + maxHostCount = count + } + hostCounts[host] = idlist + if pdsId > maxPDSId { + maxPDSId = pdsId + } + } + if maxHostCount <= 1 { + logger.Debug("no pds dup rows found") + return nil + } + for host, idlist := range hostCounts { + if len(idlist) > 1 { + logger.Info("dup PDS", "host", host, "count", len(idlist)) + minPDSId := idlist[0] + for _, otherid := range idlist[1:] { + if otherid < minPDSId { + minPDSId = otherid + } + } + for _, xPDSId := range idlist { + if xPDSId == minPDSId { + continue + } + logger.Info("dup PDS", "host", host, "from", xPDSId, "to", minPDSId) + err = db.Exec("UPDATE users SET pds = ? WHERE pds = ?", minPDSId, xPDSId).Error + if err != nil { + return fmt.Errorf("failed to update user pds %d -> %d: %w", xPDSId, minPDSId, err) + } + err = db.Exec("UPDATE actor_infos SET pds = ? WHERE pds = ?", minPDSId, xPDSId).Error + if err != nil { + return fmt.Errorf("failed to update actor_infos pds %d -> %d: %w", xPDSId, minPDSId, err) + } + err = db.Exec("DELETE FROM pds WHERE id = ?", xPDSId).Error + if err != nil { + return fmt.Errorf("failed to delete pds %d: %w", xPDSId, err) + } + } + } + } + return nil +} + func (bgs *BGS) StartMetrics(listen string) error { http.Handle("/metrics", promhttp.Handler()) return http.ListenAndServe(listen, nil) diff --git a/models/models.go b/models/models.go index 9781e75bd..d61ec2311 100644 --- a/models/models.go +++ b/models/models.go @@ -104,7 +104,7 @@ type FollowRecord struct { type PDS struct { gorm.Model - Host string + Host string `gorm:"unique"` Did string SSL bool Cursor int64 From 61356009340e28f46fe4084fd8cf9ac0df90401d Mon Sep 17 00:00:00 2001 From: Brian Olson Date: Tue, 4 Feb 2025 11:23:22 -0500 Subject: [PATCH 2/2] use transaction, note TODOs --- bgs/bgs.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/bgs/bgs.go b/bgs/bgs.go index 8b95bc652..0cc3c030b 100644 --- a/bgs/bgs.go +++ b/bgs/bgs.go @@ -141,7 +141,10 @@ func DefaultBGSConfig() *BGSConfig { func NewBGS(db *gorm.DB, ix *indexer.Indexer, repoman *repomgr.RepoManager, evtman *events.EventManager, didr did.Resolver, rf *indexer.RepoFetcher, hr api.HandleResolver, config *BGSConfig) (*BGS, error) { logger := slog.Default().With("system", "bgs") - err := fixupDupPDSRows(db, logger) + // TODO: 2025-02-04 hopefully about a month from now this migration will have run everywhere and it can be deleted. + err := db.Transaction(func(tx *gorm.DB) error { + return fixupDupPDSRows(tx, logger) + }) if err != nil { return nil, err } @@ -206,6 +209,8 @@ func NewBGS(db *gorm.DB, ix *indexer.Indexer, repoman *repomgr.RepoManager, evtm return bgs, nil } +// TODO: 2025-02-04 hopefully about a month from now this migration will have run everywhere and it can be deleted. +// TODO: check for INFO "no pds dup rows found" on all deploys func fixupDupPDSRows(db *gorm.DB, logger *slog.Logger) error { rows, err := db.Raw("SELECT id, host FROM pds").Rows() if err != nil { @@ -233,7 +238,7 @@ func fixupDupPDSRows(db *gorm.DB, logger *slog.Logger) error { } } if maxHostCount <= 1 { - logger.Debug("no pds dup rows found") + logger.Info("no pds dup rows found") return nil } for host, idlist := range hostCounts {