Skip to content

Commit

Permalink
Eliminate semaphore contention (#435)
Browse files Browse the repository at this point in the history
* Implement semaphore and pool

Signed-off-by: Anton Dort-Golts <dortgolts@gmail.com>

* Use semaphors in net

Signed-off-by: Anton Dort-Golts <dortgolts@gmail.com>

* Control event bus capacity with var

Signed-off-by: Anton Dort-Golts <dortgolts@gmail.com>

* Make app-connector operations thread-safe

Signed-off-by: Anton Dort-Golts <dortgolts@gmail.com>

* WIP: implement putting record into the log with decoupled data loading

Signed-off-by: Anton Dort-Golts <dortgolts@gmail.com>

* Fix: limit record chain length

Signed-off-by: Anton Dort-Golts <dortgolts@gmail.com>

* Generic semaphor IDs in pool

Signed-off-by: Anton Dort-Golts <dortgolts@gmail.com>

* Protect getRecords call with per-thread-per-log semaphore

Signed-off-by: Anton Dort-Golts <dortgolts@gmail.com>

* Remove no longer required semaphores from log/thread operations

Signed-off-by: Anton Dort-Golts <dortgolts@gmail.com>

* Lock pulls on a thread-level

Signed-off-by: Anton Dort-Golts <dortgolts@gmail.com>

* Move error wrapper to package level

Signed-off-by: Anton Dort-Golts <dortgolts@gmail.com>
  • Loading branch information
dgtony authored Sep 22, 2020
1 parent 771a545 commit 28845b5
Show file tree
Hide file tree
Showing 4 changed files with 430 additions and 236 deletions.
172 changes: 97 additions & 75 deletions net/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package net

import (
"context"
"errors"
"fmt"
nnet "net"
"sync"
Expand All @@ -13,10 +12,10 @@ import (
"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/peer"
pstore "github.com/libp2p/go-libp2p-core/peerstore"
gostream "github.com/libp2p/go-libp2p-gostream"
ma "github.com/multiformats/go-multiaddr"
"github.com/textileio/go-threads/cbor"
lstore "github.com/textileio/go-threads/core/logstore"
core "github.com/textileio/go-threads/core/net"
"github.com/textileio/go-threads/core/thread"
sym "github.com/textileio/go-threads/crypto/symmetric"
Expand Down Expand Up @@ -166,8 +165,14 @@ func (r *records) Store(p peer.ID, key cid.Cid, value core.Record) {
}

// getRecords from log addresses.
func (s *server) getRecords(ctx context.Context, id thread.ID, lid peer.ID, offsets map[peer.ID]cid.Cid, limit int) (map[peer.ID][]core.Record, error) {
sk, err := s.net.store.ServiceKey(id)
func (s *server) getRecords(
ctx context.Context,
tid thread.ID,
lid peer.ID,
offsets map[peer.ID]cid.Cid,
limit int,
) (map[peer.ID][]core.Record, error) {
sk, err := s.net.store.ServiceKey(tid)
if err != nil {
return nil, err
}
Expand All @@ -185,7 +190,7 @@ func (s *server) getRecords(ctx context.Context, id thread.ID, lid peer.ID, offs
}

body := &pb.GetRecordsRequest_Body{
ThreadID: &pb.ProtoThreadID{ID: id},
ThreadID: &pb.ProtoThreadID{ID: tid},
ServiceKey: &pb.ProtoKey{Key: sk},
Logs: pblgs,
}
Expand All @@ -201,81 +206,84 @@ func (s *server) getRecords(ctx context.Context, id thread.ID, lid peer.ID, offs
Body: body,
}

lg, err := s.net.store.GetLog(id, lid)
logAddrs, err := s.net.store.Addrs(tid, lid)
if err != nil {
return nil, err
}

var (
recs = newRecords()
wg sync.WaitGroup
)

// Pull from each address
recs := newRecords()
wg := sync.WaitGroup{}
for _, addr := range lg.Addrs {
for _, addr := range logAddrs {
wg.Add(1)
go func(addr ma.Multiaddr) {

go withErrLog(addr, func(addr ma.Multiaddr) error {
defer wg.Done()
p, err := addr.ValueForProtocol(ma.P_P2P)
pid, ok, err := s.callablePeer(addr)
if err != nil {
log.Error(err)
return
}
pid, err := peer.Decode(p)
if err != nil {
log.Error(err)
return
}
if pid.String() == s.net.host.ID().String() {
return
return err
} else if !ok {
// skip calling itself
return nil
}

log.Debugf("getting records from %s...", p)
log.Debugf("getting records from %s...", pid)

client, err := s.dial(pid)
if err != nil {
log.Errorf("dial %s failed: %s", p, err)
return
return fmt.Errorf("dial %s failed: %w", pid, err)
}

cctx, cancel := context.WithTimeout(ctx, PullTimeout)
defer cancel()
reply, err := client.GetRecords(cctx, req)
if err != nil {
log.Warnf("get records from %s failed: %s", p, err)
return
log.Warnf("get records from %s failed: %s", pid, err)
return nil
}

for _, l := range reply.Logs {
log.Debugf("received %d records in log %s from %s", len(l.Records), l.LogID.ID, p)
var logID = l.LogID.ID
log.Debugf("received %d records in log %s from %s", len(l.Records), logID, pid)

lg, err := s.net.store.GetLog(id, l.LogID.ID)
if err != nil && !errors.Is(err, lstore.ErrLogNotFound) {
log.Error(err)
return
if l.Log != nil && len(l.Log.Addrs) > 0 {
if err = s.net.store.AddAddrs(tid, logID, addrsFromProto(l.Log.Addrs), pstore.PermanentAddrTTL); err != nil {
return err
}
}

pk, err := s.net.store.PubKey(tid, logID)
if err != nil {
return err
}
if lg.PubKey == nil {
if l.Log != nil {
lg = logFromProto(l.Log)
lg.Head = cid.Undef
if err = s.net.store.AddLog(id, lg); err != nil {
log.Error(err)
return
}
} else {

if pk == nil {
if l.Log == nil || l.Log.PubKey == nil {
// cannot verify received records
continue
}
if err := s.net.store.AddPubKey(tid, logID, l.Log.PubKey); err != nil {
return err
}
pk = l.Log.PubKey
}

for _, r := range l.Records {
rec, err := cbor.RecordFromProto(r, sk)
if err != nil {
log.Error(err)
return
return err
}
if err = rec.Verify(lg.PubKey); err != nil {
log.Error(err)
return
if err = rec.Verify(pk); err != nil {
return err
}
recs.Store(lg.ID, rec.Cid(), rec)
recs.Store(logID, rec.Cid(), rec)
}
}
}(addr)
return nil
})
}
wg.Wait()

Expand Down Expand Up @@ -317,47 +325,35 @@ func (s *server) pushRecord(ctx context.Context, id thread.ID, lid peer.ID, rec

// Push to each address
for _, addr := range addrs {
go func(addr ma.Multiaddr) {
p, err := addr.ValueForProtocol(ma.P_P2P)
if err != nil {
log.Error(err)
return
}
pid, err := peer.Decode(p)
go withErrLog(addr, func(addr ma.Multiaddr) error {
pid, ok, err := s.callablePeer(addr)
if err != nil {
log.Error(err)
return
return err
} else if !ok {
// skip calling itself
return nil
}
if pid.String() == s.net.host.ID().String() {
return
}

log.Debugf("pushing record to %s...", p)

client, err := s.dial(pid)
if err != nil {
log.Errorf("dial %s failed: %s", p, err)
return
return fmt.Errorf("dial %s failed: %w", pid, err)
}
cctx, cancel := context.WithTimeout(context.Background(), PushTimeout)
defer cancel()
if _, err = client.PushRecord(cctx, req); err != nil {
if status.Convert(err).Code() == codes.NotFound { // Send the missing log
log.Debugf("pushing log %s to %s...", lid, p)

log.Debugf("pushing log %s to %s...", lid, pid)
l, err := s.net.store.GetLog(id, lid)
if err != nil {
log.Error(err)
return
return err
}
body := &pb.PushLogRequest_Body{
ThreadID: &pb.ProtoThreadID{ID: id},
Log: logToProto(l),
}
sig, key, err := s.signRequestBody(body)
if err != nil {
log.Error(err)
return
return err
}
lreq := &pb.PushLogRequest{
Header: &pb.Header{
Expand All @@ -367,15 +363,16 @@ func (s *server) pushRecord(ctx context.Context, id thread.ID, lid peer.ID, rec
Body: body,
}
if _, err = client.PushLog(cctx, lreq); err != nil {
log.Warnf("push log to %s failed: %s", p, err)
return
log.Warnf("push log to %s failed: %s", pid, err)
return nil
}
return
return nil
}
log.Warnf("push record to %s failed: %s", p, err)
return
log.Warnf("push record to %s failed: %s", pid, err)
return nil
}
}(addr)
return nil
})
}

// Finally, publish to the thread's topic
Expand All @@ -388,6 +385,25 @@ func (s *server) pushRecord(ctx context.Context, id thread.ID, lid peer.ID, rec
return nil
}

// callablePeer attempts to obtain external peer ID from the multiaddress.
func (s *server) callablePeer(addr ma.Multiaddr) (peer.ID, bool, error) {
p, err := addr.ValueForProtocol(ma.P_P2P)
if err != nil {
return "", false, err
}

pid, err := peer.Decode(p)
if err != nil {
return "", false, err
}

if pid.String() == s.net.host.ID().String() {
return pid, false, nil
}

return pid, true, nil
}

// dial attempts to open a gRPC connection over libp2p to a peer.
func (s *server) dial(peerID peer.ID) (pb.ServiceClient, error) {
s.Lock()
Expand Down Expand Up @@ -446,3 +462,9 @@ func (s *server) signRequestBody(msg proto.Marshaler) (sig []byte, pk crypto.Pub
}
return sig, sk.GetPublic(), nil
}

func withErrLog(addr ma.Multiaddr, f func(addr ma.Multiaddr) error) {
if err := f(addr); err != nil {
log.Error(err.Error())
}
}
Loading

0 comments on commit 28845b5

Please sign in to comment.