From fbbfd94863168ad7c90090d19b6788b2e5c4379c Mon Sep 17 00:00:00 2001 From: Zakhar Petukhov Date: Thu, 19 Sep 2024 19:54:09 +0300 Subject: [PATCH 1/2] create a separate func implementation for sending blockchain messages --- cmd/api/main.go | 2 +- pkg/blockchain/msg_sender.go | 28 ++++++++++++---------------- 2 files changed, 13 insertions(+), 17 deletions(-) diff --git a/cmd/api/main.go b/cmd/api/main.go index ee1914c0..e27d1ad1 100644 --- a/cmd/api/main.go +++ b/cmd/api/main.go @@ -62,7 +62,7 @@ func main() { msgSender, err := blockchain.NewMsgSender(log, cfg.App.LiteServers, map[string]chan<- blockchain.ExtInMsgCopy{ "mempool": mempoolCh, - }) + }, nil) if err != nil { log.Fatal("failed to create msg sender", zap.Error(err)) } diff --git a/pkg/blockchain/msg_sender.go b/pkg/blockchain/msg_sender.go index f8e99483..fcba9fd4 100644 --- a/pkg/blockchain/msg_sender.go +++ b/pkg/blockchain/msg_sender.go @@ -7,10 +7,7 @@ import ( "sync" "time" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" "github.com/sourcegraph/conc/iter" - "github.com/tonkeeper/tongo" "github.com/tonkeeper/tongo/config" "github.com/tonkeeper/tongo/liteapi" @@ -31,6 +28,8 @@ type MsgSender struct { mu sync.Mutex // batches is used as a cache for boc multi-sending. batches []batchOfMessages + + send func(ctx context.Context, payload []byte, clients []*liteapi.Client) error } type batchOfMessages struct { @@ -50,15 +49,11 @@ type ExtInMsgCopy struct { Accounts map[tongo.AccountID]struct{} } -var liteserverMessageSendMc = promauto.NewCounterVec(prometheus.CounterOpts{ - Name: "liteserver_message_send", -}, []string{"server", "result", "iteration"}) - func (m *ExtInMsgCopy) IsEmulation() bool { return len(m.Accounts) > 0 } -func NewMsgSender(logger *zap.Logger, servers []config.LiteServer, receivers map[string]chan<- ExtInMsgCopy) (*MsgSender, error) { +func NewMsgSender(logger *zap.Logger, servers []config.LiteServer, receivers map[string]chan<- ExtInMsgCopy, send func(ctx context.Context, payload []byte, clients []*liteapi.Client) error) (*MsgSender, error) { var ( client *liteapi.Client clients []*liteapi.Client @@ -91,11 +86,14 @@ func NewMsgSender(logger *zap.Logger, servers []config.LiteServer, receivers map if len(clients) == 0 { return nil, fmt.Errorf("no lite clients available") } - + if send == nil { + send = simpleSend + } msgSender := &MsgSender{ sendingClients: clients, logger: logger, receivers: receivers, + send: send, } go func() { for { @@ -151,20 +149,18 @@ func (ms *MsgSender) SendMessage(ctx context.Context, msgCopy ExtInMsgCopy) erro ms.logger.Warn("receiver is too slow", zap.String("name", name)) } } - return ms.send(ctx, msgCopy.Payload) + return ms.send(ctx, msgCopy.Payload, ms.sendingClients) } -func (ms *MsgSender) send(ctx context.Context, payload []byte) error { +func simpleSend(ctx context.Context, payload []byte, clients []*liteapi.Client) error { var err error for i := 0; i < 3; i++ { - serverNumber := rand.Intn(len(ms.sendingClients)) - c := ms.sendingClients[serverNumber] + serverNumber := rand.Intn(len(clients)) + c := clients[serverNumber] _, err = c.SendMessage(ctx, payload) if err == nil { - liteserverMessageSendMc.WithLabelValues(fmt.Sprintf("%d", serverNumber), "success", fmt.Sprintf("%d", i)).Inc() return nil } - liteserverMessageSendMc.WithLabelValues(fmt.Sprintf("%d", serverNumber), "error", fmt.Sprintf("%d", i)).Inc() } return err } @@ -178,7 +174,7 @@ func (ms *MsgSender) sendMessageFromBatch(msgCopy ExtInMsgCopy) error { } ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) defer cancel() - return ms.send(ctx, msgCopy.Payload) + return ms.send(ctx, msgCopy.Payload, ms.sendingClients) } func (ms *MsgSender) SendMultipleMessages(ctx context.Context, copies []ExtInMsgCopy) { From 10b7287593c2a0178863dfa479d383adbde2ede5 Mon Sep 17 00:00:00 2001 From: Zakhar Petukhov Date: Fri, 20 Sep 2024 08:08:44 +0300 Subject: [PATCH 2/2] in working --- pkg/blockchain/msg_sender.go | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/pkg/blockchain/msg_sender.go b/pkg/blockchain/msg_sender.go index fcba9fd4..83456284 100644 --- a/pkg/blockchain/msg_sender.go +++ b/pkg/blockchain/msg_sender.go @@ -28,8 +28,6 @@ type MsgSender struct { mu sync.Mutex // batches is used as a cache for boc multi-sending. batches []batchOfMessages - - send func(ctx context.Context, payload []byte, clients []*liteapi.Client) error } type batchOfMessages struct { @@ -53,7 +51,7 @@ func (m *ExtInMsgCopy) IsEmulation() bool { return len(m.Accounts) > 0 } -func NewMsgSender(logger *zap.Logger, servers []config.LiteServer, receivers map[string]chan<- ExtInMsgCopy, send func(ctx context.Context, payload []byte, clients []*liteapi.Client) error) (*MsgSender, error) { +func NewMsgSender(logger *zap.Logger, servers []config.LiteServer, receivers map[string]chan<- ExtInMsgCopy) (*MsgSender, error) { var ( client *liteapi.Client clients []*liteapi.Client @@ -86,14 +84,10 @@ func NewMsgSender(logger *zap.Logger, servers []config.LiteServer, receivers map if len(clients) == 0 { return nil, fmt.Errorf("no lite clients available") } - if send == nil { - send = simpleSend - } msgSender := &MsgSender{ sendingClients: clients, logger: logger, receivers: receivers, - send: send, } go func() { for { @@ -149,14 +143,14 @@ func (ms *MsgSender) SendMessage(ctx context.Context, msgCopy ExtInMsgCopy) erro ms.logger.Warn("receiver is too slow", zap.String("name", name)) } } - return ms.send(ctx, msgCopy.Payload, ms.sendingClients) + return ms.send(ctx, msgCopy.Payload) } -func simpleSend(ctx context.Context, payload []byte, clients []*liteapi.Client) error { +func (ms *MsgSender) send(ctx context.Context, payload []byte) error { var err error for i := 0; i < 3; i++ { - serverNumber := rand.Intn(len(clients)) - c := clients[serverNumber] + serverNumber := rand.Intn(len(ms.sendingClients)) + c := ms.sendingClients[serverNumber] _, err = c.SendMessage(ctx, payload) if err == nil { return nil @@ -174,7 +168,7 @@ func (ms *MsgSender) sendMessageFromBatch(msgCopy ExtInMsgCopy) error { } ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) defer cancel() - return ms.send(ctx, msgCopy.Payload, ms.sendingClients) + return ms.send(ctx, msgCopy.Payload) } func (ms *MsgSender) SendMultipleMessages(ctx context.Context, copies []ExtInMsgCopy) { @@ -193,3 +187,11 @@ func (ms *MsgSender) SendMultipleMessages(ctx context.Context, copies []ExtInMsg func (ms *MsgSender) SendingClientsNumber() int { return len(ms.sendingClients) } + +func (ms *MsgSender) GetReceivers() map[string]chan<- ExtInMsgCopy { + return ms.receivers +} + +func (ms *MsgSender) GetClients() []*liteapi.Client { + return ms.sendingClients +}