Skip to content
This repository has been archived by the owner on May 26, 2022. It is now read-only.

(Long-lived branch): Introspection – harvest data from go-libp2p-swarm #159

Closed
wants to merge 14 commits into from
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ require (
github.com/jbenet/goprocess v0.1.3
github.com/libp2p/go-addr-util v0.0.1
github.com/libp2p/go-conn-security-multistream v0.1.0
github.com/libp2p/go-libp2p-core v0.3.0
github.com/libp2p/go-libp2p-core v0.3.1-0.20200210163958-6d6f8284b841
github.com/libp2p/go-libp2p-loggables v0.1.0
github.com/libp2p/go-libp2p-peerstore v0.1.4
github.com/libp2p/go-libp2p-secio v0.2.1
Expand All @@ -18,6 +18,7 @@ require (
github.com/multiformats/go-multiaddr v0.2.0
github.com/multiformats/go-multiaddr-fmt v0.1.0
github.com/multiformats/go-multiaddr-net v0.1.2
github.com/stretchr/testify v1.4.0
github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7
)

Expand Down
75 changes: 14 additions & 61 deletions go.sum

Large diffs are not rendered by default.

219 changes: 219 additions & 0 deletions introspect.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
package swarm

import (
"fmt"
"math"

"github.com/libp2p/go-libp2p-core/introspection"
introspect_pb "github.com/libp2p/go-libp2p-core/introspection/pb"
introspection_pb "github.com/libp2p/go-libp2p-core/introspection/pb"
"github.com/libp2p/go-libp2p-core/network"
)

// IntrospectTraffic introspects and returns total traffic stats for this swarm.
func (s *Swarm) IntrospectTraffic() (*introspection_pb.Traffic, error) {
if s.bwc == nil {
return nil, nil
}

metrics := s.bwc.GetBandwidthTotals()
t := &introspection_pb.Traffic{
TrafficIn: &introspection_pb.DataGauge{
CumBytes: uint64(metrics.TotalIn),
InstBw: uint64(metrics.RateIn),
},
TrafficOut: &introspection_pb.DataGauge{
CumBytes: uint64(metrics.TotalOut),
InstBw: uint64(metrics.RateOut),
},
}

return t, nil
}

// IntrospectConnections introspects & returns the swarm connections
func (s *Swarm) IntrospectConnections(q introspection.ConnectionQueryParams) ([]*introspection_pb.Connection, error) {
var conns []network.Conn

switch l := len(q.Include); l {
case 0:
conns = s.Conns()

default:
conns = make([]network.Conn, 0, l)
filter := make(map[string]struct{}, l)
for _, id := range q.Include {
filter[string(id)] = struct{}{}
}

for _, c := range s.Conns() {
if _, ok := filter[c.(*Conn).ID()]; !ok {
continue
}
conns = append(conns, c)
}
}

introspected := make([]*introspection_pb.Connection, 0, len(conns))

switch q.Output {
case introspection.QueryOutputFull:
for _, c := range conns {
ic := c.(*Conn).Introspect(s, q)
introspected = append(introspected, ic)
}

case introspection.QueryOutputList:
for _, c := range conns {
introspected = append(introspected, &introspection_pb.Connection{Id: c.(*Conn).ID()})
}

default:
return nil, fmt.Errorf("unexpected query type: %v", q.Output)
}

return introspected, nil
}

// IntrospectStreams processes a streams introspection query.
func (s *Swarm) IntrospectStreams(q introspection.StreamQueryParams) (*introspection_pb.StreamList, error) {
var streams []network.Stream

switch l := len(q.Include); l {
case 0:
for _, c := range s.Conns() {
for _, s := range c.GetStreams() {
streams = append(streams, s)
}
}

default:
streams = make([]network.Stream, 0, l)
filter := make(map[string]struct{}, l)
for _, id := range q.Include {
filter[string(id)] = struct{}{}
}

for _, c := range s.Conns() {
for _, s := range c.GetStreams() {
if _, ok := filter[s.(*Stream).ID()]; !ok {
continue
}
streams = append(streams, s)
}
}
}

switch q.Output {
case introspection.QueryOutputFull:
introspected := make([]*introspection_pb.Stream, 0, len(streams))
for _, st := range streams {
is := st.(*Stream).Introspect(s, q)
introspected = append(introspected, is)
}
return &introspection_pb.StreamList{Streams: introspected}, nil

case introspection.QueryOutputList:
introspected := make([]string, 0, len(streams))
for _, st := range streams {
introspected = append(introspected, st.(*Stream).ID())
}
return &introspection_pb.StreamList{StreamIds: introspected}, nil
}

return nil, fmt.Errorf("unexpected query type: %v", q.Output)
}

func (c *Conn) Introspect(s *Swarm, q introspection.ConnectionQueryParams) *introspect_pb.Connection {
stat := c.Stat()
res := &introspection_pb.Connection{
Id: c.ID(),
Status: introspection_pb.Status_ACTIVE,
PeerId: c.RemotePeer().Pretty(),
Endpoints: &introspection_pb.EndpointPair{
SrcMultiaddr: c.LocalMultiaddr().String(),
DstMultiaddr: c.RemoteMultiaddr().String(),
},
Role: translateRole(stat),
Timeline: &introspection_pb.Connection_Timeline{
OpenTs: &stat.Opened,
UpgradedTs: &stat.Opened,
// TODO ClosedTs, UpgradedTs.
},
}

// TODO this is a per-peer, not a per-conn measurement. In the future, when
// we have multiple connections per peer, this will produce inaccurate
// numbers.
// Also, we do not record stream-level stats.
if s.bwc != nil {
bw := s.bwc.GetBandwidthForPeer(c.RemotePeer())
res.Traffic = &introspect_pb.Traffic{
// TODO we don't have packet I/O stats.
TrafficIn: &introspect_pb.DataGauge{
CumBytes: uint64(bw.TotalIn),
InstBw: uint64(math.Round(bw.RateIn)),
},
TrafficOut: &introspect_pb.DataGauge{
CumBytes: uint64(bw.TotalOut),
InstBw: uint64(math.Round(bw.RateOut)),
},
}
}

// TODO I don't think we pin the multiplexer and the secure channel we've
// negotiated anywhere.
res.Attribs = &introspect_pb.Connection_Attributes{}

// TODO can we get the transport ID from the multiaddr?
res.TransportId = "unknown"

// TODO there's the ping protocol, but that's higher than this layer.
// How do we source this? We may need some kind of latency manager.
res.LatencyNs = 0

c.streams.Lock()
sids := make([]string, 0, len(c.streams.m))
for s := range c.streams.m {
sids = append(sids, s.ID())
}
c.streams.Unlock()

res.Streams = &introspection_pb.StreamList{StreamIds: sids}

return res
}

func (s *Stream) Introspect(sw *Swarm, q introspection.StreamQueryParams) *introspect_pb.Stream {
stat := s.Stat()
res := &introspection_pb.Stream{
Id: s.ID(),
Status: introspect_pb.Status_ACTIVE,
Conn: &introspect_pb.Stream_ConnectionRef{
Connection: &introspection_pb.Stream_ConnectionRef_ConnId{
ConnId: s.conn.ID(),
},
},
Protocol: string(s.Protocol()),
Role: translateRole(stat),
Timeline: &introspect_pb.Stream_Timeline{
OpenTs: &stat.Opened,
// TODO CloseTs.
},
// TODO Traffic: we are not tracking per-stream traffic stats at the
// moment.
}

return res
}

func translateRole(stat network.Stat) introspect_pb.Role {
switch stat.Direction {
case network.DirInbound:
return introspect_pb.Role_RESPONDER
case network.DirOutbound:
return introspect_pb.Role_INITIATOR
default:
return 99 // TODO placeholder value
}
}
133 changes: 133 additions & 0 deletions introspect_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package swarm_test

import (
"context"
"testing"

"github.com/libp2p/go-libp2p-core/introspection"
introspection_pb "github.com/libp2p/go-libp2p-core/introspection/pb"
"github.com/libp2p/go-libp2p-core/protocol"
swarm "github.com/libp2p/go-libp2p-swarm"

"github.com/stretchr/testify/require"
)

func TestConnsAndStreamIntrospect(t *testing.T) {
ctx := context.Background()
swarms := makeSwarms(ctx, t, 2)
connectSwarms(t, ctx, []*swarm.Swarm{swarms[0], swarms[1]})

// ----- Swarm 1 opens TWO streams to Swarm 2
pid1 := protocol.ID("1")
pid2 := protocol.ID("2")
s1, err := swarms[0].NewStream(ctx, swarms[1].LocalPeer())
s1.SetProtocol(pid1)
require.NoError(t, err)
s2, err := swarms[0].NewStream(ctx, swarms[1].LocalPeer())
require.NoError(t, err)
s2.SetProtocol(pid2)

// send 4 bytes on stream 1 & 5 bytes on stream 2
msg1 := "abcd"
msg2 := "12345"
_, err = s1.Write([]byte(msg1))
require.NoError(t, err)
_, err = s2.Write([]byte(msg2))
require.NoError(t, err)

// wait for the metrics to kick in
for {
cis, err := swarms[0].IntrospectConnections(introspection.ConnectionQueryParams{Output: introspection.QueryOutputFull})
require.NoError(t, err)

if cis[0].Traffic.TrafficOut.CumBytes != 0 {
break
}
}

// ----- Introspect Swarm 1
cis, err := swarms[0].IntrospectConnections(introspection.ConnectionQueryParams{Output: introspection.QueryOutputFull})
require.NoError(t, err)

// connection checks
require.Len(t, cis, 1)
require.Len(t, cis[0].Streams.StreamIds, 2)
require.NotEmpty(t, cis[0].Id)
require.Equal(t, swarms[1].LocalPeer().String(), cis[0].PeerId)
require.Equal(t, introspection_pb.Status_ACTIVE, cis[0].Status)
require.Equal(t, introspection_pb.Role_INITIATOR, cis[0].Role)
require.Equal(t, swarms[0].Conns()[0].LocalMultiaddr().String(), cis[0].Endpoints.SrcMultiaddr)
require.Equal(t, swarms[0].Conns()[0].RemoteMultiaddr().String(), cis[0].Endpoints.DstMultiaddr)
require.True(t, int(cis[0].Traffic.TrafficOut.CumBytes) == len(msg1)+len(msg2))

// verify we get connectionIds correctly
cids, err := swarms[0].IntrospectConnections(introspection.ConnectionQueryParams{Output: introspection.QueryOutputList})
require.NoError(t, err)
require.Len(t, cids, 1)
require.NotEmpty(t, cids[0].Id)
require.Empty(t, cids[0].PeerId)

// verify we get the same result if we pass in the connection Ids
cs, err := swarms[0].IntrospectConnections(introspection.ConnectionQueryParams{introspection.QueryOutputFull,
[]introspection.ConnectionID{introspection.ConnectionID(cis[0].Id)}})
require.NoError(t, err)
require.Len(t, cs, 1)
require.Equal(t, cis[0].PeerId, cs[0].PeerId)
require.Equal(t, cis[0].Id, cs[0].Id)

// fetch streams by reading Ids from connection
var sids []introspection.StreamID
for _, s := range cis[0].Streams.StreamIds {
sids = append(sids, introspection.StreamID(s))
}

// Now, introspect Streams
sl, err := swarms[0].IntrospectStreams(introspection.StreamQueryParams{introspection.QueryOutputFull, sids})
require.Len(t, sl.Streams, 2)
require.NoError(t, err)

// map stream to protocols
protocolToStream := make(map[string]*introspection_pb.Stream)
for _, s := range sl.Streams {
protocolToStream[s.Protocol] = s
}

// introspect stream 1
stream1 := protocolToStream["1"]
require.NotEmpty(t, stream1)
require.Equal(t, "1", stream1.Protocol)
require.Equal(t, introspection_pb.Role_INITIATOR, stream1.Role)
require.Equal(t, introspection_pb.Status_ACTIVE, stream1.Status)
require.NotEmpty(t, stream1.Id)
// require.True(t, len(msg1) == int(stream1.Traffic.TrafficOut.CumBytes))
// require.True(t, 0 == int(stream1.Traffic.TrafficIn.CumBytes))

// introspect stream 2
stream2 := protocolToStream["2"]
require.NotEmpty(t, stream2)
require.Equal(t, "2", stream2.Protocol)
require.Equal(t, introspection_pb.Role_INITIATOR, stream2.Role)
require.Equal(t, introspection_pb.Status_ACTIVE, stream2.Status)
require.NotEmpty(t, stream2.Id)
require.NotEqual(t, stream2.Id, stream1.Id)
// require.True(t, len(msg2) == int(stream2.Traffic.TrafficOut.CumBytes))
// require.True(t, 0 == int(stream2.Traffic.TrafficIn.CumBytes))

// Assert query ONLY for streaIds
streamList, err := swarms[0].IntrospectStreams(introspection.StreamQueryParams{Output: introspection.QueryOutputList})
require.NoError(t, err)
require.Len(t, streamList.Streams, 0)
require.Len(t, streamList.StreamIds, 2)

// reset stream 1 & verify
require.NoError(t, s1.Reset())
cis, err = swarms[0].IntrospectConnections(introspection.ConnectionQueryParams{Output: introspection.QueryOutputFull})
require.NoError(t, err)
require.Len(t, cis[0].Streams.StreamIds, 1)

// introspect traffic
tr, err := swarms[0].IntrospectTraffic()
require.NoError(t, err)
require.True(t, tr.TrafficOut.CumBytes == uint64(len(msg1)+len(msg2)))
require.True(t, tr.TrafficIn.CumBytes == 0)
}
Loading