Skip to content

Commit

Permalink
feat: purge peer connections and information (#194)
Browse files Browse the repository at this point in the history
* feat: purge peer connections and information

Connections to a specific peer, or to all peers, can be closed and the peer information removed from the peer store. This can be useful to help determine if the presence/absence of a connection to a peer is affecting behavior. Be aware that purging a connection is inherently racey as it is possible for the peer to reestablish a connection at any time following a purge.

* Output peers list in json
* Do not export mgr handler functions
* Update README.md
  • Loading branch information
gammazero authored Nov 4, 2024
1 parent 6380341 commit 54dbdda
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 2 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ The following emojis are used to highlight certain changes:

### Added

- Added endpoints to show and purge connected peers [#194](https://github.com/ipfs/rainbow/pull/194)

### Changed

### Removed
Expand Down
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,21 @@ possible to dynamically modify the logging at runtime.
- `http://$RAINBOW_CTL_LISTEN_ADDRESS/mgr/log/level?subsystem=<system name or * for all system>&level=<level>` will set the logging level for a subsystem
- `http://$RAINBOW_CTL_LISTEN_ADDRESS/mgr/log/ls` will return a comma separated list of available logging subsystems

## Purging Peer Connections

Connections to a specific peer, or to all peers, can be closed and the peer information removed from the peer store. This can be useful to help determine if the presence/absence of a connection to a peer is affecting behavior. Be aware that purging a connection is inherently racey as it is possible for the peer to reestablish a connection at any time following a purge.

If `RAINBOW_DHT_SHARED_HOST=false` this endpoint will not show peers connected to DHT host, and only list ones used for Bitswap.

- `http://$RAINBOW_CTL_LISTEN_ADDRESS/mgr/purge?peer=<peer_id>` purges connection and info for peer identifid by peer_id
- `http://$RAINBOW_CTL_LISTEN_ADDRESS/mgr/purge?peer=all` purges connections and info for all peers
- `http://$RAINBOW_CTL_LISTEN_ADDRESS/mgr/peers` returns a list of currently connected peers

Example cURL commmand to show connected peers and purge peer connection:

curl http://127.0.0.1:8091/mgr/peers
curl http://127.0.0.1:8091/mgr/purge?peer=QmQzqxhK82kAmKvARFZSkUVS6fo9sySaiogAnx5EnZ6ZmC

## Tracing

See [docs/tracing.md](docs/tracing.md).
Expand Down
117 changes: 116 additions & 1 deletion handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"github.com/ipfs/boxo/blockstore"
leveldb "github.com/ipfs/go-ds-leveldb"
"github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"

_ "embed"
_ "net/http/pprof"
Expand Down Expand Up @@ -72,7 +74,7 @@ func addLogHandlers(mux *http.ServeMux) {
})
}

func GCHandler(gnd *Node) func(w http.ResponseWriter, r *http.Request) {
func gcHandler(gnd *Node) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()

Expand All @@ -92,6 +94,119 @@ func GCHandler(gnd *Node) func(w http.ResponseWriter, r *http.Request) {
}
}

func purgePeerHandler(p2pHost host.Host) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()

q := r.URL.Query()
peerIDStr := q.Get("peer")
if peerIDStr == "" {
http.Error(w, "missing peer id", http.StatusBadRequest)
return
}

if peerIDStr == "all" {
purgeCount, err := purgeAllConnections(p2pHost)
if err != nil {
goLog.Errorw("Error closing all libp2p connections", "err", err)
http.Error(w, "error closing connections", http.StatusInternalServerError)
return
}
goLog.Infow("Purged connections", "count", purgeCount)

w.Header().Set("Content-Type", "text/plain; charset=utf-8")
fmt.Fprintln(w, "Peer connections purged:", purgeCount)
return
}

peerID, err := peer.Decode(peerIDStr)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

err = purgeConnection(p2pHost, peerID)
if err != nil {
goLog.Errorw("Error closing libp2p connection", "err", err, "peer", peerID)
http.Error(w, "error closing connection", http.StatusInternalServerError)
return
}
goLog.Infow("Purged connection", "peer", peerID)

w.Header().Set("Content-Type", "text/plain; charset=utf-8")
fmt.Fprintln(w, "Purged connection to peer", peerID)
}
}

func purgeConnection(p2pHost host.Host, peerID peer.ID) error {
peerStore := p2pHost.Peerstore()
if peerStore != nil {
peerStore.RemovePeer(peerID)
peerStore.ClearAddrs(peerID)
}
return p2pHost.Network().ClosePeer(peerID)
}

func purgeAllConnections(p2pHost host.Host) (int, error) {
net := p2pHost.Network()
peers := net.Peers()

peerStore := p2pHost.Peerstore()
if peerStore != nil {
for _, peerID := range peers {
peerStore.RemovePeer(peerID)
peerStore.ClearAddrs(peerID)
}
}

var errCount, purgeCount int
for _, peerID := range peers {
err := net.ClosePeer(peerID)
if err != nil {
goLog.Errorw("Closing libp2p connection", "err", err, "peer", peerID)
errCount++
} else {
purgeCount++
}
}

if errCount != 0 {
return 0, fmt.Errorf("error closing connections to %d peers", errCount)
}

return purgeCount, nil
}

func showPeersHandler(p2pHost host.Host) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()

w.Header().Set("Content-Type", "application/json; charset=utf-8")

peers := p2pHost.Network().Peers()
body := struct {
Count int64
Peers []string
}{
Count: int64(len(peers)),
}

if len(peers) != 0 {
peerStrs := make([]string, len(peers))
for i, peerID := range peers {
peerStrs[i] = peerID.String()
}
body.Peers = peerStrs
}

enc := json.NewEncoder(w)
if err := enc.Encode(body); err != nil {
goLog.Errorw("cannot write response", "err", err)
http.Error(w, "", http.StatusInternalServerError)
}
}
}

func withConnect(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// ServeMux does not support requests with CONNECT method,
Expand Down
4 changes: 3 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,9 @@ share the same seed as long as the indexes are different.
otel.SetTextMapPropagator(autoprop.NewTextMapPropagator())

apiMux := makeMetricsAndDebuggingHandler()
apiMux.HandleFunc("/mgr/gc", GCHandler(gnd))
apiMux.HandleFunc("/mgr/gc", gcHandler(gnd))
apiMux.HandleFunc("/mgr/purge", purgePeerHandler(gnd.host))
apiMux.HandleFunc("/mgr/peers", showPeersHandler(gnd.host))
addLogHandlers(apiMux)

apiSrv := &http.Server{
Expand Down

0 comments on commit 54dbdda

Please sign in to comment.