Skip to content

Commit

Permalink
commenting all the transaction stuff for testing
Browse files Browse the repository at this point in the history
  • Loading branch information
Eduard-Voiculescu committed Aug 17, 2023
1 parent 85290c2 commit 969d23c
Showing 1 changed file with 61 additions and 63 deletions.
124 changes: 61 additions & 63 deletions data/sinker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,11 @@ import (
"fmt"
"time"

pb "github.com/streamingfast/honey-tracker/data/pb/hivemapper/v1"
data "github.com/streamingfast/honey-tracker/utils"
sink "github.com/streamingfast/substreams-sink"
pbsubstreamsrpc "github.com/streamingfast/substreams/pb/sf/substreams/rpc/v2"
v1 "github.com/streamingfast/substreams/pb/sf/substreams/v1"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
)

type Sinker struct {
Expand Down Expand Up @@ -72,68 +70,68 @@ func (s *Sinker) HandleBlockScopedData(ctx context.Context, data *pbsubstreamsrp

defer func() {
s.averageBlockTimeProcessing.Add(time.Since(startTime).Milliseconds())
if err != nil {
e := s.db.RollbackTransaction()
err = fmt.Errorf("block: %d rollback transaction: %w: while handling err %w", data.Clock.Number, e, err)
return
}
err = s.db.CommitTransaction()
//if err != nil {
// e := s.db.RollbackTransaction()
// err = fmt.Errorf("block: %d rollback transaction: %w: while handling err %w", data.Clock.Number, e, err)
// return
//}
//err = s.db.CommitTransaction()
}()

err = s.db.BeginTransaction()
if err != nil {
return fmt.Errorf("begin transaction: %w", err)
}

output := data.Output
if output.Name != s.OutputModuleName() {
return fmt.Errorf("received data from wrong output module, expected to received from %q but got module's output for %q", s.OutputModuleName(), output.Name)
}

if len(output.GetMapOutput().GetValue()) == 0 {
return s.db.StoreCursor(cursor)
}

moduleOutput := &pb.Output{}
err = proto.Unmarshal(output.GetMapOutput().GetValue(), moduleOutput)
if err != nil {
return fmt.Errorf("unmarshal module output changes: %w", err)
}

dbBlockID, err := s.db.HandleClock(data.Clock)
if err != nil {
return fmt.Errorf("handle block clock: %w", err)
}

err = s.db.HandleInitializedAccount(dbBlockID, moduleOutput.InitializedAccount)
if err != nil {
return fmt.Errorf("handle initialized accounts: %w", err)
}

if err := s.db.HandleRegularDriverPayments(dbBlockID, moduleOutput.RegularDriverPayments); err != nil {
return fmt.Errorf("handle payments: %w", err)
}

if err := s.db.HandleSplitPayments(dbBlockID, moduleOutput.TokenSplittingPayments); err != nil {
return fmt.Errorf("handle split payments: %w", err)
}

if err := s.db.HandleTransfers(dbBlockID, moduleOutput.Transfers); err != nil {
return fmt.Errorf("handle transfers: %w", err)
}

if err := s.db.HandleMints(dbBlockID, moduleOutput.Mints); err != nil {
return fmt.Errorf("handle mints: %w", err)
}

if err := s.db.HandleBurns(dbBlockID, moduleOutput.Burns); err != nil {
return fmt.Errorf("handle burns: %w", err)
}

err = s.db.StoreCursor(cursor)
if err != nil {
return fmt.Errorf("store cursor: %w", err)
}
//
//err = s.db.BeginTransaction()
//if err != nil {
// return fmt.Errorf("begin transaction: %w", err)
//}
//
//output := data.Output
//if output.Name != s.OutputModuleName() {
// return fmt.Errorf("received data from wrong output module, expected to received from %q but got module's output for %q", s.OutputModuleName(), output.Name)
//}
//
//if len(output.GetMapOutput().GetValue()) == 0 {
// return s.db.StoreCursor(cursor)
//}
//
//moduleOutput := &pb.Output{}
//err = proto.Unmarshal(output.GetMapOutput().GetValue(), moduleOutput)
//if err != nil {
// return fmt.Errorf("unmarshal module output changes: %w", err)
//}
//
//dbBlockID, err := s.db.HandleClock(data.Clock)
//if err != nil {
// return fmt.Errorf("handle block clock: %w", err)
//}
//
//err = s.db.HandleInitializedAccount(dbBlockID, moduleOutput.InitializedAccount)
//if err != nil {
// return fmt.Errorf("handle initialized accounts: %w", err)
//}
//
//if err := s.db.HandleRegularDriverPayments(dbBlockID, moduleOutput.RegularDriverPayments); err != nil {
// return fmt.Errorf("handle payments: %w", err)
//}
//
//if err := s.db.HandleSplitPayments(dbBlockID, moduleOutput.TokenSplittingPayments); err != nil {
// return fmt.Errorf("handle split payments: %w", err)
//}
//
//if err := s.db.HandleTransfers(dbBlockID, moduleOutput.Transfers); err != nil {
// return fmt.Errorf("handle transfers: %w", err)
//}
//
//if err := s.db.HandleMints(dbBlockID, moduleOutput.Mints); err != nil {
// return fmt.Errorf("handle mints: %w", err)
//}
//
//if err := s.db.HandleBurns(dbBlockID, moduleOutput.Burns); err != nil {
// return fmt.Errorf("handle burns: %w", err)
//}
//
//err = s.db.StoreCursor(cursor)
//if err != nil {
// return fmt.Errorf("store cursor: %w", err)
//}

return nil
}
Expand Down

0 comments on commit 969d23c

Please sign in to comment.