Skip to content

Commit

Permalink
Merge pull request #73 from streamingfast/feature/first_streamable_bl…
Browse files Browse the repository at this point in the history
…ock_refactor

feature/first streamable block refactor
  • Loading branch information
sduchesneau authored Dec 16, 2024
2 parents fbf82d0 + f9e8183 commit ad5559c
Show file tree
Hide file tree
Showing 7 changed files with 184 additions and 249 deletions.
107 changes: 42 additions & 65 deletions block/fetcher/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ type fetchBlock func(ctx context.Context, requestedSlot uint64) (slot uint64, ou

type RPCFetcher struct {
rpcClients *firecoreRPC.Clients[*rpc.Client]
optimizeForSingleTarget bool
latestConfirmedSlot uint64
latestFinalizedSlot uint64
latestBlockRetryInterval time.Duration
Expand All @@ -46,11 +45,9 @@ type RPCFetcher struct {
logger *zap.Logger
}

func NewRPC(rpcClients *firecoreRPC.Clients[*rpc.Client], fetchInterval time.Duration, latestBlockRetryInterval time.Duration, optimizeForSingleTarget bool, isMainnet bool, logger *zap.Logger) *RPCFetcher {
func NewRPC(fetchInterval time.Duration, latestBlockRetryInterval time.Duration, isMainnet bool, logger *zap.Logger) *RPCFetcher {
f := &RPCFetcher{
rpcClients: rpcClients,
fetchInterval: fetchInterval,
optimizeForSingleTarget: optimizeForSingleTarget,
latestBlockRetryInterval: latestBlockRetryInterval,
isMainnet: isMainnet,
logger: logger,
Expand All @@ -63,48 +60,48 @@ func (f *RPCFetcher) IsBlockAvailable(requestedSlot uint64) bool {
return requestedSlot <= f.latestConfirmedSlot
}

func (f *RPCFetcher) Fetch(ctx context.Context, requestedSlot uint64) (out *pbbstream.Block, skip bool, err error) {
func (f *RPCFetcher) FetchSortValue(ctx context.Context, client *rpc.Client) (sortValue uint64, err error) {
num, err := client.GetSlot(ctx, rpc.CommitmentConfirmed)
if err != nil {
return 0, fmt.Errorf("fetching head block num: %w", err)
}
return num, nil
}

func (f *RPCFetcher) Fetch(ctx context.Context, client *rpc.Client, requestedSlot uint64) (b *pbbstream.Block, skipped bool, err error) {
if f.isMainnet && requestedSlot >= 13334464 && requestedSlot <= 13334475 {
// know issue fetching these blocks on mainnet, ugly but works
return nil, true, nil
}

sleepDuration := time.Duration(0)
_, err = firecoreRPC.WithClients(f.rpcClients, func(client *rpc.Client) (na interface{}, err error) {

for f.latestConfirmedSlot < requestedSlot {
time.Sleep(sleepDuration)
f.latestConfirmedSlot, err = client.GetSlot(ctx, rpc.CommitmentConfirmed)
if err != nil {
return nil, fmt.Errorf("fetching latestConfirmedSlot block num: %w", err)
}

f.logger.Info("got latest confirmed slot block", zap.Uint64("latest_confirmed_slot", f.latestConfirmedSlot), zap.Uint64("requested_block_num", requestedSlot))
//
if f.latestConfirmedSlot >= requestedSlot {
break
}
sleepDuration = f.latestBlockRetryInterval
for f.latestConfirmedSlot < requestedSlot {
time.Sleep(sleepDuration)
f.latestConfirmedSlot, err = client.GetSlot(ctx, rpc.CommitmentConfirmed)
if err != nil {
return nil, false, fmt.Errorf("fetching latestConfirmedSlot block num: %w", err)
}

if f.latestFinalizedSlot < requestedSlot {
f.latestFinalizedSlot, err = client.GetSlot(ctx, rpc.CommitmentFinalized)
if err != nil {
return nil, fmt.Errorf("fetching latest finalized Slot block num: %w", err)
}
f.logger.Info("got latest finalized slot block", zap.Uint64("latest_finalized_slot", f.latestFinalizedSlot), zap.Uint64("requested_block_num", requestedSlot))
f.logger.Info("got latest confirmed slot block", zap.Uint64("latest_confirmed_slot", f.latestConfirmedSlot), zap.Uint64("requested_block_num", requestedSlot))
//
if f.latestConfirmedSlot >= requestedSlot {
break
}
sleepDuration = f.latestBlockRetryInterval
}

return nil, nil
})

if err != nil {
return nil, false, err
if f.latestFinalizedSlot < requestedSlot {
f.latestFinalizedSlot, err = client.GetSlot(ctx, rpc.CommitmentFinalized)
if err != nil {
return nil, false, fmt.Errorf("fetching latest finalized Slot block num: %w", err)
}
f.logger.Info("got latest finalized slot block", zap.Uint64("latest_finalized_slot", f.latestFinalizedSlot), zap.Uint64("requested_block_num", requestedSlot))
}

f.logger.Info("fetcher fetching block", zap.Uint64("block_num", requestedSlot), zap.Uint64("latest_finalized_slot", f.latestFinalizedSlot), zap.Uint64("latest_confirmed_slot", f.latestConfirmedSlot))

blockResult, skip, err := f.fetch(ctx, requestedSlot, f.latestConfirmedSlot)
blockResult, skip, err := f.fetch(ctx, client, requestedSlot, f.latestConfirmedSlot)
if err != nil {
return nil, false, fmt.Errorf("fetching block %d: %w", requestedSlot, err)
}
Expand All @@ -126,48 +123,28 @@ func (f *RPCFetcher) Fetch(ctx context.Context, requestedSlot uint64) (out *pbbs
return block, false, nil
}

func (f *RPCFetcher) fetch(ctx context.Context, requestedSlot uint64, lastConfirmBlockNum uint64) (*rpc.GetBlockResult, bool, error) {
currentSlot := requestedSlot
var lastErrorPrintedAt time.Time
func (f *RPCFetcher) fetch(ctx context.Context, client *rpc.Client, requestedSlot uint64, lastConfirmBlockNum uint64) (*rpc.GetBlockResult, bool, error) {
f.logger.Info("calling GetBlockWithOptions", zap.String("endpoints", fmt.Sprintf("%s", client)))
blockResult, err := client.GetBlockWithOpts(ctx, requestedSlot, GetBlockOpts)

for {
out, err := firecoreRPC.WithClients(f.rpcClients, func(client *rpc.Client) (*rpc.GetBlockResult, error) {
f.logger.Info("calling GetBlockWithOptions", zap.String("endpoints", fmt.Sprintf("%s", client)))
blockResult, err := client.GetBlockWithOpts(ctx, currentSlot, GetBlockOpts)
return blockResult, err
})
if err != nil {
var rpcErr *jsonrpc.RPCError
if errors.As(err, &rpcErr) {

if err != nil {
var rpcErr *jsonrpc.RPCError
if errors.As(err, &rpcErr) {

if rpcErr.Code == -32009 || rpcErr.Code == -32007 {
f.logger.Info("fetcher block was skipped", zap.Uint64("block_num", currentSlot), zap.Int("rpc_error_code", rpcErr.Code))
return nil, true, nil
}

if rpcErr.Code == -32004 {
if f.optimizeForSingleTarget && currentSlot < lastConfirmBlockNum {
f.logger.Info("fetcher block was supposedly skipped", zap.Uint64("block_num", currentSlot))
return nil, true, nil
}

f.logger.Warn("block not available. trying same block", zap.Uint64("block_num", currentSlot))
continue
}
if rpcErr.Code == -32009 || rpcErr.Code == -32007 {
f.logger.Info("fetcher block was skipped", zap.Uint64("block_num", requestedSlot))
return nil, true, nil
}

if lastErrorPrintedAt.IsZero() || time.Since(lastErrorPrintedAt) > 30*time.Second {
f.logger.Warn("error getting block", zap.Uint64("block_num", currentSlot), zap.Error(err))
lastErrorPrintedAt = time.Now()
if rpcErr.Code == -32004 {
return nil, false, fmt.Errorf("block not available %d", requestedSlot)
}

//we retry forever!
continue
}

return out, false, nil
return nil, false, fmt.Errorf("getting block %d: %w", requestedSlot, err)
}

return blockResult, false, nil
}

func blockFromBlockResult(slot uint64, finalizedSlot uint64, result *rpc.GetBlockResult, logger *zap.Logger) (*pbbstream.Block, error) {
Expand Down
3 changes: 0 additions & 3 deletions cmd/firesol/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package main
import (
"fmt"
"os"
"time"

"github.com/spf13/cobra"
"github.com/streamingfast/firehose-core/cmd/tools"
Expand Down Expand Up @@ -39,8 +38,6 @@ func newFetchCmd(logger *zap.Logger, tracer logging.Tracer) *cobra.Command {
Short: "fetch blocks from different sources",
Args: cobra.ExactArgs(2),
}
time.Now().UnixMilli()
cmd.AddCommand(rpc.NewFetchCmd(logger, tracer))
cmd.AddCommand(rpc.NewNextBlockCmd(logger, tracer))
return cmd
}
77 changes: 0 additions & 77 deletions cmd/firesol/rpc/check.go

This file was deleted.

38 changes: 20 additions & 18 deletions cmd/firesol/rpc/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@ import (
"strconv"
"time"

firecoreRPC "github.com/streamingfast/firehose-core/rpc"
"github.com/spf13/pflag"

"github.com/gagliardetto/solana-go/rpc"
"github.com/spf13/cobra"
"github.com/streamingfast/cli/sflags"
firecore "github.com/streamingfast/firehose-core"
"github.com/streamingfast/firehose-core/blockpoller"
firecoreRPC "github.com/streamingfast/firehose-core/rpc"
"github.com/streamingfast/firehose-solana/block/fetcher"
"github.com/streamingfast/logging"
"go.uber.org/zap"
Expand All @@ -29,17 +30,16 @@ func NewFetchCmd(logger *zap.Logger, tracer logging.Tracer) *cobra.Command {
cmd.Flags().String("state-dir", "/data/poller", "interval between fetch")
cmd.Flags().Duration("interval-between-fetch", 0, "interval between fetch")
cmd.Flags().Duration("latest-block-retry-interval", time.Second, "interval between fetch")
cmd.Flags().Duration("max-block-fetch-duration", 3*time.Second, "maximum delay before considering a block fetch as failed")
cmd.Flags().Duration("interval-between-clients-sort", 10*time.Minute, "interval between sorting clients base on their head block")
cmd.Flags().Int("block-fetch-batch-size", 10, "Number of blocks to fetch in a single batch")
cmd.Flags().Bool("optimize-single-target", false, "Only set this if every endpoint is pointing to a single node (not a cluster). It allows reducing the number of RPC calls by making assumptions about the last block")
cmd.Flags().String("network", "mainnet", "network to fetch from (mainnet, devnet, testnet) -- only used to patch a known issue on some slots")

return cmd
}

func fetchRunE(logger *zap.Logger, tracer logging.Tracer) firecore.CommandExecutor {
return func(cmd *cobra.Command, args []string) (err error) {
ctx := cmd.Context()

stateDir := sflags.MustGetString(cmd, "state-dir")

startBlock, err := strconv.ParseUint(args[0], 10, 64)
Expand All @@ -48,38 +48,40 @@ func fetchRunE(logger *zap.Logger, tracer logging.Tracer) firecore.CommandExecut
}

fetchInterval := sflags.MustGetDuration(cmd, "interval-between-fetch")
maxBlockFetchDuration := sflags.MustGetDuration(cmd, "max-block-fetch-duration")
intervalBetweenClientsSort := sflags.MustGetDuration(cmd, "interval-between-clients-sort")

logger.Info(
"launching firehose-solana poller",
zap.String("state_dir", stateDir),
zap.Uint64("first_streamable_block", startBlock),
zap.Duration("interval_between_fetch", fetchInterval),
zap.Duration("latest_block_retry_interval", sflags.MustGetDuration(cmd, "latest-block-retry-interval")),
)
logger.Info("launching firehose-solana poller")
cmd.Flags().VisitAll(func(flag *pflag.Flag) {
logger.Info("flag", zap.String("name", flag.Name), zap.String("value", flag.Value.String()))
})

rpcEndpoints := sflags.MustGetStringArray(cmd, "endpoints")
rpcClients := firecoreRPC.NewClients[*rpc.Client]()
rpcClients := firecoreRPC.NewClients[*rpc.Client](maxBlockFetchDuration, firecoreRPC.NewStickyRollingStrategy[*rpc.Client](), logger)
for _, rpcEndpoint := range rpcEndpoints {
client := rpc.New(rpcEndpoint)
rpcClients.Add(client)
}

latestBlockRetryInterval := sflags.MustGetDuration(cmd, "latest-block-retry-interval")
optimizeSingleTarget := sflags.MustGetBool(cmd, "optimize-single-target")
var isMainnet bool
switch sflags.MustGetString(cmd, "network") {
case "mainnet", "mainnet-beta":
isMainnet = true
}

poller := blockpoller.New(
fetcher.NewRPC(rpcClients, fetchInterval, latestBlockRetryInterval, optimizeSingleTarget, isMainnet, logger),
blockFetcher := fetcher.NewRPC(fetchInterval, latestBlockRetryInterval, isMainnet, logger)
rpcClients.StartSorting(cmd.Context(), firecoreRPC.SortDirectionDescending, blockFetcher, intervalBetweenClientsSort)

poller := blockpoller.New[*rpc.Client](
blockFetcher,
blockpoller.NewFireBlockHandler("type.googleapis.com/sf.solana.type.v1.Block"),
blockpoller.WithStoringState(stateDir),
blockpoller.WithLogger(logger),
rpcClients,
blockpoller.WithLogger[*rpc.Client](logger),
blockpoller.WithStoringState[*rpc.Client](stateDir),
)

err = poller.Run(ctx, startBlock, sflags.MustGetInt(cmd, "block-fetch-batch-size"))
err = poller.Run(startBlock, nil, sflags.MustGetInt(cmd, "block-fetch-batch-size"))
if err != nil {
return fmt.Errorf("running poller: %w", err)
}
Expand Down
Loading

0 comments on commit ad5559c

Please sign in to comment.