Skip to content

Commit

Permalink
rollback irrelevant changes
Browse files Browse the repository at this point in the history
  • Loading branch information
brianolson committed Mar 6, 2025
1 parent 109e101 commit 4048d23
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 15 deletions.
1 change: 0 additions & 1 deletion bgs/bgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,6 @@ func NewBGS(db *gorm.DB, ix *indexer.Indexer, repoman *repomgr.RepoManager, evtm
slOpts.DefaultRepoLimit = config.DefaultRepoLimit
slOpts.ConcurrencyPerPDS = config.ConcurrencyPerPDS
slOpts.MaxQueuePerPDS = config.MaxQueuePerPDS
slOpts.Logger = bgs.log
s, err := NewSlurper(db, bgs.handleFedEvent, slOpts)
if err != nil {
return nil, err
Expand Down
5 changes: 0 additions & 5 deletions bgs/fedmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,6 @@ type Slurper struct {
shutdownResult chan []error

ssl bool

log *slog.Logger
}

type Limiters struct {
Expand All @@ -75,7 +73,6 @@ type SlurperOptions struct {
DefaultRepoLimit int64
ConcurrencyPerPDS int64
MaxQueuePerPDS int64
Logger *slog.Logger
}

func DefaultSlurperOptions() *SlurperOptions {
Expand All @@ -88,7 +85,6 @@ func DefaultSlurperOptions() *SlurperOptions {
DefaultRepoLimit: 100,
ConcurrencyPerPDS: 100,
MaxQueuePerPDS: 1_000,
Logger: slog.Default(),
}
}

Expand Down Expand Up @@ -119,7 +115,6 @@ func NewSlurper(db *gorm.DB, cb IndexCallback, opts *SlurperOptions) (*Slurper,
ssl: opts.SSL,
shutdownChan: make(chan bool),
shutdownResult: make(chan []error),
log: opts.Logger,
}
if err := s.loadConfig(); err != nil {
return nil, err
Expand Down
9 changes: 1 addition & 8 deletions events/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,20 +126,13 @@ func HandleRepoStream(ctx context.Context, con *websocket.Conn, sched Scheduler,
go func() {
t := time.NewTicker(time.Second * 30)
defer t.Stop()
failcount := 0

for {

select {
case <-t.C:
if err := con.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second*10)); err != nil {
log.Warn("failed to ping", "err", err)
failcount++
if failcount >= 4 {
log.Error("too many ping fails", "count", failcount)
con.Close()
return
}
}
case <-ctx.Done():
con.Close()
Expand Down Expand Up @@ -176,7 +169,7 @@ func HandleRepoStream(ctx context.Context, con *websocket.Conn, sched Scheduler,

mt, rawReader, err := con.NextReader()
if err != nil {
return fmt.Errorf("con err at read: %w", err)
return err
}

switch mt {
Expand Down
60 changes: 60 additions & 0 deletions events/repostream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package events

import (
"context"

"github.com/bluesky-social/indigo/repomgr"

"github.com/gorilla/websocket"
cid "github.com/ipfs/go-cid"
)

type LiteStreamHandleFunc func(op repomgr.EventKind, seq int64, path string, did string, rcid *cid.Cid, rec any) error

func ConsumeRepoStreamLite2(ctx context.Context, con *websocket.Conn, cb LiteStreamHandleFunc) error {
/*
return HandleRepoStream(ctx, con, &RepoStreamCallbacks{
RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error {
if evt.TooBig {
log.Errorf("skipping too big events for now: %d", evt.Seq)
return nil
}
r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.Blocks))
if err != nil {
return fmt.Errorf("reading repo from car (seq: %d, len: %d): %w", evt.Seq, len(evt.Blocks), err)
}
for _, op := range evt.Ops {
ek := repomgr.EventKind(op.Action)
switch ek {
case repomgr.EvtKindCreateRecord, repomgr.EvtKindUpdateRecord:
rc, rec, err := r.GetRecord(ctx, op.Path)
if err != nil {
e := fmt.Errorf("getting record %s (%s) within seq %d for %s: %w", op.Path, *op.Cid, evt.Seq, evt.Repo, err)
log.Error(e)
continue
}
if lexutil.LexLink(rc) != *op.Cid {
// TODO: do we even error here?
return fmt.Errorf("mismatch in record and op cid: %s != %s", rc, *op.Cid)
}
if err := cb(ek, evt.Seq, op.Path, evt.Repo, &rc, rec); err != nil {
log.Errorf("event consumer callback (%s): %s", ek, err)
continue
}
case repomgr.EvtKindDeleteRecord:
if err := cb(ek, evt.Seq, op.Path, evt.Repo, nil, nil); err != nil {
log.Errorf("event consumer callback (%s): %s", ek, err)
continue
}
}
}
return nil
},
})
*/
return nil
}
2 changes: 1 addition & 1 deletion models/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ type FollowRecord struct {
type PDS struct {
gorm.Model

Host string `gorm:"unique"`
Host string
Did string
SSL bool
Cursor int64
Expand Down

0 comments on commit 4048d23

Please sign in to comment.