Skip to content

Commit

Permalink
Merge pull request #441 from jbenet/rtable-simplify
Browse files Browse the repository at this point in the history
remove multilayered routing table from the DHT (for now)
  • Loading branch information
whyrusleeping committed Dec 11, 2014
2 parents ba1a6c0 + 8cdf566 commit a62b239
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 62 deletions.
63 changes: 20 additions & 43 deletions routing/dht/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ const doPinging = false
type IpfsDHT struct {
// Array of routing tables for differently distanced nodes
// NOTE: (currently, only a single table is used)
routingTables []*kb.RoutingTable
routingTable *kb.RoutingTable

// the network services we need
dialer inet.Dialer
Expand Down Expand Up @@ -80,10 +80,7 @@ func NewDHT(ctx context.Context, p peer.Peer, ps peer.Peerstore, dialer inet.Dia
dht.providers = NewProviderManager(dht.Context(), p.ID())
dht.AddCloserChild(dht.providers)

dht.routingTables = make([]*kb.RoutingTable, 3)
dht.routingTables[0] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID()), time.Millisecond*1000)
dht.routingTables[1] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID()), time.Millisecond*1000)
dht.routingTables[2] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID()), time.Hour)
dht.routingTable = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID()), time.Minute)
dht.birth = time.Now()

dht.Validators = make(map[string]ValidatorFunc)
Expand Down Expand Up @@ -243,9 +240,9 @@ func (dht *IpfsDHT) putProvider(ctx context.Context, p peer.Peer, key string) er
}

