Skip to content

Commit

Permalink
tatanka: switch to lexi db package and clean up message handling (#3153)
Browse files Browse the repository at this point in the history
* switch to lexi db package and clean up message handling

* version db. simplify rep scoring
  • Loading branch information
buck54321 authored Feb 20, 2025
1 parent 1057ed8 commit 2b683d5
Show file tree
Hide file tree
Showing 15 changed files with 657 additions and 772 deletions.
5 changes: 5 additions & 0 deletions dex/encode/encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ func Uint64Bytes(i uint64) []byte {
return b
}

// BytesToUint64 converts the length-8, big-endian encoded byte slice to a uint64.
func BytesToUint64(i []byte) uint64 {
return IntCoder.Uint64(i[:8])
}

// CopySlice makes a copy of the slice.
func CopySlice(b []byte) []byte {
newB := make([]byte, len(b))
Expand Down
21 changes: 16 additions & 5 deletions dex/lexi/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@ func (db *DB) Table(name string) (*Table, error) {
}, nil
}

// Get retrieves a value from the Table.
func (t *Table) Get(k encoding.BinaryMarshaler, v encoding.BinaryUnmarshaler) error {
// GetRaw retrieves a value from the Table as raw bytes.
func (t *Table) GetRaw(k encoding.BinaryMarshaler) (b []byte, err error) {
kB, err := k.MarshalBinary()
if err != nil {
return fmt.Errorf("error marshaling key: %w", err)
return nil, fmt.Errorf("error marshaling key: %w", err)
}
return t.View(func(txn *badger.Txn) error {
err = t.View(func(txn *badger.Txn) error {
dbID, err := t.keyID(txn, kB, true)
if err != nil {
return convertError(err)
Expand All @@ -50,8 +50,19 @@ func (t *Table) Get(k encoding.BinaryMarshaler, v encoding.BinaryUnmarshaler) er
if err != nil {
return err
}
return v.UnmarshalBinary(d.v)
b = d.v
return nil
})
return
}

// Get retrieves a value from the Table.
func (t *Table) Get(k encoding.BinaryMarshaler, thing encoding.BinaryUnmarshaler) error {
b, err := t.GetRaw(k)
if err != nil {
return err
}
return thing.UnmarshalBinary(b)
}

// func (t *Table) GetDBID(dbID DBID, v encoding.BinaryUnmarshaler) error {
Expand Down
160 changes: 96 additions & 64 deletions tatanka/client_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ func (t *Tatanka) registerRemoteClient(tankaID, clientID tanka.PeerID) {
}
}

// Tatanka.specialHandlers

// handleClientConnect handles a new locally-connected client. checking
// reputation before adding the client to the map.
func (t *Tatanka) handleClientConnect(cl tanka.Sender, msg *msgjson.Message) *msgjson.Error {
Expand All @@ -102,7 +104,7 @@ func (t *Tatanka) handleClientConnect(cl tanka.Sender, msg *msgjson.Message) *ms
return msgjson.NewError(mj.ErrBadRequest, "error unmarshaling client connection configuration from %q: %v", cl.PeerID(), err)
}

p, rrs, err := t.loadPeer(conn.ID)
p, err := t.db.Peer(conn.ID)
if err != nil {
return msgjson.NewError(mj.ErrInternal, "error getting peer info for peer %q: %v", conn.ID, err)
}
Expand All @@ -113,7 +115,7 @@ func (t *Tatanka) handleClientConnect(cl tanka.Sender, msg *msgjson.Message) *ms

cl.SetPeerID(p.ID)

pp := &peer{Peer: p, Sender: cl, rrs: rrs}
pp := &peer{Peer: p, Sender: cl, rrs: make(map[tanka.PeerID]*tanka.Reputation)}
if pp.banned() {
return msgjson.NewError(mj.ErrBannned, "your tier is <= 0. post some bonds")
}
Expand All @@ -132,67 +134,25 @@ func (t *Tatanka) handleClientConnect(cl tanka.Sender, msg *msgjson.Message) *ms

t.sendResult(cl, msg.ID, t.generateConfig(bondTier))

note := mj.MustNotification(mj.RouteNewClient, conn)
for _, s := range t.tatankaNodes() {
if err := t.send(s, note); err != nil {
req := mj.MustRequest(mj.RouteNewClient, conn)
for tt, s := range t.tatankaNodes() {
if err := t.request(s, req, func(m *msgjson.Message) {
var rep *tanka.Reputation
if err := m.UnmarshalResult(&rep); err != nil {
t.log.Errorf("error parsing response for new client request to %s: %w", tt, err)
return
}
pp.mtx.Lock()
pp.rrs[conn.ID] = rep
pp.mtx.Unlock()
}); err != nil {
t.log.Errorf("error sharing new client info with tatanka node %q", s.ID)
}
}

return nil
}

// handleClientMessage handles incoming message from locally-connected clients.
// All messages except for handleClientConnect and handlePostBond are handled
// here, with some common pre-processing and validation done before the
// subsequent route handler is called.
func (t *Tatanka) handleClientMessage(cl tanka.Sender, msg *msgjson.Message) *msgjson.Error {
peerID := cl.PeerID()
c := t.clientNode(peerID)
if c == nil {
t.log.Errorf("Ignoring message from unknown client %s", peerID)
cl.Disconnect()
return nil
}

if err := mj.CheckSig(msg, c.PubKey); err != nil {
t.log.Errorf("Signature error for %q message from %q: %v", msg.Route, c.ID, err)
return msgjson.NewError(mj.ErrSig, "signature doesn't check")
}

t.clientMtx.RLock()
c, found := t.clients[peerID]
t.clientMtx.RUnlock()
if !found {
t.log.Errorf("client %s sent a message requiring tier before connecting", peerID)
return msgjson.NewError(mj.ErrAuth, "not connected")
}

switch msg.Type {
case msgjson.Request:
switch msg.Route {
case mj.RouteSubscribe:
return t.handleSubscription(c, msg)
// case mj.RouteUnsubscribe:
// return t.handleUnsubscribe(c, msg)
case mj.RouteBroadcast:
return t.handleBroadcast(c.peer, msg, true)
case mj.RouteTankagram:
return t.handleTankagram(c, msg)
default:
return msgjson.NewError(mj.ErrBadRequest, "unknown request route %q", msg.Route)
}
case msgjson.Notification:
switch msg.Route {
default:
// TODO: What? Can't let this happen too much.
return msgjson.NewError(mj.ErrBadRequest, "unknown notification route %q", msg.Route)
}
default:
return msgjson.NewError(mj.ErrBadRequest, "unknown message type %d", msg.Type)
}
}

// handlePostBond handles a new bond sent from a locally connected client.
// handlePostBond is the only client route than can be invoked before the user
// is bonded.
Expand Down Expand Up @@ -220,7 +180,6 @@ func (t *Tatanka) handlePostBond(cl tanka.Sender, msg *msgjson.Message) *msgjson
}
}

var allBonds []*tanka.Bond
for _, b := range bonds {
if b == nil {
t.log.Errorf("Bond-posting client %s sent a nil bond", peerID)
Expand All @@ -244,18 +203,21 @@ func (t *Tatanka) handlePostBond(cl tanka.Sender, msg *msgjson.Message) *msgjson
return msgjson.NewError(mj.ErrBadRequest, "failed validation")
}

var err error
allBonds, err = t.db.StoreBond(b)
if err != nil {
if err := t.db.StoreBond(b); err != nil {
t.log.Errorf("Error storing bond for client %s in db: %v", peerID, err)
return msgjson.NewError(mj.ErrInternal, "internal error")
}
}

liveBonds, err := t.db.GetBonds(peerID)
if err != nil {
t.log.Errorf("Error retrieving bonds for client %s in db: %v", peerID, err)
msgjson.NewError(mj.ErrInternal, "internal error")
}

if len(allBonds) > 0 { // Probably no way to get here with empty allBonds, but checking anyway.
if len(liveBonds) > 0 { // Probably no way to get here with empty liveBonds, but checking anyway.
if c := t.clientNode(peerID); c != nil {
c.updateBonds(allBonds)
c.updateBonds(liveBonds)
}
}

Expand All @@ -264,6 +226,38 @@ func (t *Tatanka) handlePostBond(cl tanka.Sender, msg *msgjson.Message) *msgjson
return nil
}

// Tatanka.clientHandlers

type clientRequestHandler = func(c *client, msg *msgjson.Message) *msgjson.Error
type clientNotificationHandler = func(c *client, msg *msgjson.Message)

// handleClientMessage handles incoming messages from locally-connected clients.
// All messages except for handleClientConnect and handlePostBond are handled
// here, with some common pre-processing and validation done before the
// subsequent route handler is called.
func (t *Tatanka) handleClientMessage(cl tanka.Sender, msg *msgjson.Message) *msgjson.Error {
peerID := cl.PeerID()
c := t.clientNode(peerID)
if c == nil {
t.log.Errorf("Ignoring message from unknown client %s", peerID)
cl.Disconnect()
return nil
}

if err := mj.CheckSig(msg, c.PubKey); err != nil {
t.log.Errorf("Signature error for %q message from %q: %v", msg.Route, c.ID, err)
return msgjson.NewError(mj.ErrSig, "signature doesn't check")
}

switch handle := t.clientHandlers[msg.Route].(type) {
case clientRequestHandler:
return handle(c, msg)
case clientNotificationHandler:
handle(c, msg)
}
return nil // Notification
}

// handleSubscription handles a new subscription, adding the subject to the
// map if it doesn't exist. It then distributes a NewSubscriber broadcast
// to all current subscribers and remote tatankas.
Expand Down Expand Up @@ -478,7 +472,8 @@ func (t *Tatanka) distributeBroadcastedMessage(bcast *mj.Broadcast, mustExist bo

// handleBroadcast handles a broadcast from a locally connected client,
// forwarding the message to all remote tatankas and local subscribers.
func (t *Tatanka) handleBroadcast(p *peer, msg *msgjson.Message, mustExist bool) *msgjson.Error {
func (t *Tatanka) handleBroadcast(c *client, msg *msgjson.Message) *msgjson.Error {
p := c.peer
if t.skipRelay(msg) {
return nil
}
Expand All @@ -503,7 +498,7 @@ func (t *Tatanka) handleBroadcast(p *peer, msg *msgjson.Message, mustExist bool)
t.relayBroadcast(bcast, p.ID)

// Send to local subscribers.
if msgErr := t.distributeBroadcastedMessage(bcast, mustExist); msgErr != nil {
if msgErr := t.distributeBroadcastedMessage(bcast, true); msgErr != nil {
return msgErr
}

Expand Down Expand Up @@ -635,6 +630,43 @@ func (t *Tatanka) handleTankagram(c *client, msg *msgjson.Message) *msgjson.Erro
return nil
}

func (t *Tatanka) handleSetScore(c *client, msg *msgjson.Message) {
scorer := c.peer.ID
var score *mj.ScoreReport
if err := msg.Unmarshal(&score); err != nil {
t.log.Errorf("error unmarshaling set_score from %s: %v", scorer, err)
return
}
if err := t.db.SetScore(score.PeerID, scorer, score.Score, time.Now()); err != nil {
t.log.Errorf("error adding score from %s for %s to db: %v", scorer, score.PeerID, err)
}
rep, err := t.db.Reputation(score.PeerID)
if err != nil {
t.log.Errorf("error getting reputation after score update from %s for %s: %v", score.PeerID, scorer, err)
return
}
t.clientMtx.RLock()
c, found := t.clients[score.PeerID]
t.clientMtx.RUnlock()
if found {
c.mtx.Lock()
c.Reputation = rep
c.mtx.Unlock()
}

note := mj.MustNotification(mj.RouteShareScore, &mj.SharedScore{
Scorer: scorer,
Scored: score.PeerID,
Score: score.Score,
Reputation: rep,
})
for _, tt := range t.tatankaNodes() {
if err := t.send(tt, note); err != nil {
t.log.Errorf("error notifying %s of new score: %v", tt.ID, err)
}
}
}

const ErrNoPath = dex.ErrorKind("no path")

// requestAnyOne tries to request from the senders in order until one succeeds.
Expand Down
Loading

0 comments on commit 2b683d5

Please sign in to comment.