Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix logs + msgio error #210

Merged
merged 4 commits into from
Oct 26, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions blocks/blocks.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package blocks

import (
"fmt"

mh "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multihash"
u "github.com/jbenet/go-ipfs/util"
)
Expand All @@ -20,3 +22,7 @@ func NewBlock(data []byte) *Block {
func (b *Block) Key() u.Key {
return u.Key(b.Multihash)
}

func (b *Block) String() string {
return fmt.Sprintf("[Block %s]", b.Key())
}
6 changes: 5 additions & 1 deletion blockservice/blocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package blockservice
import (
"bytes"
"testing"
"time"

"code.google.com/p/go.net/context"

ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
blocks "github.com/jbenet/go-ipfs/blocks"
Expand Down Expand Up @@ -37,7 +40,8 @@ func TestBlocks(t *testing.T) {
t.Error("returned key is not equal to block key", err)
}

b2, err := bs.GetBlock(b.Key())
ctx, _ := context.WithTimeout(context.TODO(), time.Second*5)
b2, err := bs.GetBlock(ctx, b.Key())
if err != nil {
t.Error("failed to retrieve block from BlockService", err)
return
Expand Down
10 changes: 6 additions & 4 deletions blockservice/blockservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package blockservice

import (
"fmt"
"time"

context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
Expand Down Expand Up @@ -52,8 +51,8 @@ func (s *BlockService) AddBlock(b *blocks.Block) (u.Key, error) {

// GetBlock retrieves a particular block from the service,
// Getting it from the datastore using the key (hash).
func (s *BlockService) GetBlock(k u.Key) (*blocks.Block, error) {
log.Debug("BlockService GetBlock: '%s'", k)
func (s *BlockService) GetBlock(ctx context.Context, k u.Key) (*blocks.Block, error) {
log.Debugf("BlockService GetBlock: '%s'", k)
datai, err := s.Datastore.Get(k.DsKey())
if err == nil {
log.Debug("Blockservice: Got data in datastore.")
Expand All @@ -67,7 +66,6 @@ func (s *BlockService) GetBlock(k u.Key) (*blocks.Block, error) {
}, nil
} else if err == ds.ErrNotFound && s.Remote != nil {
log.Debug("Blockservice: Searching bitswap.")
ctx, _ := context.WithTimeout(context.TODO(), 5*time.Second)
blk, err := s.Remote.Block(ctx, k)
if err != nil {
return nil, err
Expand All @@ -78,3 +76,7 @@ func (s *BlockService) GetBlock(k u.Key) (*blocks.Block, error) {
return nil, u.ErrNotFound
}
}

func (s *BlockService) DeleteBlock(k u.Key) error {
return s.Datastore.Delete(k.DsKey())
}
6 changes: 5 additions & 1 deletion core/commands/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import (
"io"
"io/ioutil"
"os"
"time"

"code.google.com/p/go.net/context"

mh "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multihash"
"github.com/jbenet/go-ipfs/blocks"
Expand All @@ -26,7 +29,8 @@ func BlockGet(n *core.IpfsNode, args []string, opts map[string]interface{}, out

k := u.Key(h)
log.Debug("BlockGet key: '%q'", k)
b, err := n.Blocks.GetBlock(k)
ctx, _ := context.WithTimeout(context.TODO(), time.Second*5)
b, err := n.Blocks.GetBlock(ctx, k)
if err != nil {
return fmt.Errorf("block get: %v", err)
}
Expand Down
6 changes: 3 additions & 3 deletions crypto/spipe/handshake.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (s *SecurePipe) handshake() error {
return err
}

log.Debug("handshake: %s <--> %s", s.local, s.remote)
log.Debugf("handshake: %s <--> %s", s.local, s.remote)
myPubKey, err := s.local.PubKey().Bytes()
if err != nil {
return err
Expand Down Expand Up @@ -105,7 +105,7 @@ func (s *SecurePipe) handshake() error {
if err != nil {
return err
}
log.Debug("%s Remote Peer Identified as %s", s.local, s.remote)
log.Debugf("%s Remote Peer Identified as %s", s.local, s.remote)

exchange, err := selectBest(SupportedExchanges, proposeResp.GetExchanges())
if err != nil {
Expand Down Expand Up @@ -209,7 +209,7 @@ func (s *SecurePipe) handshake() error {
return fmt.Errorf("Negotiation failed, got: %s", resp2)
}

log.Debug("%s handshake: Got node id: %s", s.local, s.remote)
log.Debugf("%s handshake: Got node id: %s", s.local, s.remote)
return nil
}

Expand Down
32 changes: 23 additions & 9 deletions exchange/bitswap/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ type bitswap struct {
//
// TODO ensure only one active request per key
func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error) {
log.Debug("Get Block %v", k)
log.Debugf("Get Block %v", k)

ctx, cancelFunc := context.WithCancel(parent)
bs.wantlist.Add(k)
Expand All @@ -82,10 +82,10 @@ func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error)
}
message.AppendWanted(k)
for peerToQuery := range peersToQuery {
log.Debug("bitswap got peersToQuery: %s", peerToQuery)
log.Debugf("bitswap got peersToQuery: %s", peerToQuery)
go func(p peer.Peer) {

log.Debug("bitswap dialing peer: %s", p)
log.Debugf("bitswap dialing peer: %s", p)
err := bs.sender.DialPeer(p)
if err != nil {
log.Errorf("Error sender.DialPeer(%s)", p)
Expand All @@ -94,7 +94,7 @@ func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error)

response, err := bs.sender.SendRequest(ctx, p, message)
if err != nil {
log.Errorf("Error sender.SendRequest(%s)", p)
log.Error("Error sender.SendRequest(%s) = %s", p, err)
return
}
// FIXME ensure accounting is handled correctly when
Expand Down Expand Up @@ -124,7 +124,7 @@ func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error)
// HasBlock announces the existance of a block to bitswap, potentially sending
// it to peers (Partners) whose WantLists include it.
func (bs *bitswap) HasBlock(ctx context.Context, blk blocks.Block) error {
log.Debug("Has Block %v", blk.Key())
log.Debugf("Has Block %v", blk.Key())
bs.wantlist.Remove(blk.Key())
bs.sendToPeersThatWant(ctx, blk)
return bs.routing.Provide(ctx, blk.Key())
Expand All @@ -133,17 +133,23 @@ func (bs *bitswap) HasBlock(ctx context.Context, blk blocks.Block) error {
// TODO(brian): handle errors
func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsmsg.BitSwapMessage) (
peer.Peer, bsmsg.BitSwapMessage) {
log.Debug("ReceiveMessage from %v", p.Key())
log.Debugf("ReceiveMessage from %v", p.Key())
log.Debugf("Message wantlist: %v", incoming.Wantlist())

if p == nil {
log.Error("Received message from nil peer!")
// TODO propagate the error upward
return nil, nil
}
if incoming == nil {
log.Error("Got nil bitswap message!")
// TODO propagate the error upward
return nil, nil
}

// Record message bytes in ledger
// TODO: this is bad, and could be easily abused.
// Should only track *useful* messages in ledger
bs.strategy.MessageReceived(p, incoming) // FIRST

for _, block := range incoming.Blocks() {
Expand All @@ -153,7 +159,10 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsm
}
go bs.notifications.Publish(block)
go func(block blocks.Block) {
_ = bs.HasBlock(ctx, block) // FIXME err ignored
err := bs.HasBlock(ctx, block) // FIXME err ignored
if err != nil {
log.Errorf("HasBlock errored: %s", err)
}
}(block)
}

Expand All @@ -162,6 +171,8 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsm
message.AppendWanted(wanted)
}
for _, key := range incoming.Wantlist() {
// TODO: might be better to check if we have the block before checking
// if we should send it to someone
if bs.strategy.ShouldSendBlockToPeer(key, p) {
if block, errBlockNotFound := bs.blockstore.Get(key); errBlockNotFound != nil {
continue
Expand All @@ -171,10 +182,13 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsm
}
}
defer bs.strategy.MessageSent(p, message)

log.Debug("Returning message.")
return p, message
}

func (bs *bitswap) ReceiveError(err error) {
log.Errorf("Bitswap ReceiveError: %s", err)
// TODO log the network error
// TODO bubble the network error up to the parent context/error logger
}
Expand All @@ -187,10 +201,10 @@ func (bs *bitswap) send(ctx context.Context, p peer.Peer, m bsmsg.BitSwapMessage
}

func (bs *bitswap) sendToPeersThatWant(ctx context.Context, block blocks.Block) {
log.Debug("Sending %v to peers that want it", block.Key())
log.Debugf("Sending %v to peers that want it", block.Key())
for _, p := range bs.strategy.Peers() {
if bs.strategy.BlockIsWantedByPeer(block.Key(), p) {
log.Debug("%v wants %v", p, block.Key())
log.Debugf("%v wants %v", p, block.Key())
if bs.strategy.ShouldSendBlockToPeer(block.Key(), p) {
message := bsmsg.New()
message.AppendBlock(block)
Expand Down
7 changes: 7 additions & 0 deletions exchange/bitswap/network/net_message_adapter.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
package network

import (
"errors"

context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
"github.com/jbenet/go-ipfs/util"

bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
inet "github.com/jbenet/go-ipfs/net"
netmsg "github.com/jbenet/go-ipfs/net/message"
peer "github.com/jbenet/go-ipfs/peer"
)

var log = util.Logger("net_message_adapter")

// NetMessageAdapter wraps a NetMessage network service
func NetMessageAdapter(s inet.Service, n inet.Network, r Receiver) Adapter {
adapter := impl{
Expand Down Expand Up @@ -48,6 +53,7 @@ func (adapter *impl) HandleMessage(

// TODO(brian): put this in a helper function
if bsmsg == nil || p == nil {
adapter.receiver.ReceiveError(errors.New("ReceiveMessage returned nil peer or message"))
return nil
}

Expand All @@ -57,6 +63,7 @@ func (adapter *impl) HandleMessage(
return nil
}

log.Debugf("Message size: %d", len(outgoing.Data()))
return outgoing
}

Expand Down
19 changes: 18 additions & 1 deletion merkledag/merkledag.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package merkledag

import (
"fmt"
"time"

"code.google.com/p/go.net/context"

mh "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multihash"
blocks "github.com/jbenet/go-ipfs/blocks"
Expand Down Expand Up @@ -204,10 +207,24 @@ func (n *DAGService) Get(k u.Key) (*Node, error) {
return nil, fmt.Errorf("DAGService is nil")
}

b, err := n.Blocks.GetBlock(k)
ctx, _ := context.WithTimeout(context.TODO(), time.Second*5)
b, err := n.Blocks.GetBlock(ctx, k)
if err != nil {
return nil, err
}

return Decoded(b.Data)
}

func (n *DAGService) Remove(nd *Node) error {
for _, l := range nd.Links {
if l.Node != nil {
n.Remove(l.Node)
}
}
k, err := nd.Key()
if err != nil {
return err
}
return n.Blocks.DeleteBlock(k)
}
13 changes: 12 additions & 1 deletion net/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ const (
ChanBuffer = 10

// MaxMessageSize is the size of the largest single message
MaxMessageSize = 1 << 20 // 1 MB
MaxMessageSize = 1 << 22 // 4 MB

// HandshakeTimeout for when nodes first connect
HandshakeTimeout = time.Second * 5
Expand Down Expand Up @@ -97,6 +97,17 @@ func (c *singleConn) close() error {
return err
}

func (c *singleConn) GetError() error {
select {
case err := <-c.msgio.incoming.ErrChan:
return err
case err := <-c.msgio.outgoing.ErrChan:
return err
default:
return nil
}
}

// ID is an identifier unique to this connection.
func (c *singleConn) ID() string {
return ID(c)
Expand Down
2 changes: 1 addition & 1 deletion net/conn/handshake.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func Handshake1(ctx context.Context, c Conn) error {
return fmt.Errorf("could not decode remote version: %q", err)
}

log.Debug("Received remote version (%s) from %s", remoteH, rpeer)
log.Debugf("Received remote version (%s) from %s", remoteH, rpeer)
}

if err := handshake.Handshake1Compatible(localH, remoteH); err != nil {
Expand Down
2 changes: 2 additions & 0 deletions net/conn/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ type Conn interface {
// Out returns a writable message channel
Out() chan<- []byte

GetError() error

// Close ends the connection
// Close() error -- already in ContextCloser
}
Expand Down
Loading