Skip to content

Commit

Permalink
Avoid reencoding ATX blob (#5868)
Browse files Browse the repository at this point in the history
## Motivation
  • Loading branch information
poszu committed Apr 22, 2024
1 parent 1fd7eb4 commit 213a585
Show file tree
Hide file tree
Showing 9 changed files with 58 additions and 27 deletions.
13 changes: 10 additions & 3 deletions activation/activation.go
Original file line number Diff line number Diff line change
Expand Up @@ -760,11 +760,18 @@ func (b *Builder) Regossip(ctx context.Context, nodeID types.NodeID) error {
}

// SignAndFinalizeAtx signs the atx with specified signer and calculates the ID of the ATX.
// DO NOT USE for new code. This function is deprecated and will be removed.
// The proper way to create an ATX in tests is to use the specific wire type and sign it.
func SignAndFinalizeAtx(signer *signing.EdSigner, atx *types.ActivationTx) error {
// FIXME - there is no need to sign types.ActivationTX (only ActivationTxVx)
wireAtx := wire.ActivationTxToWireV1(atx)
atx.Signature = signer.Sign(signing.ATX, wireAtx.SignedBytes())
atx.SmesherID = signer.NodeID()
wireAtx.Signature = signer.Sign(signing.ATX, wireAtx.SignedBytes())
wireAtx.SmesherID = signer.NodeID()
atx.AtxBlob = types.AtxBlob{
Version: types.AtxV1,
Blob: codec.MustEncode(wireAtx),
}
atx.Signature = wireAtx.Signature
atx.SmesherID = wireAtx.SmesherID
atx.SetID(types.ATXID(wireAtx.HashInnerBytes()))
return nil
}
Expand Down
5 changes: 3 additions & 2 deletions activation/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,7 @@ func (h *Handler) handleAtx(
h.inProgressMu.Unlock()
h.log.WithContext(ctx).With().Info("handling incoming atx", id, log.Int("size", len(msg)))

proof, err := h.processATX(ctx, peer, atx, receivedTime)
proof, err := h.processATX(ctx, peer, atx, msg, receivedTime)
h.inProgressMu.Lock()
defer h.inProgressMu.Unlock()
for _, ch := range h.inProgress[id] {
Expand All @@ -526,6 +526,7 @@ func (h *Handler) processATX(
ctx context.Context,
peer p2p.Peer,
watx wire.ActivationTxV1,
blob []byte,
received time.Time,
) (*mwire.MalfeasanceProof, error) {
if !h.edVerifier.Verify(signing.ATX, watx.SmesherID, watx.SignedBytes(), watx.Signature) {
Expand Down Expand Up @@ -575,7 +576,7 @@ func (h *Handler) processATX(
baseTickHeight = posAtx.TickHeight()
}

atx := wire.ActivationTxFromWireV1(&watx)
atx := wire.ActivationTxFromWireV1(&watx, blob...)
if h.nipostValidator.IsVerifyingFullPost() {
atx.SetValidity(types.Valid)
}
Expand Down
11 changes: 9 additions & 2 deletions activation/wire/wire_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,10 @@ func ActivationTxFromBytes(data []byte) (*types.ActivationTx, error) {
return nil, fmt.Errorf("decoding ATX: %w", err)
}

return ActivationTxFromWireV1(&wireAtx), nil
return ActivationTxFromWireV1(&wireAtx, data...), nil
}

func ActivationTxFromWireV1(atx *ActivationTxV1) *types.ActivationTx {
func ActivationTxFromWireV1(atx *ActivationTxV1, blob ...byte) *types.ActivationTx {
result := &types.ActivationTx{
InnerActivationTx: types.InnerActivationTx{
NIPostChallenge: types.NIPostChallenge{
Expand All @@ -195,6 +195,13 @@ func ActivationTxFromWireV1(atx *ActivationTxV1) *types.ActivationTx {
},
SmesherID: atx.SmesherID,
Signature: atx.Signature,
AtxBlob: types.AtxBlob{
Version: types.AtxV1,
Blob: blob,
},
}
if len(blob) == 0 {
result.AtxBlob.Blob = codec.MustEncode(atx)
}

result.SetID(types.ATXID(atx.HashInnerBytes()))
Expand Down
4 changes: 2 additions & 2 deletions checkpoint/recovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ func validateAndPreserveData(
for _, dep := range deps {
var atx wire.ActivationTxV1
require.NoError(tb, codec.Decode(dep.Blob, &atx))
vatx := wire.ActivationTxFromWireV1(&atx)
vatx := wire.ActivationTxFromWireV1(&atx, dep.Blob...)
mclock.EXPECT().CurrentLayer().Return(vatx.PublishEpoch.FirstLayer())
mfetch.EXPECT().RegisterPeerHashes(gomock.Any(), gomock.Any())
mfetch.EXPECT().GetPoetProof(gomock.Any(), gomock.Any())
Expand Down Expand Up @@ -795,7 +795,7 @@ func TestRecover_OwnAtxNotInCheckpoint_Preserve_DepIsGolden(t *testing.T) {
// make the first one from the previous snapshot
var atx wire.ActivationTxV1
require.NoError(t, codec.Decode(vAtxs[0].Blob, &atx))
golden := wire.ActivationTxFromWireV1(&atx)
golden := wire.ActivationTxFromWireV1(&atx, vAtxs[0].Blob...)
require.NoError(t, atxs.AddCheckpointed(oldDB, &atxs.CheckpointAtx{
ID: golden.ID(),
Epoch: golden.PublishEpoch,
Expand Down
10 changes: 10 additions & 0 deletions common/types/activation.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,15 @@ func (m *ATXMetadata) MarshalLogObject(encoder log.ObjectEncoder) error {
return nil
}

type AtxVersion uint

const AtxV1 AtxVersion = 1

type AtxBlob struct {
Blob []byte
Version AtxVersion
}

// ActivationTx is a full, signed activation transaction. It includes (or references) everything a miner needs to prove
// they are eligible to actively participate in the Spacemesh protocol in the next epoch.
type ActivationTx struct {
Expand All @@ -166,6 +175,7 @@ type ActivationTx struct {
SmesherID NodeID
Signature EdSignature

AtxBlob
golden bool
}

Expand Down
6 changes: 2 additions & 4 deletions malfeasance/wire/malfeasance.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,7 @@ func (mp *MalfeasanceProof) MarshalLogObject(encoder log.ObjectEncoder) error {
encoder.AddString("type", "invalid post index")
p, ok := mp.Proof.Data.(*InvalidPostIndexProof)
if ok {
atx := wire.ActivationTxFromWireV1(&p.Atx)
encoder.AddString("atx_id", atx.ID().String())
encoder.AddString("atx_id", p.Atx.ID().String())
encoder.AddString("smesher", p.Atx.SmesherID.String())
encoder.AddUint32("invalid index", p.InvalidIdx)
}
Expand Down Expand Up @@ -359,11 +358,10 @@ func MalfeasanceInfo(smesher types.NodeID, mp *MalfeasanceProof) string {
case InvalidPostIndex:
p, ok := mp.Proof.Data.(*InvalidPostIndexProof)
if ok {
atx := wire.ActivationTxFromWireV1(&p.Atx)
b.WriteString(
fmt.Sprintf(
"cause: smesher published ATX %s with invalid post index %d in epoch %d\n",
atx.ID().ShortString(),
p.Atx.ID().ShortString(),
p.InvalidIdx,
p.Atx.PublishEpoch,
))
Expand Down
24 changes: 13 additions & 11 deletions sql/atxs/atxs.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"io"
"time"

sqlite "github.com/go-llsqlite/crawshaw"
Expand Down Expand Up @@ -40,11 +41,16 @@ func decoder(fn decoderCallback) sql.Decoder {
stmt.ColumnBytes(0, id[:])
checkpointed := stmt.ColumnLen(1) == 0
if !checkpointed {
// FIXME: remove decoding blob once ActivationTx struct is trimmed from unncecessary fields.
var atxV1 wire.ActivationTxV1
if _, err := codec.DecodeFrom(stmt.ColumnReader(1), &atxV1); err != nil {
blob, err := io.ReadAll(stmt.ColumnReader(1))
if err != nil {
return fn(nil, fmt.Errorf("read blob %w", err))
}
if err := codec.Decode(blob, &atxV1); err != nil {
return fn(nil, fmt.Errorf("decode %w", err))
}
a = *wire.ActivationTxFromWireV1(&atxV1)
a = *wire.ActivationTxFromWireV1(&atxV1, blob...)
}
a.SetID(id)
baseTickHeight := uint64(stmt.ColumnInt64(2))
Expand Down Expand Up @@ -423,11 +429,6 @@ func AddMaybeNoNonce(db sql.Executor, atx *types.VerifiedActivationTx) error {
}

func add(db sql.Executor, atx *types.VerifiedActivationTx, nonce *types.VRFPostIndex) error {
buf, err := codec.Encode(wire.ActivationTxToWireV1(atx.ActivationTx))
if err != nil {
return fmt.Errorf("encode: %w", err)
}

enc := func(stmt *sql.Statement) {
stmt.BindBytes(1, atx.ID().Bytes())
stmt.BindInt64(2, int64(atx.PublishEpoch))
Expand Down Expand Up @@ -456,7 +457,7 @@ func add(db sql.Executor, atx *types.VerifiedActivationTx, nonce *types.VRFPostI
}
}

_, err = db.Exec(`
_, err := db.Exec(`
insert into atxs (id, epoch, effective_num_units, commitment_atx, nonce,
pubkey, received, base_tick_height, tick_count, sequence, coinbase,
validity, prev_id)
Expand All @@ -467,9 +468,10 @@ func add(db sql.Executor, atx *types.VerifiedActivationTx, nonce *types.VRFPostI

enc = func(stmt *sql.Statement) {
stmt.BindBytes(1, atx.ID().Bytes())
stmt.BindBytes(2, buf)
stmt.BindBytes(2, atx.Blob)
stmt.BindInt64(3, int64(atx.Version))
}
_, err = db.Exec("insert into atx_blobs (id, atx) values (?1, ?2)", enc, nil)
_, err = db.Exec("insert into atx_blobs (id, atx, version) values (?1, ?2, ?3)", enc, nil)
if err != nil {
return fmt.Errorf("insert ATX blob %v: %w", atx.ID(), err)
}
Expand Down Expand Up @@ -801,7 +803,7 @@ func PoetProofRef(ctx context.Context, db sql.Executor, id types.ATXID) (types.P
return types.PoetProofRef{}, fmt.Errorf("getting blob for %s: %w", id, err)
}

// TODO: decide about version based on publish epoch
// TODO: decide about version based the `version` column in `atx_blobs`
var atx wire.ActivationTxV1
if err := codec.Decode(blob.Bytes, &atx); err != nil {
return types.PoetProofRef{}, fmt.Errorf("decoding ATX blob: %w", err)
Expand Down
8 changes: 5 additions & 3 deletions sql/atxs/atxs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -597,9 +597,11 @@ func TestLoadBlob(t *testing.T) {
require.Equal(t, []int{len(blob1.Bytes)}, blobSizes)

var blob2 sql.Blob
atx2, err := newAtx(sig, withPublishEpoch(1))
nodeID := types.RandomNodeID()
atx2.NodeID = &nodeID // ensure ATXs differ in size
atx2, err := newAtx(sig, func(atx *types.ActivationTx) {
nodeID := types.RandomNodeID()
atx.NodeID = &nodeID // ensure ATXs differ in size
})

require.NoError(t, err)
require.NoError(t, atxs.Add(db, atx2))
require.NoError(t, atxs.LoadBlob(ctx, db, atx2.ID().Bytes(), &blob2))
Expand Down
4 changes: 4 additions & 0 deletions sql/migrations/state/0018_atx_blob_version.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-- Add version column to make it easier to decode the blob
-- to the right version of ATX.
ALTER TABLE atx_blobs ADD COLUMN version INTEGER;
UPDATE atx_blobs SET version = 1;

0 comments on commit 213a585

Please sign in to comment.