diff --git a/internal/log.go b/internal/log.go
index 15889223a..04a870e7f 100644
--- a/internal/log.go
+++ b/internal/log.go
@@ -92,7 +92,7 @@ type Log struct {
 	// It allows to check if the usage of IdempotencyKey match inputs given on the first idempotency key usage.
 	IdempotencyHash string `json:"idempotencyHash" bun:"idempotency_hash,unique,nullzero"`
 	ID              int    `json:"id" bun:"id,unique,type:numeric"`
-	Hash            []byte `json:"hash" bun:"hash,type:bytea"`
+	Hash            []byte `json:"hash" bun:"hash,type:bytea,scanonly"`
 }
 
 func (l Log) WithIdempotencyKey(key string) Log {
@@ -201,12 +201,19 @@ var _ LogPayload = (*CreatedTransaction)(nil)
 func (p CreatedTransaction) GetMemento() any {
 	// Exclude postCommitVolumes and postCommitEffectiveVolumes fields from transactions.
 	// We don't want those fields to be part of the hash as they are not part of the decision-making process.
-	return struct {
+	type transactionResume struct {
 		TransactionData
 		ID int `json:"id"`
+	}
+
+	return struct {
+		Transaction     transactionResume `json:"transaction"`
+		AccountMetadata AccountMetadata   `json:"accountMetadata"`
 	}{
-		TransactionData: p.Transaction.TransactionData,
-		ID:              p.Transaction.ID,
+		Transaction: transactionResume{
+			TransactionData: p.Transaction.TransactionData,
+			ID:              p.Transaction.ID,
+		},
 	}
 }
 
@@ -358,4 +365,4 @@ func ComputeIdempotencyHash(inputs any) string {
 	}
 
 	return base64.URLEncoding.EncodeToString(digest.Sum(nil))
-}
+}
\ No newline at end of file
diff --git a/internal/storage/bucket/migrations/30-logs-hash-in-database.sql b/internal/storage/bucket/migrations/30-logs-hash-in-database.sql
new file mode 100644
index 000000000..c6b0fa249
--- /dev/null
+++ b/internal/storage/bucket/migrations/30-logs-hash-in-database.sql
@@ -0,0 +1,39 @@
+create function "{{.Bucket}}".set_log_hash()
+    returns trigger
+    security definer
+    language plpgsql
+as
+$$
+declare
+    previousHash bytea;
+    marshalledAsJSON varchar;
+begin
+    select hash into previousHash
+    from "{{.Bucket}}".logs
+    where ledger = new.ledger
+    order by seq desc
+    limit 1;
+
+    -- select only the fields participating in the hash on the backend and format the json representation the same way
+    select '{' ||
+           '"type":"' || new.type || '",' ||
+           '"data":' || encode(new.memento, 'escape') || ',' ||
+           '"date":"' || (to_json(new.date::timestamp)#>>'{}') || 'Z",' ||
+           '"idempotencyKey":"' || coalesce(new.idempotency_key, '') || '",' ||
+           '"id":0,' ||
+           '"hash":null' ||
+           '}' into marshalledAsJSON;
+
+    new.hash = (
+        select public.digest(
+            case
+                when previousHash is null
+                then marshalledAsJSON::bytea
+                else '"' || encode(previousHash::bytea, 'base64')::bytea || E'"\n' || convert_to(marshalledAsJSON, 'LATIN1')::bytea
+            end || E'\n', 'sha256'::text
+        )
+    );
+
+    return new;
+end;
+$$;
\ No newline at end of file
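A note on the hashing scheme: the trigger above rebuilds, in SQL, the byte stream the Go side feeds to sha256 — the JSON-encoded previous hash (when there is one) followed by a newline, then the marshalled log (with `id` forced to 0 and `hash` to null) followed by a newline. Below is a minimal Go sketch of that layout, assuming this reading of set_log_hash(); chainHash is illustrative only and not part of the codebase (the real Go-side logic lives in Log's hashing methods).

    package main

    import (
    	"crypto/sha256"
    	"encoding/json"
    	"fmt"
    )

    // chainHash sketches the byte layout hashed by set_log_hash():
    // an optional "<base64 of previous hash>" JSON string plus "\n",
    // then the marshalled log plus "\n". json.Marshal renders []byte
    // as a base64 JSON string, which is what
    // '"' || encode(previousHash, 'base64') || '"' reproduces in SQL.
    func chainHash(previousHash, marshalledLog []byte) []byte {
    	h := sha256.New()
    	if previousHash != nil {
    		prevJSON, _ := json.Marshal(previousHash) // marshalling []byte cannot fail
    		h.Write(prevJSON)
    		h.Write([]byte{'\n'})
    	}
    	h.Write(marshalledLog)
    	h.Write([]byte{'\n'})
    	return h.Sum(nil)
    }

    func main() {
    	// Hypothetical mementos, shaped like the JSON the trigger builds.
    	first := chainHash(nil, []byte(`{"type":"NEW_TRANSACTION","data":{},"date":"2024-01-01T00:00:00Z","idempotencyKey":"","id":0,"hash":null}`))
    	second := chainHash(first, []byte(`{"type":"NEW_TRANSACTION","data":{},"date":"2024-01-01T00:00:01Z","idempotencyKey":"","id":0,"hash":null}`))
    	fmt.Printf("%x\n%x\n", first, second)
    }

This is also why the memento column matters: encode(new.memento, 'escape') assumes the column holds exactly the JSON the Go side marshals, which lines up with the InsertLog change below storing the marshalled output of memento.GetMemento() rather than the raw payload.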
diff --git a/internal/storage/ledger/debug.go b/internal/storage/ledger/debug.go
index 70783e699..cde123a36 100644
--- a/internal/storage/ledger/debug.go
+++ b/internal/storage/ledger/debug.go
@@ -9,9 +9,9 @@ import (
 )
 
 // nolint:unused
-func (s *Store) dumpTables(ctx context.Context, tables ...string) {
+func (s *Store) DumpTables(ctx context.Context, tables ...string) {
 	for _, table := range tables {
-		s.dumpQuery(
+		s.DumpQuery(
 			ctx,
 			s.db.NewSelect().
 				ModelTableExpr(s.GetPrefixedRelationName(table)),
@@ -20,17 +20,17 @@ func (s *Store) dumpTables(ctx context.Context, tables ...string) {
 }
 
 // nolint:unused
-func (s *Store) dumpQuery(ctx context.Context, query *bun.SelectQuery) {
+func (s *Store) DumpQuery(ctx context.Context, query *bun.SelectQuery) {
 	fmt.Println(query)
 	rows, err := query.Rows(ctx)
 	if err != nil {
 		panic(err)
 	}
-	s.dumpRows(rows)
+	s.DumpRows(rows)
 }
 
 // nolint:unused
-func (s *Store) dumpRows(rows *sql.Rows) {
+func (s *Store) DumpRows(rows *sql.Rows) {
 	data, err := xsql.Pretty(rows)
 	if err != nil {
 		panic(err)
diff --git a/internal/storage/ledger/logs.go b/internal/storage/ledger/logs.go
index 454d3ec06..ae0b8ce74 100644
--- a/internal/storage/ledger/logs.go
+++ b/internal/storage/ledger/logs.go
@@ -2,7 +2,6 @@ package ledger
 
 import (
 	"context"
-	"database/sql"
 	"database/sql/driver"
 	"encoding/json"
 	"fmt"
@@ -52,27 +51,12 @@ func (s *Store) InsertLog(ctx context.Context, log *ledger.Log) error {
 	if s.ledger.HasFeature(ledger.FeatureHashLogs, "SYNC") {
 		_, err := s.db.NewRaw(`select pg_advisory_xact_lock(?)`, s.ledger.ID).Exec(ctx)
 		if err != nil {
-			return err
-		}
-		lastLog := &Log{}
-		err = s.db.NewSelect().
-			Model(lastLog).
-			ModelTableExpr(s.GetPrefixedRelationName("logs")).
-			Order("seq desc").
-			Where("ledger = ?", s.ledger.Name).
-			Limit(1).
-			Scan(ctx)
-		if err != nil {
-			if !errors.Is(err, sql.ErrNoRows) {
-				return fmt.Errorf("retrieving last log: %w", err)
-			}
-			log.ComputeHash(nil)
-		} else {
-			log.ComputeHash(pointer.For(lastLog.toCore()))
+			return postgres.ResolveError(err)
 		}
 	}
 
 	_, err := tracing.TraceWithLatency(ctx, "InsertLog", tracing.NoResult(func(ctx context.Context) error {
+
 		payloadData, err := json.Marshal(log.Data)
 		if err != nil {
 			return fmt.Errorf("failed to marshal log data: %w", err)
@@ -80,7 +64,7 @@ func (s *Store) InsertLog(ctx context.Context, log *ledger.Log) error {
 
 		mementoObject := log.Data.(any)
 		if memento, ok := mementoObject.(ledger.Memento); ok {
-			mementoObject = memento
+			mementoObject = memento.GetMemento()
 		}
 
 		mementoData, err := json.Marshal(mementoObject)
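With SYNC hashing enabled, InsertLog now only serializes writers through the per-ledger advisory lock and leaves hash computation to the set_log_hash trigger, instead of re-reading the last log and calling ComputeHash in Go. The chain can still be audited from the Go side. A minimal sketch, assuming ChainLog(previous *ledger.Log) recomputes the expected chained log the way the tests below use it; verifyChain and the import alias are illustrative, not existing code:

    package verification

    import (
    	"bytes"
    	"fmt"

    	ledger "github.com/formancehq/ledger/internal" // assumed import path
    )

    // verifyChain recomputes each log's hash from its predecessor, the way the
    // "check hash against core" test does with pre-insert copies, and compares
    // it with the hash written by the database trigger.
    func verifyChain(logs []ledger.Log) error {
    	var previous *ledger.Log
    	for i := range logs {
    		raw := logs[i]
    		raw.ID = 0     // the chained hash is computed with id forced to 0...
    		raw.Hash = nil // ...and hash forced to null, mirroring set_log_hash()
    		chained := raw.ChainLog(previous)
    		if !bytes.Equal(chained.Hash, logs[i].Hash) {
    			return fmt.Errorf("hash mismatch at log id %d", logs[i].ID)
    		}
    		previous = &logs[i]
    	}
    	return nil
    }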
diff --git a/internal/storage/ledger/logs_test.go b/internal/storage/ledger/logs_test.go
index 9bd06723e..0d9189f90 100644
--- a/internal/storage/ledger/logs_test.go
+++ b/internal/storage/ledger/logs_test.go
@@ -22,12 +22,49 @@ import (
 	"github.com/stretchr/testify/require"
 )
 
+// todo: add log hash test with ledger v2
+
 func TestInsertLog(t *testing.T) {
 	t.Parallel()
 
 	store := newLedgerStore(t)
 	ctx := logging.TestingContext()
 
+	t.Run("check hash against core", func(t *testing.T) {
+		// Insert a first tx (we don't have any previous hash to use at this moment)
+		log1 := ledger.NewLog(ledger.CreatedTransaction{
+			Transaction:     ledger.NewTransaction(),
+			AccountMetadata: ledger.AccountMetadata{},
+		})
+		log1Copy := log1
+
+		err := store.InsertLog(ctx, &log1)
+		require.NoError(t, err)
+
+		require.Equal(t, 1, log1.ID)
+		require.NotZero(t, log1.Hash)
+
+		// Ensure that the database hashing is the same as the go hashing
+		chainedLog1 := log1Copy.ChainLog(nil)
+		require.Equal(t, chainedLog1.Hash, log1.Hash)
+
+		// Insert a new log to test the hash when a previous hash exists
+		// We also add an idempotency key to check for conflicts
+		log2 := ledger.NewLog(ledger.CreatedTransaction{
+			Transaction:     ledger.NewTransaction(),
+			AccountMetadata: ledger.AccountMetadata{},
+		})
+		log2Copy := log2
+		err = store.InsertLog(ctx, &log2)
+		require.NoError(t, err)
+		require.Equal(t, 2, log2.ID)
+		require.NotZero(t, log2.Hash)
+
+		// Ensure that the database hashing is the same as the go hashing
+		chainedLog2 := log2Copy.ChainLog(&log1)
+		require.Equal(t, chainedLog2.Hash, log2.Hash)
+	})
+
 	t.Run("duplicate IK", func(t *testing.T) {
 		// Insert a first tx (we don't have any previous hash to use at this moment)
 		logTx := ledger.NewLog(ledger.CreatedTransaction{
diff --git a/internal/storage/ledger/migrations/0-add-sequences.sql b/internal/storage/ledger/migrations/0-add-sequences.sql
index a4b16186f..aa709231b 100644
--- a/internal/storage/ledger/migrations/0-add-sequences.sql
+++ b/internal/storage/ledger/migrations/0-add-sequences.sql
@@ -41,6 +41,19 @@ when (
 execute procedure "{{.Bucket}}".update_effective_volumes();
 {{ end }}
 
+-- logs hash
+
+{{ if .HasFeature "HASH_LOGS" "SYNC" }}
+create trigger "set_log_hash_{{.ID}}"
+before insert
+on "{{.Bucket}}"."logs"
+for each row
+when (
+    new.ledger = '{{.Name}}'
+)
+execute procedure "{{.Bucket}}".set_log_hash();
+{{ end }}
+
 {{ if .HasFeature "ACCOUNT_METADATA_HISTORY" "SYNC" }}
 create trigger "update_account_metadata_history_{{.ID}}"
 after update