Skip to content

Commit

Permalink
net: dont wait while pushing (#310)
Browse files Browse the repository at this point in the history
Signed-off-by: Sander Pick <sanderpick@gmail.com>
  • Loading branch information
sanderpick authored Apr 14, 2020
1 parent 33170c5 commit 1ea8c06
Show file tree
Hide file tree
Showing 5 changed files with 11 additions and 11 deletions.
3 changes: 1 addition & 2 deletions integrationtests/foldersync/foldersync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
)

func TestMain(m *testing.M) {
logging.SetLogLevel("foldersync", "info")
_ = logging.SetLogLevel("foldersync", "info")
// logging.SetLogLevel("store", "debug")
// logging.SetLogLevel("threads", "debug")
// logging.SetLogLevel("threadstore", "debug")
Expand Down Expand Up @@ -162,7 +162,6 @@ func TestNUsersBootstrap(t *testing.T) {
for _, tt := range tests {
tt := tt
t.Run(fmt.Sprintf("Total%dCore%d", tt.totalClients, tt.totalCorePeers), func(t *testing.T) {
// t.Parallel()
var clients []*client

client0, clean0 := createRootClient(t, fmt.Sprintf("client%d", 0))
Expand Down
1 change: 1 addition & 0 deletions net/api/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,7 @@ func TestClient_Subscribe(t *testing.T) {
if _, err = client1.CreateRecord(context.Background(), info.ID, body2); err != nil {
t.Fatal(err)
}
time.Sleep(time.Second)

lock.Lock()
if rcount != 2 {
Expand Down
7 changes: 1 addition & 6 deletions net/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,11 +316,8 @@ func (s *server) pushRecord(ctx context.Context, id thread.ID, lid peer.ID, rec
}

// Push to each address
wg := sync.WaitGroup{}
for _, addr := range addrs {
wg.Add(1)
go func(addr ma.Multiaddr) {
defer wg.Done()
p, err := addr.ValueForProtocol(ma.P_P2P)
if err != nil {
log.Error(err)
Expand All @@ -337,7 +334,7 @@ func (s *server) pushRecord(ctx context.Context, id thread.ID, lid peer.ID, rec

log.Debugf("pushing record to %s...", p)

cctx, cancel := context.WithTimeout(ctx, reqTimeout)
cctx, cancel := context.WithTimeout(context.Background(), reqTimeout)
defer cancel()
conn, err := s.dial(cctx, pid, grpc.WithInsecure())
if err != nil {
Expand Down Expand Up @@ -386,8 +383,6 @@ func (s *server) pushRecord(ctx context.Context, id thread.ID, lid peer.ID, rec
if err = s.ps.Publish(ctx, id, req); err != nil {
log.Errorf("error publishing record: %s", err)
}

wg.Wait()
return nil
}

Expand Down
9 changes: 6 additions & 3 deletions net/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,10 @@ func (n *net) AddRecord(ctx context.Context, id thread.ID, lid peer.ID, rec core
if err = rec.Verify(logpk); err != nil {
return err
}
return n.PutRecord(ctx, id, lid, rec)
if err = n.PutRecord(ctx, id, lid, rec); err != nil {
return err
}
return n.server.pushRecord(ctx, id, lid, rec)
}

func (n *net) GetRecord(ctx context.Context, id thread.ID, rid cid.Cid, opts ...core.ThreadOption) (core.Record, error) {
Expand Down Expand Up @@ -819,10 +822,10 @@ func (n *net) putRecord(ctx context.Context, id thread.ID, lid peer.ID, rec core

log.Debugf("put record %s (thread=%s, log=%s)", r.Cid(), id, lg.ID)

if err = n.bus.SendWithTimeout(NewRecord(r, id, lg.ID), notifyTimeout); err != nil {
if err = n.store.SetHead(id, lg.ID, r.Cid()); err != nil {
return err
}
if err = n.store.SetHead(id, lg.ID, r.Cid()); err != nil {
if err = n.bus.SendWithTimeout(NewRecord(r, id, lg.ID), notifyTimeout); err != nil {
return err
}
}
Expand Down
2 changes: 2 additions & 0 deletions net/net_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
rand "crypto/rand"
"testing"
"time"

bserv "github.com/ipfs/go-blockservice"
ds "github.com/ipfs/go-datastore"
Expand Down Expand Up @@ -159,6 +160,7 @@ func TestNet_AddThread(t *testing.T) {
if _, err = n2.CreateRecord(ctx, info2.ID, body2); err != nil {
t.Fatal(err)
}
time.Sleep(time.Second)

info3, err := n1.GetThread(context.Background(), info.ID)
if err != nil {
Expand Down

0 comments on commit 1ea8c06

Please sign in to comment.