From 969d23cc89290036b55e1cc9231545be59d18cdf Mon Sep 17 00:00:00 2001 From: Eduard Voiculescu Date: Thu, 17 Aug 2023 14:28:03 -0400 Subject: [PATCH] commenting all the transaction stuff for testing --- data/sinker.go | 124 ++++++++++++++++++++++++------------------------- 1 file changed, 61 insertions(+), 63 deletions(-) diff --git a/data/sinker.go b/data/sinker.go index 6aac9a2..6f7ef09 100644 --- a/data/sinker.go +++ b/data/sinker.go @@ -5,13 +5,11 @@ import ( "fmt" "time" - pb "github.com/streamingfast/honey-tracker/data/pb/hivemapper/v1" data "github.com/streamingfast/honey-tracker/utils" sink "github.com/streamingfast/substreams-sink" pbsubstreamsrpc "github.com/streamingfast/substreams/pb/sf/substreams/rpc/v2" v1 "github.com/streamingfast/substreams/pb/sf/substreams/v1" "go.uber.org/zap" - "google.golang.org/protobuf/proto" ) type Sinker struct { @@ -72,68 +70,68 @@ func (s *Sinker) HandleBlockScopedData(ctx context.Context, data *pbsubstreamsrp defer func() { s.averageBlockTimeProcessing.Add(time.Since(startTime).Milliseconds()) - if err != nil { - e := s.db.RollbackTransaction() - err = fmt.Errorf("block: %d rollback transaction: %w: while handling err %w", data.Clock.Number, e, err) - return - } - err = s.db.CommitTransaction() + //if err != nil { + // e := s.db.RollbackTransaction() + // err = fmt.Errorf("block: %d rollback transaction: %w: while handling err %w", data.Clock.Number, e, err) + // return + //} + //err = s.db.CommitTransaction() }() - - err = s.db.BeginTransaction() - if err != nil { - return fmt.Errorf("begin transaction: %w", err) - } - - output := data.Output - if output.Name != s.OutputModuleName() { - return fmt.Errorf("received data from wrong output module, expected to received from %q but got module's output for %q", s.OutputModuleName(), output.Name) - } - - if len(output.GetMapOutput().GetValue()) == 0 { - return s.db.StoreCursor(cursor) - } - - moduleOutput := &pb.Output{} - err = proto.Unmarshal(output.GetMapOutput().GetValue(), moduleOutput) - if err != nil { - return fmt.Errorf("unmarshal module output changes: %w", err) - } - - dbBlockID, err := s.db.HandleClock(data.Clock) - if err != nil { - return fmt.Errorf("handle block clock: %w", err) - } - - err = s.db.HandleInitializedAccount(dbBlockID, moduleOutput.InitializedAccount) - if err != nil { - return fmt.Errorf("handle initialized accounts: %w", err) - } - - if err := s.db.HandleRegularDriverPayments(dbBlockID, moduleOutput.RegularDriverPayments); err != nil { - return fmt.Errorf("handle payments: %w", err) - } - - if err := s.db.HandleSplitPayments(dbBlockID, moduleOutput.TokenSplittingPayments); err != nil { - return fmt.Errorf("handle split payments: %w", err) - } - - if err := s.db.HandleTransfers(dbBlockID, moduleOutput.Transfers); err != nil { - return fmt.Errorf("handle transfers: %w", err) - } - - if err := s.db.HandleMints(dbBlockID, moduleOutput.Mints); err != nil { - return fmt.Errorf("handle mints: %w", err) - } - - if err := s.db.HandleBurns(dbBlockID, moduleOutput.Burns); err != nil { - return fmt.Errorf("handle burns: %w", err) - } - - err = s.db.StoreCursor(cursor) - if err != nil { - return fmt.Errorf("store cursor: %w", err) - } + // + //err = s.db.BeginTransaction() + //if err != nil { + // return fmt.Errorf("begin transaction: %w", err) + //} + // + //output := data.Output + //if output.Name != s.OutputModuleName() { + // return fmt.Errorf("received data from wrong output module, expected to received from %q but got module's output for %q", s.OutputModuleName(), output.Name) + //} + // + //if len(output.GetMapOutput().GetValue()) == 0 { + // return s.db.StoreCursor(cursor) + //} + // + //moduleOutput := &pb.Output{} + //err = proto.Unmarshal(output.GetMapOutput().GetValue(), moduleOutput) + //if err != nil { + // return fmt.Errorf("unmarshal module output changes: %w", err) + //} + // + //dbBlockID, err := s.db.HandleClock(data.Clock) + //if err != nil { + // return fmt.Errorf("handle block clock: %w", err) + //} + // + //err = s.db.HandleInitializedAccount(dbBlockID, moduleOutput.InitializedAccount) + //if err != nil { + // return fmt.Errorf("handle initialized accounts: %w", err) + //} + // + //if err := s.db.HandleRegularDriverPayments(dbBlockID, moduleOutput.RegularDriverPayments); err != nil { + // return fmt.Errorf("handle payments: %w", err) + //} + // + //if err := s.db.HandleSplitPayments(dbBlockID, moduleOutput.TokenSplittingPayments); err != nil { + // return fmt.Errorf("handle split payments: %w", err) + //} + // + //if err := s.db.HandleTransfers(dbBlockID, moduleOutput.Transfers); err != nil { + // return fmt.Errorf("handle transfers: %w", err) + //} + // + //if err := s.db.HandleMints(dbBlockID, moduleOutput.Mints); err != nil { + // return fmt.Errorf("handle mints: %w", err) + //} + // + //if err := s.db.HandleBurns(dbBlockID, moduleOutput.Burns); err != nil { + // return fmt.Errorf("handle burns: %w", err) + //} + // + //err = s.db.StoreCursor(cursor) + //if err != nil { + // return fmt.Errorf("store cursor: %w", err) + //} return nil }