diff --git a/core/chains/evm/txm/metrics.go b/core/chains/evm/txm/metrics.go index 5ccc711ef09..7ff0a3377e3 100644 --- a/core/chains/evm/txm/metrics.go +++ b/core/chains/evm/txm/metrics.go @@ -5,12 +5,15 @@ import ( "fmt" "math/big" + "github.com/ethereum/go-ethereum/common" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "go.opentelemetry.io/otel/metric" + "google.golang.org/protobuf/proto" "github.com/smartcontractkit/chainlink-common/pkg/beholder" "github.com/smartcontractkit/chainlink-common/pkg/metrics" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txm/pb" ) var ( @@ -91,3 +94,27 @@ func (m *txmMetrics) RecordTimeUntilTxConfirmed(ctx context.Context, duration fl promTimeUntilTxConfirmed.WithLabelValues(m.chainID.String()).Observe(duration) m.timeUntilTxConfirmed.Record(ctx, duration) } + +func (m *txmMetrics) EmitTxMessage(ctx context.Context, tx common.Hash, fromAddress common.Address, toAddress common.Address, nonce uint64) error { + message := &pb.TxMessage{ + Hash: tx.String(), + FromAddress: fromAddress.String(), + ToAddress: toAddress.String(), + Nonce: nonce, + } + + messageBytes, err := proto.Marshal(message) + if err != nil { + return err + } + + err = beholder.GetEmitter().Emit( + ctx, + messageBytes, + "beholder_domain", "svr", + "beholder_entity", "TxMessage", + "beholder_data_schema", "/beholder-tx-message/versions/1", + ) + + return err +} diff --git a/core/chains/evm/txm/pb/beholder-tx-message.pb.go b/core/chains/evm/txm/pb/beholder-tx-message.pb.go new file mode 100644 index 00000000000..bd205b11478 --- /dev/null +++ b/core/chains/evm/txm/pb/beholder-tx-message.pb.go @@ -0,0 +1,159 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.35.2 +// protoc v5.29.1 +// source: pb/beholder-tx-message.proto + +package pb + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type TxMessage struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Hash string `protobuf:"bytes,1,opt,name=Hash,proto3" json:"Hash,omitempty"` + FromAddress string `protobuf:"bytes,2,opt,name=FromAddress,proto3" json:"FromAddress,omitempty"` + ToAddress string `protobuf:"bytes,3,opt,name=ToAddress,proto3" json:"ToAddress,omitempty"` + Nonce uint64 `protobuf:"varint,4,opt,name=Nonce,proto3" json:"Nonce,omitempty"` +} + +func (x *TxMessage) Reset() { + *x = TxMessage{} + mi := &file_pb_beholder_tx_message_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *TxMessage) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*TxMessage) ProtoMessage() {} + +func (x *TxMessage) ProtoReflect() protoreflect.Message { + mi := &file_pb_beholder_tx_message_proto_msgTypes[0] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use TxMessage.ProtoReflect.Descriptor instead. +func (*TxMessage) Descriptor() ([]byte, []int) { + return file_pb_beholder_tx_message_proto_rawDescGZIP(), []int{0} +} + +func (x *TxMessage) GetHash() string { + if x != nil { + return x.Hash + } + return "" +} + +func (x *TxMessage) GetFromAddress() string { + if x != nil { + return x.FromAddress + } + return "" +} + +func (x *TxMessage) GetToAddress() string { + if x != nil { + return x.ToAddress + } + return "" +} + +func (x *TxMessage) GetNonce() uint64 { + if x != nil { + return x.Nonce + } + return 0 +} + +var File_pb_beholder_tx_message_proto protoreflect.FileDescriptor + +var file_pb_beholder_tx_message_proto_rawDesc = []byte{ + 0x0a, 0x1c, 0x70, 0x62, 0x2f, 0x62, 0x65, 0x68, 0x6f, 0x6c, 0x64, 0x65, 0x72, 0x2d, 0x74, 0x78, + 0x2d, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x02, + 0x70, 0x62, 0x22, 0x75, 0x0a, 0x09, 0x54, 0x78, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, + 0x12, 0x0a, 0x04, 0x48, 0x61, 0x73, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x48, + 0x61, 0x73, 0x68, 0x12, 0x20, 0x0a, 0x0b, 0x46, 0x72, 0x6f, 0x6d, 0x41, 0x64, 0x64, 0x72, 0x65, + 0x73, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x46, 0x72, 0x6f, 0x6d, 0x41, 0x64, + 0x64, 0x72, 0x65, 0x73, 0x73, 0x12, 0x1c, 0x0a, 0x09, 0x54, 0x6f, 0x41, 0x64, 0x64, 0x72, 0x65, + 0x73, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x54, 0x6f, 0x41, 0x64, 0x64, 0x72, + 0x65, 0x73, 0x73, 0x12, 0x14, 0x0a, 0x05, 0x4e, 0x6f, 0x6e, 0x63, 0x65, 0x18, 0x04, 0x20, 0x01, + 0x28, 0x04, 0x52, 0x05, 0x4e, 0x6f, 0x6e, 0x63, 0x65, 0x42, 0x3e, 0x5a, 0x3c, 0x67, 0x69, 0x74, + 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x73, 0x6d, 0x61, 0x72, 0x74, 0x63, 0x6f, 0x6e, + 0x74, 0x72, 0x61, 0x63, 0x74, 0x6b, 0x69, 0x74, 0x2f, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x6c, 0x69, + 0x6e, 0x6b, 0x2d, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x62, 0x65, + 0x68, 0x6f, 0x6c, 0x64, 0x65, 0x72, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x33, +} + +var ( + file_pb_beholder_tx_message_proto_rawDescOnce sync.Once + file_pb_beholder_tx_message_proto_rawDescData = file_pb_beholder_tx_message_proto_rawDesc +) + +func file_pb_beholder_tx_message_proto_rawDescGZIP() []byte { + file_pb_beholder_tx_message_proto_rawDescOnce.Do(func() { + file_pb_beholder_tx_message_proto_rawDescData = protoimpl.X.CompressGZIP(file_pb_beholder_tx_message_proto_rawDescData) + }) + return file_pb_beholder_tx_message_proto_rawDescData +} + +var file_pb_beholder_tx_message_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_pb_beholder_tx_message_proto_goTypes = []any{ + (*TxMessage)(nil), // 0: pb.TxMessage +} +var file_pb_beholder_tx_message_proto_depIdxs = []int32{ + 0, // [0:0] is the sub-list for method output_type + 0, // [0:0] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_pb_beholder_tx_message_proto_init() } +func file_pb_beholder_tx_message_proto_init() { + if File_pb_beholder_tx_message_proto != nil { + return + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_pb_beholder_tx_message_proto_rawDesc, + NumEnums: 0, + NumMessages: 1, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_pb_beholder_tx_message_proto_goTypes, + DependencyIndexes: file_pb_beholder_tx_message_proto_depIdxs, + MessageInfos: file_pb_beholder_tx_message_proto_msgTypes, + }.Build() + File_pb_beholder_tx_message_proto = out.File + file_pb_beholder_tx_message_proto_rawDesc = nil + file_pb_beholder_tx_message_proto_goTypes = nil + file_pb_beholder_tx_message_proto_depIdxs = nil +} diff --git a/core/chains/evm/txm/pb/beholder-tx-message.proto b/core/chains/evm/txm/pb/beholder-tx-message.proto new file mode 100644 index 00000000000..941058d0cf0 --- /dev/null +++ b/core/chains/evm/txm/pb/beholder-tx-message.proto @@ -0,0 +1,12 @@ +syntax = "proto3"; + +option go_package = "github.com/smartcontractkit/chainlink-common/pkg/beholder/pb"; + +package pb; + +message TxMessage { + string Hash = 1; + string FromAddress = 2; + string ToAddress = 3; + uint64 Nonce = 4; +} \ No newline at end of file diff --git a/core/chains/evm/txm/txm.go b/core/chains/evm/txm/txm.go index bf53e00e81a..75ee2dcacbb 100644 --- a/core/chains/evm/txm/txm.go +++ b/core/chains/evm/txm/txm.go @@ -358,6 +358,10 @@ func (t *Txm) sendTransactionWithError(ctx context.Context, tx *types.Transactio } t.metrics.IncrementNumBroadcastedTxs(ctx) + if err = t.metrics.EmitTxMessage(ctx, attempt.Hash, address, tx.ToAddress, *tx.Nonce); err != nil { + t.lggr.Errorw("Beholder error emitting tx message", "err", err) + } + return t.txStore.UpdateTransactionBroadcast(ctx, attempt.TxID, *tx.Nonce, attempt.Hash, address) }