func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.Peer,
key u.Key, level int) ([]byte, []peer.Peer, error) {
key u.Key) ([]byte, []peer.Peer, error) {

pmes, err := dht.getValueSingle(ctx, p, key, level)
pmes, err := dht.getValueSingle(ctx, p, key)
if err != nil {
return nil, nil, err
}
Expand All @@ -265,7 +262,7 @@ func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.Peer,

// TODO decide on providers. This probably shouldn't be happening.
if prv := pmes.GetProviderPeers(); prv != nil && len(prv) > 0 {
val, err := dht.getFromPeerList(ctx, key, prv, level)
val, err := dht.getFromPeerList(ctx, key, prv)
if err != nil {
return nil, nil, err
}
Expand All @@ -292,9 +289,9 @@ func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.Peer,

// getValueSingle simply performs the get value RPC with the given parameters
func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.Peer,
key u.Key, level int) (*pb.Message, error) {
key u.Key) (*pb.Message, error) {

pmes := pb.NewMessage(pb.Message_GET_VALUE, string(key), level)
pmes := pb.NewMessage(pb.Message_GET_VALUE, string(key), 0)
return dht.sendRequest(ctx, p, pmes)
}

Expand All @@ -303,7 +300,7 @@ func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.Peer,
// one to get the value from? Or just connect to one at a time until we get a
// successful connection and request the value from it?
func (dht *IpfsDHT) getFromPeerList(ctx context.Context, key u.Key,
peerlist []*pb.Message_Peer, level int) ([]byte, error) {
peerlist []*pb.Message_Peer) ([]byte, error) {

for _, pinfo := range peerlist {
p, err := dht.ensureConnectedToPeer(ctx, pinfo)
Expand All @@ -312,7 +309,7 @@ func (dht *IpfsDHT) getFromPeerList(ctx context.Context, key u.Key,
continue
}

pmes, err := dht.getValueSingle(ctx, p, key, level)
pmes, err := dht.getValueSingle(ctx, p, key)
if err != nil {
log.Errorf("getFromPeers error: %s\n", err)
continue
Expand Down Expand Up @@ -379,47 +376,30 @@ func (dht *IpfsDHT) putLocal(key u.Key, value []byte) error {
return dht.datastore.Put(key.DsKey(), data)
}

// Update signals to all routingTables to Update their last-seen status
// Update signals the routingTable to Update its last-seen status
// on the given peer.
func (dht *IpfsDHT) Update(ctx context.Context, p peer.Peer) {
log.Event(ctx, "updatePeer", p)
removedCount := 0
for _, route := range dht.routingTables {
removed := route.Update(p)
// Only close the connection if no tables refer to this peer
if removed != nil {
removedCount++
}
}

// Only close the connection if no tables refer to this peer
// if removedCount == len(dht.routingTables) {
// dht.network.ClosePeer(p)
// }
// ACTUALLY, no, let's not just close the connection. it may be connected
// due to other things. it seems that we just need connection timeouts
// after some deadline of inactivity.
dht.routingTable.Update(p)
}

// FindLocal looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in.
func (dht *IpfsDHT) FindLocal(id peer.ID) (peer.Peer, *kb.RoutingTable) {
for _, table := range dht.routingTables {
p := table.Find(id)
if p != nil {
return p, table
}
p := dht.routingTable.Find(id)
if p != nil {
return p, dht.routingTable
}
return nil, nil
}

// findPeerSingle asks peer 'p' if they know where the peer with id 'id' is
func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.Peer, id peer.ID, level int) (*pb.Message, error) {
pmes := pb.NewMessage(pb.Message_FIND_NODE, string(id), level)
func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.Peer, id peer.ID) (*pb.Message, error) {
pmes := pb.NewMessage(pb.Message_FIND_NODE, string(id), 0)
return dht.sendRequest(ctx, p, pmes)
}

func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.Peer, key u.Key, level int) (*pb.Message, error) {
pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, string(key), level)
func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.Peer, key u.Key) (*pb.Message, error) {
pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, string(key), 0)
return dht.sendRequest(ctx, p, pmes)
}

Expand All @@ -446,11 +426,8 @@ func (dht *IpfsDHT) addProviders(key u.Key, pbps []*pb.Message_Peer) []peer.Peer

// nearestPeersToQuery returns the routing tables closest peers.
func (dht *IpfsDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.Peer {
level := pmes.GetClusterLevel()
cluster := dht.routingTables[level]

key := u.Key(pmes.GetKey())
closer := cluster.NearestPeers(kb.ConvertKey(key), count)
closer := dht.routingTable.NearestPeers(kb.ConvertKey(key), count)
return closer
}

Expand Down Expand Up @@ -537,7 +514,7 @@ func (dht *IpfsDHT) PingRoutine(t time.Duration) {
case <-tick:
id := make([]byte, 16)
rand.Read(id)
peers := dht.routingTables[0].NearestPeers(kb.ConvertKey(u.Key(id)), 5)
peers := dht.routingTable.NearestPeers(kb.ConvertKey(u.Key(id)), 5)
for _, p := range peers {
ctx, _ := context.WithTimeout(dht.Context(), time.Second*5)
err := dht.Ping(ctx, p)
Expand Down
2 changes: 1 addition & 1 deletion routing/dht/diag.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (dht *IpfsDHT) getDiagInfo() *diagInfo {
di.LifeSpan = time.Since(dht.birth)
di.Keys = nil // Currently no way to query datastore

for _, p := range dht.routingTables[0].ListPeers() {
for _, p := range dht.routingTable.ListPeers() {
d := connDiagInfo{p.GetLatency(), p.ID()}
di.Connections = append(di.Connections, d)
}
Expand Down
29 changes: 11 additions & 18 deletions routing/dht/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,7 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key u.Key, value []byte) error
return err
}

var peers []peer.Peer
for _, route := range dht.routingTables {
npeers := route.NearestPeers(kb.ConvertKey(key), KValue)
peers = append(peers, npeers...)
}
peers := dht.routingTable.NearestPeers(kb.ConvertKey(key), KValue)

query := newQuery(key, dht.dialer, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) {
log.Debugf("%s PutValue qry part %v", dht.self, p)
Expand Down Expand Up @@ -71,9 +67,8 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key u.Key) ([]byte, error) {
return val, nil
}

// get closest peers in the routing tables
routeLevel := 0
closest := dht.routingTables[routeLevel].NearestPeers(kb.ConvertKey(key), PoolSize)
// get closest peers in the routing table
closest := dht.routingTable.NearestPeers(kb.ConvertKey(key), PoolSize)
if closest == nil || len(closest) == 0 {
log.Warning("Got no peers back from routing table!")
return nil, kb.ErrLookupFailure
Expand All @@ -82,7 +77,7 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key u.Key) ([]byte, error) {
// setup the Query
query := newQuery(key, dht.dialer, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) {

val, peers, err := dht.getValueOrPeers(ctx, p, key, routeLevel)
val, peers, err := dht.getValueOrPeers(ctx, p, key)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -116,7 +111,7 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key u.Key) ([]byte, error) {
func (dht *IpfsDHT) Provide(ctx context.Context, key u.Key) error {

dht.providers.AddProvider(key, dht.self)
peers := dht.routingTables[0].NearestPeers(kb.ConvertKey(key), PoolSize)
peers := dht.routingTable.NearestPeers(kb.ConvertKey(key), PoolSize)
if len(peers) == 0 {
return nil
}
Expand Down Expand Up @@ -166,7 +161,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, co
// setup the Query
query := newQuery(key, dht.dialer, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) {

pmes, err := dht.findProvidersSingle(ctx, p, key, 0)
pmes, err := dht.findProvidersSingle(ctx, p, key)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -205,7 +200,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, co
return &dhtQueryResult{closerPeers: clpeers}, nil
})

peers := dht.routingTables[0].NearestPeers(kb.ConvertKey(key), AlphaValue)
peers := dht.routingTable.NearestPeers(kb.ConvertKey(key), AlphaValue)
_, err := query.Run(ctx, peers)
if err != nil {
log.Errorf("FindProviders Query error: %s", err)
Expand Down Expand Up @@ -253,8 +248,7 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (peer.Peer, error)
return p, nil
}

routeLevel := 0
closest := dht.routingTables[routeLevel].NearestPeers(kb.ConvertPeerID(id), AlphaValue)
closest := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), AlphaValue)
if closest == nil || len(closest) == 0 {
return nil, kb.ErrLookupFailure
}
Expand All @@ -270,7 +264,7 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (peer.Peer, error)
// setup the Query
query := newQuery(u.Key(id), dht.dialer, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) {

pmes, err := dht.findPeerSingle(ctx, p, id, routeLevel)
pmes, err := dht.findPeerSingle(ctx, p, id)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -316,16 +310,15 @@ func (dht *IpfsDHT) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (<
peerchan := make(chan peer.Peer, asyncQueryBuffer)
peersSeen := map[string]peer.Peer{}

routeLevel := 0
closest := dht.routingTables[routeLevel].NearestPeers(kb.ConvertPeerID(id), AlphaValue)
closest := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), AlphaValue)
if closest == nil || len(closest) == 0 {
return nil, kb.ErrLookupFailure
}

// setup the Query
query := newQuery(u.Key(id), dht.dialer, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) {

pmes, err := dht.findPeerSingle(ctx, p, id, routeLevel)
pmes, err := dht.findPeerSingle(ctx, p, id)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit a62b239

Please sign in to comment.