-
Notifications
You must be signed in to change notification settings - Fork 226
/
dht.go
214 lines (172 loc) · 6.06 KB
/
dht.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
package dht
import (
"fmt"
"sync"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
ma "github.com/multiformats/go-multiaddr"
"github.com/plprobelab/go-kademlia/coord"
"github.com/plprobelab/go-kademlia/kad"
"github.com/plprobelab/go-kademlia/key"
"golang.org/x/exp/slog"
)
// DHT is an implementation of Kademlia with S/Kademlia modifications.
// It is used to implement the base Routing module.
type DHT struct {
// host holds a reference to the underlying libp2p host
host host.Host
// cfg holds a reference to the DHT configuration struct
cfg *Config
// mode indicates the current mode the DHT operates in. This can differ from
// the desired mode if set to auto-client or auto-server. The desired mode
// can be configured via the Config struct. modeMu guards concurrent access
// to mode (written by setClientMode/setServerMode).
modeMu sync.RWMutex
mode mode
// kad is a reference to the go-kademlia coordinator
kad *coord.Coordinator[key.Key256, ma.Multiaddr]
// rt holds a reference to the routing table implementation. This can be
// configured via the Config struct.
rt kad.RoutingTable[key.Key256, kad.NodeID[key.Key256]]
// backends maps a record namespace (e.g. "ipns", "pk", "providers") to the
// Backend that stores and serves records for that namespace. Populated from
// Config.Backends or, for the IPFS protocol, with default backends in New.
backends map[string]Backend
// log is a convenience accessor to the logging instance. It gets the value
// of the logger field from the configuration.
log *slog.Logger
// sub holds a subscription to the libp2p event bus. The DHT subscribes to
// these events in networkEventsSubscription and consumes them
// asynchronously in consumeNetworkEvents.
sub event.Subscription
}
// New constructs a new [DHT] for the given underlying host and with the given
// configuration. Use [DefaultConfig] to construct a configuration.
func New(h host.Host, cfg *Config) (*DHT, error) {
	// reject invalid configurations up front
	if err := cfg.Validate(); err != nil {
		return nil, fmt.Errorf("validate DHT config: %w", err)
	}

	dht := &DHT{
		host: h,
		cfg:  cfg,
		log:  cfg.Logger,
	}

	self := nodeID(dht.host.ID())

	// Routing table: prefer the user-supplied one, otherwise build the default.
	if cfg.RoutingTable != nil {
		dht.rt = cfg.RoutingTable
	} else {
		rt, err := DefaultRoutingTable(self)
		if err != nil {
			return nil, fmt.Errorf("new trie routing table: %w", err)
		}
		dht.rt = rt
	}

	// Backends: prefer user-supplied backends; for the IPFS protocol fall
	// back to the default ipns/pk/providers trio. For any other protocol the
	// backends map stays nil unless configured.
	switch {
	case len(cfg.Backends) != 0:
		dht.backends = cfg.Backends
	case cfg.ProtocolID == ProtocolIPFS:
		ds := cfg.Datastore
		if ds == nil {
			inMem, err := InMemoryDatastore()
			if err != nil {
				return nil, fmt.Errorf("new default datastore: %w", err)
			}
			ds = inMem
		}

		provCfg := DefaultProviderBackendConfig()
		provCfg.Logger = cfg.Logger
		provBackend, err := NewBackendProvider(h.Peerstore(), ds, provCfg)
		if err != nil {
			return nil, fmt.Errorf("new provider backend: %w", err)
		}

		recCfg := DefaultRecordBackendConfig()
		recCfg.Logger = cfg.Logger

		dht.backends = map[string]Backend{
			"ipns":      NewBackendIPNS(ds, h.Peerstore(), recCfg),
			"pk":        NewBackendPublicKey(ds, recCfg),
			"providers": provBackend,
		}
	}

	// instantiate a new Kademlia DHT coordinator.
	kadCoord, err := coord.NewCoordinator[key.Key256, ma.Multiaddr](self, nil, nil, dht.rt, cfg.Kademlia)
	if err != nil {
		return nil, fmt.Errorf("new coordinator: %w", err)
	}
	dht.kad = kadCoord

	// determine mode to start in
	switch cfg.Mode {
	case ModeOptClient, ModeOptAutoClient:
		dht.setClientMode()
	case ModeOptServer, ModeOptAutoServer:
		dht.setServerMode()
	default:
		// should never happen because of the configuration validation above
		return nil, fmt.Errorf("invalid dht mode %s", cfg.Mode)
	}

	// create subscription to various network events and consume them
	// asynchronously for the lifetime of the DHT.
	sub, err := dht.networkEventsSubscription()
	if err != nil {
		return nil, fmt.Errorf("failed subscribing to event bus: %w", err)
	}
	dht.sub = sub

	go dht.consumeNetworkEvents(dht.sub)

	return dht, nil
}
// Close cleans up all resources associated with this DHT: it closes the event
// bus subscription and aborts every open stream that speaks the configured
// DHT protocol.
func (d *DHT) Close() error {
	if err := d.sub.Close(); err != nil {
		d.log.With("err", err).Debug("failed closing event bus subscription")
	}

	// TODO clean up backends

	// kill all active streams using the DHT protocol.
	for _, conn := range d.host.Network().Conns() {
		for _, stream := range conn.GetStreams() {
			if stream.Protocol() == d.cfg.ProtocolID {
				if err := stream.Reset(); err != nil {
					d.log.With("err", err).Debug("failed closing stream")
				}
			}
		}
	}

	return nil
}
// setServerMode switches the DHT into server mode: it records the new mode
// and registers the stream handler for the configured protocol, which causes
// libp2p identify to advertise that we answer DHT queries. This method is
// safe to call even if the DHT is already in server mode.
func (d *DHT) setServerMode() {
	d.modeMu.Lock()
	defer d.modeMu.Unlock()

	d.mode = modeServer
	d.log.Info("Activating DHT server mode")

	d.host.SetStreamHandler(d.cfg.ProtocolID, d.streamHandler)
}
// setClientMode stops advertising (and rescinds advertisements via libp2p
// identify updates) that we are able to respond to DHT queries for the
// configured protocol and removes the registered stream handlers. We also kill
// all inbound streams that were utilizing the handled protocols. If we are
// already in client mode, this method is a no-op. This method is safe to call
// even if the DHT is already in client mode.
func (d *DHT) setClientMode() {
	d.modeMu.Lock()
	defer d.modeMu.Unlock()

	d.log.Info("Activating DHT client mode")

	d.mode = modeClient
	d.host.RemoveStreamHandler(d.cfg.ProtocolID)

	// kill all active inbound streams using the DHT protocol. Note that if we
	// request something from a remote peer behind a NAT that succeeds with a
	// connection reversal, the connection would be inbound but the stream would
	// still be outbound and therefore not reset here.
	for _, c := range d.host.Network().Conns() {
		for _, s := range c.GetStreams() {
			if s.Protocol() != d.cfg.ProtocolID {
				continue
			}

			// Don't reset outbound streams because these carry queries that we
			// have initiated ourselves. Inbound and unknown-direction streams
			// are reset. (This replaces a switch whose DirUnknown/DirInbound
			// cases were empty and existed only to exclude DirOutbound.)
			if s.Stat().Direction == network.DirOutbound {
				continue
			}

			if err := s.Reset(); err != nil {
				d.log.With("err", err).Debug("failed closing stream")
			}
		}
	}
}