Skip to content

Commit

Permalink
Merge pull request ipfs#107 from libp2p/feat/event-log
Browse files Browse the repository at this point in the history
Feat/event log
  • Loading branch information
Stebalien authored Dec 12, 2017
2 parents a68a53a + 24c9006 commit c9557b6
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 18 deletions.
12 changes: 9 additions & 3 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,8 @@ func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.ID, key string) (
"peer": p,
}

defer log.EventBegin(ctx, "getValueSingle", meta).Done()
eip := log.EventBegin(ctx, "getValueSingle", meta)
defer eip.Done()

pmes := pb.NewMessage(pb.Message_GET_VALUE, key, 0)
resp, err := dht.sendRequest(ctx, p, pmes)
Expand All @@ -211,6 +212,7 @@ func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.ID, key string) (
log.Warningf("read timeout: %s %s", p.Pretty(), key)
fallthrough
default:
eip.SetError(err)
return nil, err
}
}
Expand Down Expand Up @@ -283,7 +285,8 @@ func (dht *IpfsDHT) FindLocal(id peer.ID) pstore.PeerInfo {

// findPeerSingle asks peer 'p' if they know where the peer with id 'id' is
func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.ID, id peer.ID) (*pb.Message, error) {
defer log.EventBegin(ctx, "findPeerSingle", p, id).Done()
eip := log.EventBegin(ctx, "findPeerSingle", p, id)
defer eip.Done()

pmes := pb.NewMessage(pb.Message_FIND_NODE, string(id), 0)
resp, err := dht.sendRequest(ctx, p, pmes)
Expand All @@ -294,12 +297,14 @@ func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.ID, id peer.ID) (
log.Warningf("read timeout: %s %s", p.Pretty(), id)
fallthrough
default:
eip.SetError(err)
return nil, err
}
}

func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key *cid.Cid) (*pb.Message, error) {
defer log.EventBegin(ctx, "findProvidersSingle", p, key).Done()
eip := log.EventBegin(ctx, "findProvidersSingle", p, key)
defer eip.Done()

pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, key.KeyString(), 0)
resp, err := dht.sendRequest(ctx, p, pmes)
Expand All @@ -310,6 +315,7 @@ func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key *cid
log.Warningf("read timeout: %s %s", p.Pretty(), key)
fallthrough
default:
eip.SetError(err)
return nil, err
}
}
Expand Down
31 changes: 24 additions & 7 deletions handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,14 @@ func (dht *IpfsDHT) handlerForMsgType(t pb.Message_MessageType) dhtHandler {
}
}

func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
defer log.EventBegin(ctx, "handleGetValue", p).Done()
func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Message) (_ *pb.Message, err error) {
eip := log.EventBegin(ctx, "handleGetValue", p)
defer func() {
if err != nil {
eip.SetError(err)
}
eip.Done()
}()
log.Debugf("%s handleGetValue for key: %s", dht.self, pmes.GetKey())

// setup response
Expand Down Expand Up @@ -147,8 +153,15 @@ func (dht *IpfsDHT) checkLocalDatastore(k string) (*recpb.Record, error) {
}

// Store a value in this peer local storage
func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
defer log.EventBegin(ctx, "handlePutValue", p).Done()
func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Message) (_ *pb.Message, err error) {
eip := log.EventBegin(ctx, "handlePutValue", p)
defer func() {
if err != nil {
eip.SetError(err)
}
eip.Done()
}()

dskey := convertToDsKey(pmes.GetKey())

rec := pmes.GetRecord()
Expand All @@ -157,7 +170,7 @@ func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Mess
return nil, errors.New("nil record")
}

