Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BLOCK-2121] merge 1.1.0 #8

Merged
merged 20 commits into from
Feb 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
4ebd4d1
fix decoding panic by double checking variant type bounds (#97)
fschoell Nov 9, 2023
b8c3e68
update firehose-core to v0.2.1 (#99)
fschoell Nov 9, 2023
c0ad21c
bump firehose-core to v0.2.2 (#100)
fschoell Nov 10, 2023
9565470
add buf yaml (#101)
YaroShkvorets Nov 13, 2023
03a93ac
bump firehose-core to v0.2.3 (#102)
fschoell Nov 14, 2023
72f7699
bump firehose-core to v0.2.4 (#104)
fschoell Dec 6, 2023
e5800f8
remove unused proto dependency on generate (#106)
fschoell Dec 12, 2023
1647451
upgrade to firehose-core v1.0.0 (#107)
fschoell Dec 14, 2023
1de921f
add Silkworm poller for Antelope EVM (#108)
fschoell Dec 14, 2023
be58954
Bump github.com/libp2p/go-libp2p from 0.26.3 to 0.27.8 (#109)
dependabot[bot] Dec 18, 2023
deff673
add workaround for missing finalized query on eos evm (#110)
fschoell Dec 18, 2023
d12d908
enable multitests for the consolereader (#111)
fschoell Dec 22, 2023
a9bc45d
Bump golang.org/x/crypto from 0.14.0 to 0.17.0 (#112)
dependabot[bot] Dec 22, 2023
e6c7d5a
add check-blocks tool to check merged blocks for decoding issues (#114)
fschoell Jan 2, 2024
4309f0d
fix check-blocks starting at block 0 (#115)
fschoell Jan 2, 2024
a102c71
add block sanitize function for comparisons (#116)
fschoell Jan 3, 2024
c8fd828
add leap 5.0 test data (#117)
YaroShkvorets Jan 8, 2024
ac4205f
bump firehose-core to v1.1.0 (#118)
fschoell Jan 25, 2024
e87a04d
Merge branch 'upstream-1.1.0' into feature/BLOCK-2121-merge-1.1.0
Duncan-Ultra Jan 30, 2024
6b14fa2
update go mod
Duncan-Ultra Jan 31, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 123 additions & 0 deletions blockfetcher/rpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package blockfetcher

import (
"context"
"fmt"
"sync"
"time"

"github.com/abourget/llerrgroup"
pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"
"github.com/streamingfast/eth-go/rpc"
pbeth "github.com/streamingfast/firehose-ethereum/types/pb/sf/ethereum/type/v2"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/timestamppb"
)

type ToEthBlock func(in *rpc.Block, receipts map[string]*rpc.TransactionReceipt) (*pbeth.Block, map[string]bool)

type BlockFetcher struct {
rpcClient *rpc.Client
latest uint64
latestBlockRetryInterval time.Duration
fetchInterval time.Duration
toEthBlock ToEthBlock
lastFetchAt time.Time
logger *zap.Logger
}

func NewBlockFetcher(rpcClient *rpc.Client, intervalBetweenFetch, latestBlockRetryInterval time.Duration, toEthBlock ToEthBlock, logger *zap.Logger) *BlockFetcher {
return &BlockFetcher{
rpcClient: rpcClient,
latestBlockRetryInterval: latestBlockRetryInterval,
toEthBlock: toEthBlock,
fetchInterval: intervalBetweenFetch,
logger: logger,
}
}

func (f *BlockFetcher) Fetch(ctx context.Context, blockNum uint64) (block *pbbstream.Block, err error) {
f.logger.Debug("fetching block", zap.Uint64("block_num", blockNum))
for f.latest < blockNum {
f.latest, err = f.rpcClient.LatestBlockNum(ctx)
if err != nil {
return nil, fmt.Errorf("fetching latest block num: %w", err)
}

f.logger.Info("got latest block", zap.Uint64("latest", f.latest), zap.Uint64("block_num", blockNum))

if f.latest < blockNum {
time.Sleep(f.latestBlockRetryInterval)
continue
}
break
}

sinceLastFetch := time.Since(f.lastFetchAt)
if sinceLastFetch < f.fetchInterval {
time.Sleep(f.fetchInterval - sinceLastFetch)
}

rpcBlock, err := f.rpcClient.GetBlockByNumber(ctx, rpc.BlockNumber(blockNum), rpc.WithGetBlockFullTransaction())
if err != nil {
return nil, fmt.Errorf("fetching block %d: %w", blockNum, err)
}

receipts, err := FetchReceipts(ctx, rpcBlock, f.rpcClient)
if err != nil {
return nil, fmt.Errorf("fetching receipts for block %d %q: %w", rpcBlock.Number, rpcBlock.Hash.Pretty(), err)
}

f.logger.Debug("fetched receipts", zap.Int("count", len(receipts)))

f.lastFetchAt = time.Now()

if err != nil {
return nil, fmt.Errorf("fetching logs for block %d %q: %w", rpcBlock.Number, rpcBlock.Hash.Pretty(), err)
}

ethBlock, _ := f.toEthBlock(rpcBlock, receipts)
anyBlock, err := anypb.New(ethBlock)
if err != nil {
return nil, fmt.Errorf("create any block: %w", err)
}

return &pbbstream.Block{
Number: ethBlock.Number,
Id: ethBlock.GetFirehoseBlockID(),
ParentId: ethBlock.GetFirehoseBlockParentID(),
Timestamp: timestamppb.New(ethBlock.GetFirehoseBlockTime()),
LibNum: ethBlock.LIBNum(),
ParentNum: ethBlock.GetFirehoseBlockParentNumber(),
Payload: anyBlock,
}, nil
}

func FetchReceipts(ctx context.Context, block *rpc.Block, client *rpc.Client) (out map[string]*rpc.TransactionReceipt, err error) {
out = make(map[string]*rpc.TransactionReceipt)
lock := sync.Mutex{}

eg := llerrgroup.New(10)
for _, tx := range block.Transactions.Transactions {
if eg.Stop() {
continue // short-circuit the loop if we got an error
}
eg.Go(func() error {
receipt, err := client.TransactionReceipt(ctx, tx.Hash)
if err != nil {
return fmt.Errorf("fetching receipt for tx %q: %w", tx.Hash.Pretty(), err)
}
lock.Lock()
out[tx.Hash.Pretty()] = receipt
lock.Unlock()
return err
})
}

if err := eg.Wait(); err != nil {
return nil, err
}

return
}
29 changes: 29 additions & 0 deletions blockfetcher/silkworm.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package blockfetcher

import (
"context"
"time"

"go.uber.org/zap"

pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"
"github.com/streamingfast/eth-go/rpc"
"github.com/streamingfast/firehose-ethereum/block"
)

type SilkwormBlockFetcher struct {
fetcher *BlockFetcher
}

func NewSilkwormBlockFetcher(rpcClient *rpc.Client, intervalBetweenFetch time.Duration, latestBlockRetryInterval time.Duration, logger *zap.Logger) *SilkwormBlockFetcher {
fetcher := NewBlockFetcher(rpcClient, intervalBetweenFetch, latestBlockRetryInterval, block.RpcToEthBlock, logger)
return &SilkwormBlockFetcher{
fetcher: fetcher,
}
}

func (f *SilkwormBlockFetcher) PollingInterval() time.Duration { return 1 * time.Second }

func (f *SilkwormBlockFetcher) Fetch(ctx context.Context, blockNum uint64) (*pbbstream.Block, error) {
return f.fetcher.Fetch(ctx, blockNum)
}
38 changes: 21 additions & 17 deletions cmd/fireantelope/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,29 @@ import (
"github.com/spf13/cobra"
"github.com/spf13/pflag"
firecore "github.com/streamingfast/firehose-core"
fhCmd "github.com/streamingfast/firehose-core/cmd"
"github.com/streamingfast/logging"
"github.com/streamingfast/node-manager/mindreader"
pbbstream "github.com/streamingfast/pbgo/sf/bstream/v1"
"go.uber.org/zap"
)

func init() {
firecore.UnsafePayloadKind = pbbstream.Protocol_EOS
func main() {
fhCmd.Main(Chain())
}

func main() {
firecore.Main(&firecore.Chain[*pbantelope.Block]{
var chain *firecore.Chain[*pbantelope.Block]

func Chain() *firecore.Chain[*pbantelope.Block] {
if chain != nil {
return chain
}

chain = &firecore.Chain[*pbantelope.Block]{
ShortName: "antelope",
LongName: "Antelope",
ExecutableName: "nodeos",
FullyQualifiedModule: "github.com/pinax-network/firehose-antelope",
Version: version,

Protocol: "EOS",
ProtocolVersion: 1,
FirstStreamableBlock: 2,

BlockFactory: func() firecore.Block { return new(pbantelope.Block) },
Expand All @@ -39,9 +42,7 @@ func main() {
// transform.ReceiptFilterMessageName: transform.BasicReceiptFilterFactory,
//},

ConsoleReaderFactory: func(lines chan string, blockEncoder firecore.BlockEncoder, logger *zap.Logger, tracer logging.Tracer) (mindreader.ConsolerReader, error) {
return codec.NewConsoleReader(lines, blockEncoder, logger, tracer)
},
ConsoleReaderFactory: codec.NewConsoleReader,

RegisterExtraStartFlags: func(flags *pflag.FlagSet) {
flags.String("reader-node-config-file", "", "Node configuration file, the file is copied inside the {data-dir}/reader/data folder Use {hostname} label to use short hostname in path")
Expand All @@ -53,27 +54,30 @@ func main() {
// ReaderNodeBootstrapperFactory: newReaderNodeBootstrapper,

Tools: &firecore.ToolsConfig[*pbantelope.Block]{
BlockPrinter: printBlock,

RegisterExtraCmd: func(chain *firecore.Chain[*pbantelope.Block], toolsCmd *cobra.Command, zlog *zap.Logger, tracer logging.Tracer) error {
RegisterExtraCmd: func(chain *firecore.Chain[*pbantelope.Block], parent *cobra.Command, zlog *zap.Logger, tracer logging.Tracer) error {
//toolsCmd.AddCommand(newToolsGenerateNodeKeyCmd(chain))
//toolsCmd.AddCommand(newToolsBackfillCmd(zlog))
parent.AddCommand(newPollerCmd(zlog, tracer))
parent.AddCommand(newSilkwormPollerCmd(zlog, tracer))
parent.AddCommand(newCheckBlocksCmd(zlog))

return nil
},

SanitizeBlockForCompare: sanitizeBlockForCompare,

//TransformFlags: map[string]*firecore.TransformFlag{
// "receipt-account-filters": {
// Description: "Comma-separated accounts to use as filter/index. If it contains a colon (:), it will be interpreted as <prefix>:<suffix> (each of which can be empty, ex: 'hello:' or ':world')",
// Parser: parseReceiptAccountFilters,
// },
//},
},
})
}

return chain
}

// Version value, injected via go build `ldflags` at build time
var version = "dev"

// Commit sha1 value, injected via go build `ldflags` at build time
var commit = ""
85 changes: 85 additions & 0 deletions cmd/fireantelope/poller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package main

import (
"fmt"
"github.com/pinax-network/firehose-antelope/blockfetcher"
"path"
"strconv"
"time"

"github.com/spf13/cobra"
"github.com/streamingfast/bstream"
"github.com/streamingfast/cli/sflags"
"github.com/streamingfast/eth-go/rpc"
firecore "github.com/streamingfast/firehose-core"
"github.com/streamingfast/firehose-core/blockpoller"
"github.com/streamingfast/logging"
"go.uber.org/zap"
)

func newPollerCmd(logger *zap.Logger, tracer logging.Tracer) *cobra.Command {
cmd := &cobra.Command{
Use: "poller",
Short: "poll blocks from different sources",
}

cmd.AddCommand(newSilkwormPollerCmd(logger, tracer))
return cmd
}

func newSilkwormPollerCmd(logger *zap.Logger, tracer logging.Tracer) *cobra.Command {
cmd := &cobra.Command{
Use: "silkworm <rpc-endpoint> <first-streamable-block>",
Short: "poll blocks from silkworm rpc",
Args: cobra.ExactArgs(2),
RunE: pollerRunE(logger, tracer),
}
cmd.Flags().Duration("interval-between-fetch", 0, "interval between fetch")

return cmd
}

func pollerRunE(logger *zap.Logger, tracer logging.Tracer) firecore.CommandExecutor {
return func(cmd *cobra.Command, args []string) (err error) {
ctx := cmd.Context()

rpcEndpoint := args[0]

dataDir := sflags.MustGetString(cmd, "data-dir")
stateDir := path.Join(dataDir, "poller-state")

logger.Info("launching firehose-antelope poller", zap.String("rpc_endpoint", rpcEndpoint), zap.String("data_dir", dataDir), zap.String("state_dir", stateDir))

rpcClient := rpc.NewClient(rpcEndpoint)

firstStreamableBlock, err := strconv.ParseUint(args[1], 10, 64)
if err != nil {
return fmt.Errorf("unable to parse first streamable block %d: %w", firstStreamableBlock, err)
}

fetchInterval := sflags.MustGetDuration(cmd, "interval-between-fetch")

fetcher := blockfetcher.NewSilkwormBlockFetcher(rpcClient, fetchInterval, 1*time.Second, logger)
handler := blockpoller.NewFireBlockHandler("type.googleapis.com/sf.ethereum.type.v2.Block")
poller := blockpoller.New(fetcher, handler, blockpoller.WithStoringState(stateDir), blockpoller.WithLogger(logger))

// there is currently no support for rpc.FinalizedBlock on eos evm, so query the latest one and then pass
// latest - 200 as the latest finalized block
latestBlock, err := rpcClient.GetBlockByNumber(ctx, rpc.LatestBlock)
if err != nil {
return fmt.Errorf("getting latest block: %w", err)
}

latestFinalizedBlock, err := rpcClient.GetBlockByNumber(ctx, rpc.BlockNumber(uint64(latestBlock.Number-200)))
if err != nil {
return fmt.Errorf("getting latest finalized block: %w", err)
}

err = poller.Run(ctx, firstStreamableBlock, bstream.NewBlockRef(latestFinalizedBlock.Hash.String(), uint64(latestFinalizedBlock.Number)))
if err != nil {
return fmt.Errorf("running poller: %w", err)
}

return nil
}
}
38 changes: 0 additions & 38 deletions cmd/fireantelope/tools.go

This file was deleted.

Loading
Loading