diff --git a/dot/network/config.go b/dot/network/config.go index a3cf8e2a02..af178811c7 100644 --- a/dot/network/config.go +++ b/dot/network/config.go @@ -19,6 +19,7 @@ package network import ( "errors" "path" + "time" log "github.com/ChainSafe/log15" "github.com/libp2p/go-libp2p-core/crypto" @@ -93,6 +94,9 @@ type Config struct { // PublishMetrics enables collection of network metrics PublishMetrics bool + + // telemetryInterval how often to send telemetry metrics + telemetryInterval time.Duration } // build checks the configuration, sets up the private key for the network service, @@ -134,6 +138,11 @@ func (c *Config) build() error { c.logger.Warn("Bootstrap is enabled but no bootstrap nodes are defined") } + // set telemetryInterval to default + if c.telemetryInterval.Microseconds() == 0 { + c.telemetryInterval = time.Second * 5 + } + return nil } diff --git a/dot/network/host.go b/dot/network/host.go index d3920f7049..78445dc37c 100644 --- a/dot/network/host.go +++ b/dot/network/host.go @@ -27,6 +27,7 @@ import ( badger "github.com/ipfs/go-ds-badger2" "github.com/libp2p/go-libp2p" libp2phost "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/metrics" libp2pnetwork "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peerstore" @@ -59,6 +60,7 @@ type host struct { cm *ConnManager ds *badger.Datastore messageCache *messageCache + bwc *metrics.BandwidthCounter } // newHost creates a host wrapper with a new libp2p host instance @@ -167,6 +169,8 @@ func newHost(ctx context.Context, cfg *Config) (*host, error) { return nil, err } + bwc := metrics.NewBandwidthCounter() + host := &host{ ctx: ctx, h: h, @@ -177,6 +181,7 @@ func newHost(ctx context.Context, cfg *Config) (*host, error) { ds: ds, persistentPeers: pps, messageCache: msgCache, + bwc: bwc, } cm.host = host @@ -305,8 +310,14 @@ func (h *host) writeToStream(s libp2pnetwork.Stream, msg Message) error { lenBytes := uint64ToLEB128(msgLen) encMsg = append(lenBytes, encMsg...) - _, err = s.Write(encMsg) - return err + sent, err := s.Write(encMsg) + if err != nil { + return err + } + + h.bwc.LogSentMessage(int64(sent)) + + return nil } // getOutboundStream returns the outbound message stream for the given peer or returns diff --git a/dot/network/service.go b/dot/network/service.go index d048e237db..bed9b7203e 100644 --- a/dot/network/service.go +++ b/dot/network/service.go @@ -26,11 +26,11 @@ import ( "time" gssmrmetrics "github.com/ChainSafe/gossamer/dot/metrics" + "github.com/ChainSafe/gossamer/dot/telemetry" "github.com/ChainSafe/gossamer/lib/common" "github.com/ChainSafe/gossamer/lib/services" - "github.com/ethereum/go-ethereum/metrics" - log "github.com/ChainSafe/log15" + "github.com/ethereum/go-ethereum/metrics" libp2pnetwork "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/protocol" @@ -89,6 +89,10 @@ type Service struct { noDiscover bool noMDNS bool noGossip bool // internal option + + // telemetry + telemetryInterval time.Duration + closeCh chan interface{} } // NewService creates a new network service from the configuration and message channels @@ -142,6 +146,8 @@ func NewService(cfg *Config) (*Service, error) { syncer: cfg.Syncer, notificationsProtocols: make(map[byte]*notificationsProtocol), lightRequest: make(map[peer.ID]struct{}), + telemetryInterval: cfg.telemetryInterval, + closeCh: make(chan interface{}), } network.syncQueue = newSyncQueue(network) @@ -245,6 +251,9 @@ func (s *Service) Start() error { } go s.logPeerCount() + go s.publishNetworkTelemetry(s.closeCh) + go s.sentBlockIntervalTelemetry() + return nil } @@ -278,6 +287,47 @@ func (s *Service) logPeerCount() { } } +func (s *Service) publishNetworkTelemetry(done chan interface{}) { + ticker := time.NewTicker(s.telemetryInterval) + defer ticker.Stop() + +main: + for { + select { + case <-done: + break main + + case <-ticker.C: + o := s.host.bwc.GetBandwidthTotals() + telemetry.GetInstance().SendNetworkData(telemetry.NewNetworkData(s.host.peerCount(), o.RateIn, o.RateOut)) + } + + } +} + +func (s *Service) sentBlockIntervalTelemetry() { + for { + best, err := s.blockState.BestBlockHeader() + if err != nil { + continue + } + finalized, err := s.blockState.GetFinalizedHeader(0, 0) //nolint + if err != nil { + continue + } + + telemetry.GetInstance().SendBlockIntervalData(&telemetry.BlockIntervalData{ + BestHash: best.Hash(), + BestHeight: best.Number, + FinalizedHash: finalized.Hash(), + FinalizedHeight: finalized.Number, + TXCount: 0, // todo (ed) determine where to get tx count + UsedStateCacheSize: 0, // todo (ed) determine where to get used_state_cache_size + }) + time.Sleep(s.telemetryInterval) + } +} + func (s *Service) handleConn(conn libp2pnetwork.Conn) { // give new peers a slight weight // TODO: do this once handshake is received @@ -343,6 +393,19 @@ func (s *Service) Stop() error { logger.Error("Failed to close host", "error", err) } + // check if closeCh is closed, if not, close it. +mainloop: + for { + select { + case _, hasMore := <-s.closeCh: + if !hasMore { + break mainloop + } + default: + close(s.closeCh) + } + } + return nil } @@ -524,6 +587,8 @@ func (s *Service) readStream(stream libp2pnetwork.Stream, decoder messageDecoder _ = stream.Close() return } + + s.host.bwc.LogRecvMessage(int64(tot)) } } diff --git a/dot/telemetry/telemetry.go b/dot/telemetry/telemetry.go index 9c12659c42..8e94411cac 100644 --- a/dot/telemetry/telemetry.go +++ b/dot/telemetry/telemetry.go @@ -24,6 +24,7 @@ import ( "sync" "time" + "github.com/ChainSafe/gossamer/lib/common" "github.com/ChainSafe/gossamer/lib/genesis" "github.com/gorilla/websocket" log "github.com/sirupsen/logrus" @@ -31,9 +32,9 @@ import ( // Handler struct for holding telemetry related things type Handler struct { - buf bytes.Buffer - wsConn []*websocket.Conn - telemetryLogger *log.Entry + buf bytes.Buffer + wsConn []*websocket.Conn + sync.RWMutex } // MyJSONFormatter struct for defining JSON Formatter @@ -65,6 +66,7 @@ func GetInstance() *Handler { } log.SetOutput(&handlerInstance.buf) log.SetFormatter(new(MyJSONFormatter)) + go handlerInstance.sender() }) } return handlerInstance @@ -76,7 +78,7 @@ func (h *Handler) AddConnections(conns []*genesis.TelemetryEndpoint) { c, _, err := websocket.DefaultDialer.Dial(v.Endpoint, nil) if err != nil { fmt.Printf("Error %v\n", err) - return + continue } h.wsConn = append(h.wsConn, c) } @@ -96,25 +98,85 @@ type ConnectionData struct { // SendConnection sends connection request message to telemetry connection func (h *Handler) SendConnection(data *ConnectionData) { + h.Lock() + defer h.Unlock() payload := log.Fields{"authority": data.Authority, "chain": data.Chain, "config": "", "genesis_hash": data.GenesisHash, "implementation": data.SystemName, "msg": "system.connected", "name": data.NodeName, "network_id": data.NetworkID, "startup_time": data.StartTime, "version": data.SystemVersion} - h.telemetryLogger = log.WithFields(log.Fields{"id": 1, "payload": payload, "ts": time.Now()}) - h.telemetryLogger.Print() - h.sendTelemtry() + telemetryLogger := log.WithFields(log.Fields{"id": 1, "payload": payload, "ts": time.Now()}) + telemetryLogger.Print() } // SendBlockImport sends block imported message to telemetry connection func (h *Handler) SendBlockImport(bestHash string, height *big.Int) { + h.Lock() + defer h.Unlock() payload := log.Fields{"best": bestHash, "height": height.Int64(), "msg": "block.import", "origin": "NetworkInitialSync"} - h.telemetryLogger = log.WithFields(log.Fields{"id": 1, "payload": payload, "ts": time.Now()}) - h.telemetryLogger.Print() - h.sendTelemtry() + telemetryLogger := log.WithFields(log.Fields{"id": 1, "payload": payload, "ts": time.Now()}) + telemetryLogger.Print() } -func (h *Handler) sendTelemtry() { - for _, c := range h.wsConn { - _ = c.WriteMessage(websocket.TextMessage, h.buf.Bytes()) +// NetworkData struct to hold network data telemetry information +type NetworkData struct { + peers int + rateIn float64 + rateOut float64 +} + +// NewNetworkData creates networkData struct +func NewNetworkData(peers int, rateIn, rateOut float64) *NetworkData { + return &NetworkData{ + peers: peers, + rateIn: rateIn, + rateOut: rateOut, + } +} + +// SendNetworkData send network data system.interval message to telemetry connection +func (h *Handler) SendNetworkData(data *NetworkData) { + h.Lock() + defer h.Unlock() + payload := log.Fields{"bandwidth_download": data.rateIn, "bandwidth_upload": data.rateOut, "msg": "system.interval", "peers": data.peers} + telemetryLogger := log.WithFields(log.Fields{"id": 1, "payload": payload, "ts": time.Now()}) + telemetryLogger.Print() +} + +// BlockIntervalData struct to hold data for block system.interval message +type BlockIntervalData struct { + BestHash common.Hash + BestHeight *big.Int + FinalizedHash common.Hash + FinalizedHeight *big.Int + TXCount int + UsedStateCacheSize int +} + +// SendBlockIntervalData send block data system interval information to telemetry connection +func (h *Handler) SendBlockIntervalData(data *BlockIntervalData) { + h.Lock() + defer h.Unlock() + payload := log.Fields{"best": data.BestHash.String(), "finalized_hash": data.FinalizedHash.String(), // nolint + "finalized_height": data.FinalizedHeight, "height": data.BestHeight, "msg": "system.interval", "txcount": data.TXCount, // nolint + "used_state_cache_size": data.UsedStateCacheSize} + telemetryLogger := log.WithFields(log.Fields{"id": 1, "payload": payload, "ts": time.Now()}) + telemetryLogger.Print() +} + +func (h *Handler) sender() { + for { + h.RLock() + line, err := h.buf.ReadBytes(byte(10)) // byte 10 is newline character, used as delimiter + h.RUnlock() + if err != nil { + continue + } + + for _, c := range h.wsConn { + err := c.WriteMessage(websocket.TextMessage, line) + if err != nil { + // TODO (ed) determine how to handle this error + fmt.Printf("ERROR connecting to telemetry %v\n", err) + } + } } - h.buf.Reset() } diff --git a/dot/telemetry/telemetry_test.go b/dot/telemetry/telemetry_test.go index d8a505f6cc..3cdbed43e0 100644 --- a/dot/telemetry/telemetry_test.go +++ b/dot/telemetry/telemetry_test.go @@ -1,20 +1,23 @@ package telemetry import ( + "bytes" "log" "math/big" "net/http" "os" + "sort" + "sync" "testing" "time" + "github.com/ChainSafe/gossamer/lib/common" "github.com/ChainSafe/gossamer/lib/genesis" "github.com/gorilla/websocket" "github.com/stretchr/testify/require" ) var upgrader = websocket.Upgrader{} -var lastMessage []byte func TestMain(m *testing.M) { // start server to listen for websocket connections @@ -35,30 +38,74 @@ func TestMain(m *testing.M) { code := m.Run() os.Exit(code) } -func TestHandler_SendConnection(t *testing.T) { - expected := []byte(`{"id":1,"payload":{"authority":false,"chain":"chain","config":"","genesis_hash":"hash","implementation":"systemName","msg":"system.connected","name":"nodeName","network_id":"netID","startup_time":"startTime","version":"version"},"ts":`) - data := &ConnectionData{ - Authority: false, - Chain: "chain", - GenesisHash: "hash", - SystemName: "systemName", - NodeName: "nodeName", - SystemVersion: "version", - NetworkID: "netID", - StartTime: "startTime", + +var resultCh chan []byte + +func TestHandler_SendMulti(t *testing.T) { + var wg sync.WaitGroup + wg.Add(4) + + resultCh = make(chan []byte) + + go func() { + GetInstance().SendConnection(&ConnectionData{ + Authority: false, + Chain: "chain", + GenesisHash: "hash", + SystemName: "systemName", + NodeName: "nodeName", + SystemVersion: "version", + NetworkID: "netID", + StartTime: "startTime", + }) + wg.Done() + }() + + go func() { + GetInstance().SendBlockImport("hash", big.NewInt(2)) + wg.Done() + }() + + go func() { + GetInstance().SendNetworkData(NewNetworkData(1, 2, 3)) + wg.Done() + }() + + go func() { + GetInstance().SendBlockIntervalData(&BlockIntervalData{ + BestHash: common.MustHexToHash("0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6"), + BestHeight: big.NewInt(32375), + FinalizedHash: common.MustHexToHash("0x687197c11b4cf95374159843e7f46fbcd63558db981aaef01a8bac2a44a1d6b2"), + FinalizedHeight: big.NewInt(32256), + TXCount: 2, + UsedStateCacheSize: 1886357, + }) + wg.Done() + }() + wg.Wait() + + expected1 := []byte(`{"id":1,"payload":{"bandwidth_download":2,"bandwidth_upload":3,"msg":"system.interval","peers":1},"ts":`) + expected2 := []byte(`{"id":1,"payload":{"best":"hash","height":2,"msg":"block.import","origin":"NetworkInitialSync"},"ts":`) + expected3 := []byte(`{"id":1,"payload":{"authority":false,"chain":"chain","config":"","genesis_hash":"hash","implementation":"systemName","msg":"system.connected","name":"nodeName","network_id":"netID","startup_time":"startTime","version":"version"},"ts":`) + expected4 := []byte(`{"id":1,"payload":{"best":"0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6","finalized_hash":"0x687197c11b4cf95374159843e7f46fbcd63558db981aaef01a8bac2a44a1d6b2","finalized_height":32256,"height":32375,"msg":"system.interval","txcount":2,"used_state_cache_size":1886357},"ts":`) // nolint + + expected := [][]byte{expected3, expected1, expected4, expected2} + + var actual [][]byte + for data := range resultCh { + actual = append(actual, data) + if len(actual) == 4 { + break + } } - GetInstance().SendConnection(data) - time.Sleep(time.Millisecond) - // note, we only check the first 234 bytes because the remaining bytes are the timestamp, which we can't estimate - require.Equal(t, expected, lastMessage[:234]) -} -func TestHandler_SendBlockImport(t *testing.T) { - expected := []byte(`{"id":1,"payload":{"best":"hash","height":2,"msg":"block.import","origin":"NetworkInitialSync"},"ts":`) - GetInstance().SendBlockImport("hash", big.NewInt(2)) - time.Sleep(time.Millisecond) - // note, we only check the first 101 bytes because the remaining bytes are the timestamp, which we can't estimate - require.Equal(t, expected, lastMessage[:101]) + sort.Slice(actual, func(i, j int) bool { + return bytes.Compare(actual[i], actual[j]) < 0 + }) + require.Contains(t, string(actual[0]), string(expected[0])) + require.Contains(t, string(actual[1]), string(expected[1])) + require.Contains(t, string(actual[2]), string(expected[2])) + require.Contains(t, string(actual[3]), string(expected[3])) } func listen(w http.ResponseWriter, r *http.Request) { @@ -73,6 +120,7 @@ func listen(w http.ResponseWriter, r *http.Request) { log.Printf("read err %v", err) break } - lastMessage = msg + + resultCh <- msg } }