if err := dht.verifyRecordLocally(rec); err != nil {
if err = dht.verifyRecordLocally(rec); err != nil {
log.Warningf("Bad dht record in PUT from: %s. %s", peer.ID(pmes.GetRecord().GetAuthor()), err)
return nil, err
}
Expand Down Expand Up @@ -214,11 +227,13 @@ func (dht *IpfsDHT) handleFindPeer(ctx context.Context, p peer.ID, pmes *pb.Mess
func (dht *IpfsDHT) handleGetProviders(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
lm := make(lgbl.DeferredMap)
lm["peer"] = func() interface{} { return p.Pretty() }
defer log.EventBegin(ctx, "handleGetProviders", lm).Done()
eip := log.EventBegin(ctx, "handleGetProviders", lm)
defer eip.Done()

resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())
c, err := cid.Cast([]byte(pmes.GetKey()))
if err != nil {
eip.SetError(err)
return nil, err
}

Expand Down Expand Up @@ -263,10 +278,12 @@ func (dht *IpfsDHT) handleGetProviders(ctx context.Context, p peer.ID, pmes *pb.
func (dht *IpfsDHT) handleAddProvider(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
lm := make(lgbl.DeferredMap)
lm["peer"] = func() interface{} { return p.Pretty() }
eip := log.EventBegin(ctx, "handleAddProvider", lm)
defer eip.Done()

defer log.EventBegin(ctx, "handleAddProvider", lm).Done()
c, err := cid.Cast([]byte(pmes.GetKey()))
if err != nil {
eip.SetError(err)
return nil, err
}

Expand Down
51 changes: 43 additions & 8 deletions routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,15 @@ var asyncQueryBuffer = 10

// PutValue adds value corresponding to given Key.
// This is the top level "Store" operation of the DHT
func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte) error {
func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte) (err error) {
eip := log.EventBegin(ctx, "PutValue")
defer func() {
eip.Append(loggableKey(key))
if err != nil {
eip.SetError(err)
}
eip.Done()
}()
log.Debugf("PutValue %s", key)
sk, err := dht.getOwnPrivateKey()
if err != nil {
Expand Down Expand Up @@ -84,7 +92,15 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte) erro
}

// GetValue searches for the value corresponding to given Key.
func (dht *IpfsDHT) GetValue(ctx context.Context, key string) ([]byte, error) {
func (dht *IpfsDHT) GetValue(ctx context.Context, key string) (_ []byte, err error) {
eip := log.EventBegin(ctx, "GetValue")
defer func() {
eip.Append(loggableKey(key))
if err != nil {
eip.SetError(err)
}
eip.Done()
}()
ctx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()

Expand Down Expand Up @@ -143,7 +159,15 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key string) ([]byte, error) {
return best, nil
}

func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) ([]routing.RecvdVal, error) {
func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (_ []routing.RecvdVal, err error) {
eip := log.EventBegin(ctx, "GetValues")
defer func() {
eip.Append(loggableKey(key))
if err != nil {
eip.SetError(err)
}
eip.Done()
}()
vals := make([]routing.RecvdVal, 0, nvals)
var valslock sync.Mutex

Expand Down Expand Up @@ -234,15 +258,20 @@ func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) ([]rou
}

return vals, nil

}

// Value provider layer of indirection.
// This is what DSHTs (Coral and MainlineDHT) do to store large values in a DHT.

// Provide makes this node announce that it can provide a value for the given key
func (dht *IpfsDHT) Provide(ctx context.Context, key *cid.Cid, brdcst bool) error {
defer log.EventBegin(ctx, "provide", key, logging.LoggableMap{"broadcast": brdcst}).Done()
func (dht *IpfsDHT) Provide(ctx context.Context, key *cid.Cid, brdcst bool) (err error) {
eip := log.EventBegin(ctx, "Provide", key, logging.LoggableMap{"broadcast": brdcst})
defer func() {
if err != nil {
eip.SetError(err)
}
eip.Done()
}()

// add self locally
dht.providers.AddProvider(ctx, key, dht.self)
Expand Down Expand Up @@ -407,8 +436,14 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key *cid.Cid,
}

// FindPeer searches for a peer with given ID.
func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (pstore.PeerInfo, error) {
defer log.EventBegin(ctx, "FindPeer", id).Done()
func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ pstore.PeerInfo, err error) {
eip := log.EventBegin(ctx, "FindPeer", id)
defer func() {
if err != nil {
eip.SetError(err)
}
eip.Done()
}()

// Check if were already connected to them
if pi := dht.FindLocal(id); pi.ID != "" {
Expand Down

0 comments on commit c9557b6

Please sign in to comment.