Skip to content

Commit

Permalink
New block fetcher (#70)
Browse files Browse the repository at this point in the history
New block fetcher
  • Loading branch information
billettc authored Feb 7, 2024
1 parent 58b6192 commit 1ba253e
Show file tree
Hide file tree
Showing 77 changed files with 225,193 additions and 27,539 deletions.
60 changes: 1 addition & 59 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -33,63 +33,5 @@ venv
/dist/firehose-solana_linux_x86_64.tar.gz
/dist/metadata.json
/.fleet/settings.json
/firehose-data/reader/work/uploadable-oneblock/0240816600-oFqP3SLJ4pLdwxBg-fUUN3FS7js2v2wci-240816599-default.dbin
/firehose-data/reader/work/uploadable-oneblock/0240816601-u4uTQqmxdSuMK98F-oFqP3SLJ4pLdwxBg-240816600-default.dbin
/firehose-data/storage/one-blocks/0240816543-cMrEPKC1gXiao3qC-7YjnaY1V1sqLH5pY-240816542-default.dbin.zst
/firehose-data/storage/one-blocks/0240816544-yV6VsdUySfsCRBvY-cMrEPKC1gXiao3qC-240816543-default.dbin.zst
/firehose-data/storage/one-blocks/0240816545-BybgisVZRMsozxLb-yV6VsdUySfsCRBvY-240816544-default.dbin.zst
/firehose-data/storage/one-blocks/0240816546-g5G2g6pzET3PDQvp-BybgisVZRMsozxLb-240816545-default.dbin.zst
/firehose-data/storage/one-blocks/0240816547-ueSJjEqFNMSiJLN9-g5G2g6pzET3PDQvp-240816546-default.dbin.zst
/firehose-data/storage/one-blocks/0240816548-z5A2N6qwMw538RJf-ueSJjEqFNMSiJLN9-240816547-default.dbin.zst
/firehose-data/storage/one-blocks/0240816549-d4ReNQb6w5xKCqpz-z5A2N6qwMw538RJf-240816548-default.dbin.zst
/firehose-data/storage/one-blocks/0240816550-So8K5VonzxdUUcBd-d4ReNQb6w5xKCqpz-240816549-default.dbin.zst
/firehose-data/storage/one-blocks/0240816551-CMLTpfzyQHcmrXbo-So8K5VonzxdUUcBd-240816550-default.dbin.zst
/firehose-data/storage/one-blocks/0240816552-ZHSZuU1kQh5Nv46L-CMLTpfzyQHcmrXbo-240816551-default.dbin.zst
/firehose-data/storage/one-blocks/0240816553-174pZ1Zrd2JFddQi-ZHSZuU1kQh5Nv46L-240816552-default.dbin.zst
/firehose-data/storage/one-blocks/0240816554-EPbabZUZsav9uQVv-174pZ1Zrd2JFddQi-240816553-default.dbin.zst
/firehose-data/storage/one-blocks/0240816555-Y1UyGViZvvtzBMCm-EPbabZUZsav9uQVv-240816554-default.dbin.zst
/firehose-data/storage/one-blocks/0240816556-a9KVmLptE5VChUvs-Y1UyGViZvvtzBMCm-240816555-default.dbin.zst
/firehose-data/storage/one-blocks/0240816557-RYwrLPM3bzzVEYCt-a9KVmLptE5VChUvs-240816556-default.dbin.zst
/firehose-data/storage/one-blocks/0240816558-xkYnCncwdp5xA3vH-RYwrLPM3bzzVEYCt-240816557-default.dbin.zst
/firehose-data/storage/one-blocks/0240816559-edtKxeXmxd2dEuPS-xkYnCncwdp5xA3vH-240816558-default.dbin.zst
/firehose-data/storage/one-blocks/0240816560-GtKBEExc4gM7jJmd-edtKxeXmxd2dEuPS-240816559-default.dbin.zst
/firehose-data/storage/one-blocks/0240816561-VsSxzkUMGqkGWDuZ-GtKBEExc4gM7jJmd-240816560-default.dbin.zst
/firehose-data/storage/one-blocks/0240816562-xsrKWtSL7bZ1KNHe-VsSxzkUMGqkGWDuZ-240816561-default.dbin.zst
/firehose-data/storage/one-blocks/0240816563-2QE572Vkh4e3YPnG-xsrKWtSL7bZ1KNHe-240816562-default.dbin.zst
/firehose-data/storage/one-blocks/0240816564-XS5L9C1dAPBbmocp-2QE572Vkh4e3YPnG-240816563-default.dbin.zst
/firehose-data/storage/one-blocks/0240816565-jzNs8qFu61LerS9N-XS5L9C1dAPBbmocp-240816564-default.dbin.zst
/firehose-data/storage/one-blocks/0240816566-JSwFeBocPxNS3mt7-jzNs8qFu61LerS9N-240816565-default.dbin.zst
/firehose-data/storage/one-blocks/0240816567-ShWWi2GQnVbHKtbJ-JSwFeBocPxNS3mt7-240816566-default.dbin.zst
/firehose-data/storage/one-blocks/0240816568-nhgpnme5xPNYVYwR-ShWWi2GQnVbHKtbJ-240816567-default.dbin.zst
/firehose-data/storage/one-blocks/0240816569-hR4sbKghN6T3CrB1-nhgpnme5xPNYVYwR-240816568-default.dbin.zst
/firehose-data/storage/one-blocks/0240816570-hHyV39wSRnTsbBjk-hR4sbKghN6T3CrB1-240816569-default.dbin.zst
/firehose-data/storage/one-blocks/0240816571-ui1PryWbLw5iiue6-hHyV39wSRnTsbBjk-240816570-default.dbin.zst
/firehose-data/storage/one-blocks/0240816572-bTY6AohYYy1jHdbb-ui1PryWbLw5iiue6-240816571-default.dbin.zst
/firehose-data/storage/one-blocks/0240816573-jCqrr2BzcsttqheG-bTY6AohYYy1jHdbb-240816572-default.dbin.zst
/firehose-data/storage/one-blocks/0240816574-B8uwgHSxYmA5Ny5h-jCqrr2BzcsttqheG-240816573-default.dbin.zst
/firehose-data/storage/one-blocks/0240816575-2VyWn4RVHw7JJ5ku-B8uwgHSxYmA5Ny5h-240816574-default.dbin.zst
/firehose-data/storage/one-blocks/0240816576-26YX8mmDhdKv9CpC-2VyWn4RVHw7JJ5ku-240816575-default.dbin.zst
/firehose-data/storage/one-blocks/0240816577-KFzEu243HwNRk7ZQ-26YX8mmDhdKv9CpC-240816576-default.dbin.zst
/firehose-data/storage/one-blocks/0240816578-nzrNiNYSJ491evSv-KFzEu243HwNRk7ZQ-240816577-default.dbin.zst
/firehose-data/storage/one-blocks/0240816579-H9ms7PnDWX3wScvk-nzrNiNYSJ491evSv-240816578-default.dbin.zst
/firehose-data/storage/one-blocks/0240816580-LjT8viZYRQTUA3SK-H9ms7PnDWX3wScvk-240816579-default.dbin.zst
/firehose-data/storage/one-blocks/0240816581-yvF74ErTPM29vz3z-LjT8viZYRQTUA3SK-240816580-default.dbin.zst
/firehose-data/storage/one-blocks/0240816582-Jmv87vZsbhyWUzxq-yvF74ErTPM29vz3z-240816581-default.dbin.zst
/firehose-data/storage/one-blocks/0240816583-LrrLAm54pLtKRqgE-Jmv87vZsbhyWUzxq-240816582-default.dbin.zst
/firehose-data/storage/one-blocks/0240816584-UPnhQBDes4M22kN9-LrrLAm54pLtKRqgE-240816583-default.dbin.zst
/firehose-data/storage/one-blocks/0240816585-DcHz6NDGatrPRm1D-UPnhQBDes4M22kN9-240816584-default.dbin.zst
/firehose-data/storage/one-blocks/0240816586-zAAVftqUCL3Dvh1d-DcHz6NDGatrPRm1D-240816585-default.dbin.zst
/firehose-data/storage/one-blocks/0240816587-gGQ9CNND8iZNsz6f-zAAVftqUCL3Dvh1d-240816586-default.dbin.zst
/firehose-data/storage/one-blocks/0240816588-WKUdBZjXU24tdFww-gGQ9CNND8iZNsz6f-240816587-default.dbin.zst
/firehose-data/storage/one-blocks/0240816589-1JkvHG1M54muNwUn-WKUdBZjXU24tdFww-240816588-default.dbin.zst
/firehose-data/storage/one-blocks/0240816590-ueWoAdyc35DmXeeR-1JkvHG1M54muNwUn-240816589-default.dbin.zst
/firehose-data/storage/one-blocks/0240816591-D2KXk9Nc3JMTbSR4-ueWoAdyc35DmXeeR-240816590-default.dbin.zst
/firehose-data/storage/one-blocks/0240816592-ep9SnfJcBWvfXLAA-D2KXk9Nc3JMTbSR4-240816591-default.dbin.zst
/firehose-data/storage/one-blocks/0240816593-J43m8d2usngrEyqc-ep9SnfJcBWvfXLAA-240816592-default.dbin.zst
/firehose-data/storage/one-blocks/0240816594-DqY8pr5KyTZ83cyy-J43m8d2usngrEyqc-240816593-default.dbin.zst
/firehose-data/storage/one-blocks/0240816595-oYyC4GpEybsC6fUN-DqY8pr5KyTZ83cyy-240816594-default.dbin.zst
/firehose-data/storage/one-blocks/0240816596-juycmDi3U5EMAugh-oYyC4GpEybsC6fUN-240816595-default.dbin.zst
/firehose-data/storage/one-blocks/0240816597-AQipdgoR9gVQLc2k-juycmDi3U5EMAugh-240816596-default.dbin.zst
/firehose-data/storage/one-blocks/0240816598-nRQqTFc22RED3W5g-AQipdgoR9gVQLc2k-240816597-default.dbin.zst
/firehose-data/storage/one-blocks/0240816599-fUUN3FS7js2v2wci-nRQqTFc22RED3W5g-240816598-default.dbin.zst
/firesol-legacy
/block/fetcher/secret.go
20 changes: 8 additions & 12 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# syntax=docker/dockerfile:1.2

FROM ghcr.io/streamingfast/firehose-solana:solana-bigtable-decoder as chain
FROM ghcr.io/streamingfast/firehose-core:b248423 as core

FROM ubuntu:20.04

Expand All @@ -12,24 +12,20 @@ RUN DEBIAN_FRONTEND=noninteractive apt-get update && \

RUN rm /etc/localtime && ln -snf /usr/share/zoneinfo/America/Montreal /etc/localtime && dpkg-reconfigure -f noninteractive tzdata

RUN mkdir /tmp/wasmer-install && cd /tmp/wasmer-install && \
curl -L https://github.com/wasmerio/wasmer/releases/download/2.3.0/wasmer-linux-amd64.tar.gz | tar xzf - && \
mv lib/libwasmer.a lib/libwasmer.so /usr/lib/ && cd / && rm -rf /tmp/wasmer-install

ADD /firesol /app/firesol

COPY tools/docker/motd_generic /etc/
COPY tools/docker/motd_node_manager /etc/
COPY tools/docker/99-firehose-solana.sh /etc/profile.d/
COPY tools/docker/scripts/* /usr/local/bin
#COPY tools/docker/motd_generic /etc/
#COPY tools/docker/motd_node_manager /etc/
#COPY tools/docker/99-firehose-solana.sh /etc/profile.d/
#COPY tools/docker/scripts/* /usr/local/bin

# On SSH connection, /root/.bashrc is invoked which invokes '/root/.bash_aliases' if existing,
# so we hijack the file to "execute" our specialized bash script
RUN echo ". /etc/profile.d/99-firehose-solana.sh" > /root/.bash_aliases
#RUN echo ". /etc/profile.d/99-firehose-solana.sh" > /root/.bash_aliases

ENV PATH "$PATH:/app"

COPY --from=chain /app/solana-bigtable-decoder /app/solana-bigtable-decoder

ENTRYPOINT ["/app/firesol"]
COPY --from=core /app/firecore /app/firecore

ENTRYPOINT ["/app/firesol"]
42 changes: 0 additions & 42 deletions bin/test.sh

This file was deleted.

162 changes: 158 additions & 4 deletions bt/reader.go → block/fetcher/bigtable.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,178 @@
package bt
package fetcher

import (
"bytes"
"compress/bzip2"
"compress/gzip"
"context"
"encoding/hex"
"fmt"
"io"
"io/ioutil"
"math/big"
"os/exec"
"strings"
"time"

"cloud.google.com/go/bigtable"
"github.com/golang/protobuf/proto"
"github.com/klauspost/compress/zstd"
pbsolv1 "github.com/streamingfast/firehose-solana/pb/sf/solana/type/v1"
"github.com/streamingfast/logging"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
)

type BigtableBlockReader struct {
bt *bigtable.Client
maxConnAttempt uint64

logger *zap.Logger
tracer logging.Tracer
}

func NewBigtableReader(bt *bigtable.Client, maxConnectionAttempt uint64, logger *zap.Logger, tracer logging.Tracer) *BigtableBlockReader {
return &BigtableBlockReader{
bt: bt,
logger: logger,
tracer: tracer,
maxConnAttempt: maxConnectionAttempt,
}
}

var PrintFreq = uint64(10)

func (r *BigtableBlockReader) Read(
ctx context.Context,
startBlockNum,
stopBlockNum uint64,
processBlock func(block *pbsolv1.Block) error,
) error {
var seenStartBlock bool
var lastSeenBlock *pbsolv1.Block
var fatalError error

r.logger.Info("launching firehose-solana reprocessing",
zap.Uint64("start_block_num", startBlockNum),
zap.Uint64("stop_block_num", stopBlockNum),
)
table := r.bt.Open("blocks")
attempts := uint64(0)

for {
if lastSeenBlock != nil {
resolvedStartBlock := lastSeenBlock.GetFirehoseBlockNumber()
r.logger.Debug("restarting read rows will retry last boundary",
zap.Uint64("last_seen_block", lastSeenBlock.GetFirehoseBlockNumber()),
zap.Uint64("resolved_block", resolvedStartBlock),
)
startBlockNum = resolvedStartBlock
}

btRange := bigtable.NewRange(fmt.Sprintf("%016x", startBlockNum), "")
err := table.ReadRows(ctx, btRange, func(row bigtable.Row) bool {

blk, zlogger, err := r.ProcessRow(row)
if err != nil {
fatalError = fmt.Errorf("failed to read row: %w", err)
return false
}

if !seenStartBlock {
if blk.Slot < startBlockNum {
r.logger.Debug("skipping blow below start block",
zap.Uint64("expected_block", startBlockNum),
)
return true
}
seenStartBlock = true
}

if lastSeenBlock != nil && lastSeenBlock.Blockhash == blk.Blockhash {
r.logger.Debug("skipping block already seed",
zap.Object("blk", blk),
)
return true
}

if lastSeenBlock != nil && (lastSeenBlock.Blockhash != blk.PreviousBlockhash) {
// Weird cases where we do not receive the next linkeable block.
// we should try to reconnect
r.logger.Warn("received unlikable block",
zap.Object("last_seen_blk", lastSeenBlock),
zap.Object("blk", blk),
zap.String("blk_previous_blockhash", blk.PreviousBlockhash),
)
return false
}

r.progressLog(blk, zlogger)
lastSeenBlock = blk
if err := processBlock(blk); err != nil {
fatalError = fmt.Errorf("failed to write blokc: %w", err)
return false
}

if stopBlockNum != 0 && blk.GetFirehoseBlockNumber() > stopBlockNum {
return false
}

return true
})

if err != nil {
attempts++
if attempts >= r.maxConnAttempt {
return fmt.Errorf("error while reading rowns, reached max attempts %d: %w", attempts, err)
}
r.logger.Error("error white reading rows", zap.Error(err), zap.Reflect("last_seen_block", lastSeenBlock), zap.Uint64("attempts", attempts))
continue
}
if fatalError != nil {
msg := "no blocks senn"
if lastSeenBlock != nil {
msg = fmt.Sprintf("last seen block %d (%s)", lastSeenBlock.GetFirehoseBlockNumber(), lastSeenBlock.GetFirehoseBlockID())
}
return fmt.Errorf("read blocks finished with a fatal error, %s: %w", msg, fatalError)
}
var opt []zap.Field
if lastSeenBlock != nil {
opt = append(opt, zap.Object("last_seen_block", lastSeenBlock))
}
r.logger.Debug("read block finished", opt...)
if stopBlockNum != 0 {
return nil
}
r.logger.Debug("stop block is num will sleep for 5 seconds and retry")
time.Sleep(5 * time.Second)
}
}

func (r *BigtableBlockReader) progressLog(blk *pbsolv1.Block, zlogger *zap.Logger) {
if r.tracer.Enabled() {
zlogger.Debug("handing block",
zap.Uint64("parent_slot", blk.ParentSlot),
zap.String("hash", blk.Blockhash),
)
}

if blk.Slot%PrintFreq == 0 {
opts := []zap.Field{
zap.String("hash", blk.Blockhash),
zap.String("previous_hash", blk.GetFirehoseBlockParentID()),
zap.Uint64("parent_slot", blk.ParentSlot),
}

if blk.BlockTime != nil {
opts = append(opts, zap.Int64("timestamp", blk.BlockTime.Timestamp))
} else {
opts = append(opts, zap.Int64("timestamp", 0))
}

zlogger.Info(fmt.Sprintf("processing block 1 / %d", PrintFreq), opts...)
}

}

type RowType string

const (
Expand All @@ -38,7 +192,7 @@ func explodeRow(row bigtable.Row) (*big.Int, RowType, []byte) {
return blockNum, rowType, el.Value
}

func (r *Client) processRow(row bigtable.Row) (*pbsolv1.Block, *zap.Logger, error) {
func (r *BigtableBlockReader) ProcessRow(row bigtable.Row) (*pbsolv1.Block, *zap.Logger, error) {
blockNum, rowType, rowCnt := explodeRow(row)
zlogger := r.logger.With(
zap.Uint64("block_num", blockNum.Uint64()),
Expand Down Expand Up @@ -138,7 +292,7 @@ func externalBinToProto(in []byte, command string, args ...string) ([]byte, erro
return cnt, nil
}

func (r *Client) decompress(in []byte) (out []byte, err error) {
func (r *BigtableBlockReader) decompress(in []byte) (out []byte, err error) {
switch in[0] {
case 0:
r.logger.Debug("no compression found")
Expand Down
Loading

0 comments on commit 1ba253e

Please sign in to comment.