diff --git a/Dockerfile b/Dockerfile index fac5777..e26af62 100644 --- a/Dockerfile +++ b/Dockerfile @@ -73,9 +73,11 @@ COPY nitro-overrides/precompiles/FheOps.go precompiles/FheOps.go COPY nitro-overrides/precompiles/precompile.go precompiles/precompile.go COPY nitro-overrides/arbos/block_processor.go arbos/block_processor.go COPY nitro-overrides/arbos/parallel_tx_processor.go arbos/parallel_tx_processor.go +COPY nitro-overrides/arbos/parse_l2.go arbos/parse_l2.go COPY nitro-overrides/gethhook/geth-hook.go gethhook/geth-hook.go COPY nitro-overrides/execution/gethexec/sequencer.go execution/gethexec/sequencer.go COPY nitro-overrides/execution/gethexec/tx_ops_graph.go execution/gethexec/tx_ops_graph.go +COPY nitro-overrides/execution/gethexec/executionengine.go execution/gethexec/executionengine.go RUN go mod tidy diff --git a/nitro-overrides/arbos/block_processor.go b/nitro-overrides/arbos/block_processor.go index ed1ba1e..5ee68b7 100644 --- a/nitro-overrides/arbos/block_processor.go +++ b/nitro-overrides/arbos/block_processor.go @@ -125,9 +125,9 @@ type SequencingHooks struct { ConditionalOptionsForTx []*arbitrum_types.ConditionalOptions // Fhenix specific - NotifyCt func(*types.Transaction, *arbitrum_types.ConditionalOptions, *fheos.PendingDecryption) - NotifyDecryptRes func(*fheos.PendingDecryption) error - OnTxSuccess func(*types.Transaction) + NotifyCt func(*types.Transaction, *arbitrum_types.ConditionalOptions, *fheos.PendingDecryption) + NotifyDecryptRes func(*fheos.PendingDecryption) error + SerializeTxDecryptRes func(*types.Transaction) ([]byte, error) } func NoopSequencingHooks() *SequencingHooks { @@ -143,7 +143,7 @@ func NoopSequencingHooks() *SequencingHooks { nil, func(*types.Transaction, *arbitrum_types.ConditionalOptions, *fheos.PendingDecryption) {}, func(*fheos.PendingDecryption) error { return nil }, - func(*types.Transaction) {}, + func(*types.Transaction) ([]byte, error) { return nil, nil }, } } @@ -403,8 +403,6 @@ func ProduceBlockAdvanced( } } continue - } else { - hooks.OnTxSuccess(tx) } if tx.Type() == types.ArbitrumInternalTxType && result.Err != nil { diff --git a/nitro-overrides/arbos/parse_l2.go b/nitro-overrides/arbos/parse_l2.go new file mode 100644 index 0000000..1d1bf9b --- /dev/null +++ b/nitro-overrides/arbos/parse_l2.go @@ -0,0 +1,411 @@ +package arbos + +import ( + "bytes" + "errors" + "fmt" + fheos "github.com/fhenixprotocol/fheos/precompiles" + "io" + "math/big" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/log" + "github.com/offchainlabs/nitro/arbos/arbostypes" + "github.com/offchainlabs/nitro/arbos/util" + "github.com/offchainlabs/nitro/util/arbmath" +) + +type InfallibleBatchFetcher func(batchNum uint64, batchHash common.Hash) []byte + +func ParseL2Transactions(msg *arbostypes.L1IncomingMessage, chainId *big.Int, batchFetcher InfallibleBatchFetcher) (types.Transactions, error) { + if len(msg.L2msg) > arbostypes.MaxL2MessageSize { + // ignore the message if l2msg is too large + return nil, errors.New("message too large") + } + switch msg.Header.Kind { + case arbostypes.L1MessageType_L2Message: + return parseL2Message(bytes.NewReader(msg.L2msg), msg.Header.Poster, msg.Header.Timestamp, msg.Header.RequestId, chainId, 0) + case arbostypes.L1MessageType_Initialize: + return nil, errors.New("ParseL2Transactions encounted initialize message (should've been handled explicitly at genesis)") + case 
arbostypes.L1MessageType_EndOfBlock: + return nil, nil + case arbostypes.L1MessageType_L2FundedByL1: + if len(msg.L2msg) < 1 { + return nil, errors.New("L2FundedByL1 message has no data") + } + if msg.Header.RequestId == nil { + return nil, errors.New("cannot issue L2 funded by L1 tx without L1 request id") + } + kind := msg.L2msg[0] + depositRequestId := crypto.Keccak256Hash(msg.Header.RequestId[:], arbmath.U256Bytes(common.Big0)) + unsignedRequestId := crypto.Keccak256Hash(msg.Header.RequestId[:], arbmath.U256Bytes(common.Big1)) + tx, err := parseUnsignedTx(bytes.NewReader(msg.L2msg[1:]), msg.Header.Poster, &unsignedRequestId, chainId, kind) + if err != nil { + return nil, err + } + deposit := types.NewTx(&types.ArbitrumDepositTx{ + ChainId: chainId, + L1RequestId: depositRequestId, + // Matches the From of parseUnsignedTx + To: msg.Header.Poster, + Value: tx.Value(), + }) + return types.Transactions{deposit, tx}, nil + case arbostypes.L1MessageType_SubmitRetryable: + tx, err := parseSubmitRetryableMessage(bytes.NewReader(msg.L2msg), msg.Header, chainId) + if err != nil { + return nil, err + } + return types.Transactions{tx}, nil + case arbostypes.L1MessageType_BatchForGasEstimation: + return nil, errors.New("L1 message type BatchForGasEstimation is unimplemented") + case arbostypes.L1MessageType_EthDeposit: + tx, err := parseEthDepositMessage(bytes.NewReader(msg.L2msg), msg.Header, chainId) + if err != nil { + return nil, err + } + return types.Transactions{tx}, nil + case arbostypes.L1MessageType_RollupEvent: + log.Debug("ignoring rollup event message") + return types.Transactions{}, nil + case arbostypes.L1MessageType_BatchPostingReport: + tx, err := parseBatchPostingReportMessage(bytes.NewReader(msg.L2msg), chainId, msg.BatchGasCost, batchFetcher) + if err != nil { + return nil, err + } + return types.Transactions{tx}, nil + case arbostypes.L1MessageType_Invalid: + // intentionally invalid message + return nil, errors.New("invalid message") + default: + // invalid message, just ignore it + return nil, fmt.Errorf("invalid message type %v", msg.Header.Kind) + } +} + +const ( + L2MessageKind_UnsignedUserTx = 0 + L2MessageKind_ContractTx = 1 + L2MessageKind_NonmutatingCall = 2 + L2MessageKind_Batch = 3 + L2MessageKind_SignedTx = 4 + // 5 is reserved + L2MessageKind_Heartbeat = 6 // deprecated + L2MessageKind_SignedCompressedTx = 7 + // 8 is reserved for BLS signed batch + L2MessageKind_SignedDecryptionTx = 9 +) + +// Warning: this does not validate the day of the week or if DST is being observed +func parseTimeOrPanic(format string, value string) time.Time { + t, err := time.Parse(format, value) + if err != nil { + panic(err) + } + return t +} + +var HeartbeatsDisabledAt = uint64(parseTimeOrPanic(time.RFC1123, "Mon, 08 Aug 2022 16:00:00 GMT").Unix()) + +func parseL2Message(rd io.Reader, poster common.Address, timestamp uint64, requestId *common.Hash, chainId *big.Int, depth int) (types.Transactions, error) { + var l2KindBuf [1]byte + if _, err := rd.Read(l2KindBuf[:]); err != nil { + return nil, err + } + + switch l2KindBuf[0] { + case L2MessageKind_UnsignedUserTx: + tx, err := parseUnsignedTx(rd, poster, requestId, chainId, L2MessageKind_UnsignedUserTx) + if err != nil { + return nil, err + } + return types.Transactions{tx}, nil + case L2MessageKind_ContractTx: + tx, err := parseUnsignedTx(rd, poster, requestId, chainId, L2MessageKind_ContractTx) + if err != nil { + return nil, err + } + return types.Transactions{tx}, nil + case L2MessageKind_NonmutatingCall: + return nil, 
errors.New("L2 message kind NonmutatingCall is unimplemented") + case L2MessageKind_Batch: + if depth >= 16 { + return nil, errors.New("L2 message batches have a max depth of 16") + } + segments := make(types.Transactions, 0) + index := big.NewInt(0) + for { + nextMsg, err := util.BytestringFromReader(rd, arbostypes.MaxL2MessageSize) + if err != nil { + // an error here means there are no further messages in the batch + // nolint:nilerr + return segments, nil + } + + var nextRequestId *common.Hash + if requestId != nil { + subRequestId := crypto.Keccak256Hash(requestId[:], arbmath.U256Bytes(index)) + nextRequestId = &subRequestId + } + nestedSegments, err := parseL2Message(bytes.NewReader(nextMsg), poster, timestamp, nextRequestId, chainId, depth+1) + if err != nil { + return nil, err + } + segments = append(segments, nestedSegments...) + index.Add(index, big.NewInt(1)) + } + case L2MessageKind_SignedTx: + newTx := new(types.Transaction) + // Safe to read in its entirety, as all input readers are limited + readBytes, err := io.ReadAll(rd) + if err != nil { + return nil, err + } + if err := newTx.UnmarshalBinary(readBytes); err != nil { + return nil, err + } + if newTx.Type() >= types.ArbitrumDepositTxType || newTx.Type() == types.BlobTxType { + // Should be unreachable for Arbitrum types due to UnmarshalBinary not accepting Arbitrum internal txs + // and we want to disallow BlobTxType since Arbitrum doesn't support EIP-4844 txs yet. + return nil, types.ErrTxTypeNotSupported + } + return types.Transactions{newTx}, nil + case L2MessageKind_Heartbeat: + if timestamp >= HeartbeatsDisabledAt { + return nil, errors.New("heartbeat messages have been disabled") + } + // do nothing + return nil, nil + case L2MessageKind_SignedCompressedTx: + return nil, errors.New("L2 message kind SignedCompressedTx is unimplemented") + case L2MessageKind_SignedDecryptionTx: + // A signed decryption tx is of the form: decryptionResults | signedTx + + // Extract the decryption results + err := fheos.LoadMultipleResolvedDecryptions(rd) + if err != nil { + return nil, err + } + + // The rest of the message is a signed tx + return parseL2Message(rd, poster, timestamp, requestId, chainId, depth) + default: + // ignore invalid message kind + return nil, fmt.Errorf("unkown L2 message kind %v", l2KindBuf[0]) + } +} + +func parseUnsignedTx(rd io.Reader, poster common.Address, requestId *common.Hash, chainId *big.Int, txKind byte) (*types.Transaction, error) { + gasLimitHash, err := util.HashFromReader(rd) + if err != nil { + return nil, err + } + gasLimitBig := gasLimitHash.Big() + if !gasLimitBig.IsUint64() { + return nil, errors.New("unsigned user tx gas limit >= 2^64") + } + gasLimit := gasLimitBig.Uint64() + + maxFeePerGas, err := util.HashFromReader(rd) + if err != nil { + return nil, err + } + + var nonce uint64 + if txKind == L2MessageKind_UnsignedUserTx { + nonceAsHash, err := util.HashFromReader(rd) + if err != nil { + return nil, err + } + nonceAsBig := nonceAsHash.Big() + if !nonceAsBig.IsUint64() { + return nil, errors.New("unsigned user tx nonce >= 2^64") + } + nonce = nonceAsBig.Uint64() + } + + to, err := util.AddressFrom256FromReader(rd) + if err != nil { + return nil, err + } + var destination *common.Address + if to != (common.Address{}) { + destination = &to + } + + value, err := util.HashFromReader(rd) + if err != nil { + return nil, err + } + + calldata, err := io.ReadAll(rd) + if err != nil { + return nil, err + } + + var inner types.TxData + + switch txKind { + case L2MessageKind_UnsignedUserTx: + 
inner = &types.ArbitrumUnsignedTx{ + ChainId: chainId, + From: poster, + Nonce: nonce, + GasFeeCap: maxFeePerGas.Big(), + Gas: gasLimit, + To: destination, + Value: value.Big(), + Data: calldata, + } + case L2MessageKind_ContractTx: + if requestId == nil { + return nil, errors.New("cannot issue contract tx without L1 request id") + } + inner = &types.ArbitrumContractTx{ + ChainId: chainId, + RequestId: *requestId, + From: poster, + GasFeeCap: maxFeePerGas.Big(), + Gas: gasLimit, + To: destination, + Value: value.Big(), + Data: calldata, + } + default: + return nil, errors.New("invalid L2 tx type in parseUnsignedTx") + } + + return types.NewTx(inner), nil +} + +func parseEthDepositMessage(rd io.Reader, header *arbostypes.L1IncomingMessageHeader, chainId *big.Int) (*types.Transaction, error) { + to, err := util.AddressFromReader(rd) + if err != nil { + return nil, err + } + balance, err := util.HashFromReader(rd) + if err != nil { + return nil, err + } + if header.RequestId == nil { + return nil, errors.New("cannot issue deposit tx without L1 request id") + } + tx := &types.ArbitrumDepositTx{ + ChainId: chainId, + L1RequestId: *header.RequestId, + From: header.Poster, + To: to, + Value: balance.Big(), + } + return types.NewTx(tx), nil +} + +func parseSubmitRetryableMessage(rd io.Reader, header *arbostypes.L1IncomingMessageHeader, chainId *big.Int) (*types.Transaction, error) { + retryTo, err := util.AddressFrom256FromReader(rd) + if err != nil { + return nil, err + } + pRetryTo := &retryTo + if retryTo == (common.Address{}) { + pRetryTo = nil + } + callvalue, err := util.HashFromReader(rd) + if err != nil { + return nil, err + } + depositValue, err := util.HashFromReader(rd) + if err != nil { + return nil, err + } + maxSubmissionFee, err := util.HashFromReader(rd) + if err != nil { + return nil, err + } + feeRefundAddress, err := util.AddressFrom256FromReader(rd) + if err != nil { + return nil, err + } + callvalueRefundAddress, err := util.AddressFrom256FromReader(rd) + if err != nil { + return nil, err + } + gasLimit, err := util.HashFromReader(rd) + if err != nil { + return nil, err + } + gasLimitBig := gasLimit.Big() + if !gasLimitBig.IsUint64() { + return nil, errors.New("gas limit too large") + } + maxFeePerGas, err := util.HashFromReader(rd) + if err != nil { + return nil, err + } + dataLength256, err := util.HashFromReader(rd) + if err != nil { + return nil, err + } + dataLengthBig := dataLength256.Big() + if !dataLengthBig.IsUint64() { + return nil, errors.New("data length field too large") + } + dataLength := dataLengthBig.Uint64() + if dataLength > arbostypes.MaxL2MessageSize { + return nil, errors.New("retryable data too large") + } + retryData := make([]byte, dataLength) + if dataLength > 0 { + if _, err := rd.Read(retryData); err != nil { + return nil, err + } + } + if header.RequestId == nil { + return nil, errors.New("cannot issue submit retryable tx without L1 request id") + } + tx := &types.ArbitrumSubmitRetryableTx{ + ChainId: chainId, + RequestId: *header.RequestId, + From: header.Poster, + L1BaseFee: header.L1BaseFee, + DepositValue: depositValue.Big(), + GasFeeCap: maxFeePerGas.Big(), + Gas: gasLimitBig.Uint64(), + RetryTo: pRetryTo, + RetryValue: callvalue.Big(), + Beneficiary: callvalueRefundAddress, + MaxSubmissionFee: maxSubmissionFee.Big(), + FeeRefundAddr: feeRefundAddress, + RetryData: retryData, + } + return types.NewTx(tx), err +} + +func parseBatchPostingReportMessage(rd io.Reader, chainId *big.Int, msgBatchGasCost *uint64, batchFetcher InfallibleBatchFetcher) 
(*types.Transaction, error) { + batchTimestamp, batchPosterAddr, batchHash, batchNum, l1BaseFee, extraGas, err := arbostypes.ParseBatchPostingReportMessageFields(rd) + if err != nil { + return nil, err + } + var batchDataGas uint64 + if msgBatchGasCost != nil { + batchDataGas = *msgBatchGasCost + } else { + batchData := batchFetcher(batchNum, batchHash) + batchDataGas = arbostypes.ComputeBatchGasCost(batchData) + } + batchDataGas = arbmath.SaturatingUAdd(batchDataGas, extraGas) + + data, err := util.PackInternalTxDataBatchPostingReport( + batchTimestamp, batchPosterAddr, batchNum, batchDataGas, l1BaseFee, + ) + if err != nil { + return nil, err + } + return types.NewTx(&types.ArbitrumInternalTx{ + ChainId: chainId, + Data: data, + // don't need to fill in the other fields, since they exist only to ensure uniqueness, and batchNum is already unique + }), nil +} diff --git a/nitro-overrides/execution/gethexec/executionengine.go b/nitro-overrides/execution/gethexec/executionengine.go new file mode 100644 index 0000000..328ea8a --- /dev/null +++ b/nitro-overrides/execution/gethexec/executionengine.go @@ -0,0 +1,972 @@ +// Copyright 2022-2024, Offchain Labs, Inc. +// For license information, see https://github.com/OffchainLabs/nitro/blob/master/LICENSE + +//go:build !wasm +// +build !wasm + +package gethexec + +/* +#cgo CFLAGS: -g -Wall -I../../target/include/ +#cgo LDFLAGS: ${SRCDIR}/../../target/lib/libstylus.a -ldl -lm +#include "arbitrator.h" +*/ +import "C" +import ( + "bytes" + "context" + "encoding/binary" + "errors" + "fmt" + "os" + "path" + "runtime/pprof" + "runtime/trace" + "sync" + "testing" + "time" + + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/state" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/params" + "github.com/google/uuid" + "github.com/offchainlabs/nitro/arbos" + "github.com/offchainlabs/nitro/arbos/arbosState" + "github.com/offchainlabs/nitro/arbos/arbostypes" + "github.com/offchainlabs/nitro/arbos/l1pricing" + "github.com/offchainlabs/nitro/arbos/programs" + "github.com/offchainlabs/nitro/arbutil" + "github.com/offchainlabs/nitro/execution" + "github.com/offchainlabs/nitro/util/arbmath" + "github.com/offchainlabs/nitro/util/sharedmetrics" + "github.com/offchainlabs/nitro/util/stopwaiter" +) + +var ( + l1GasPriceEstimateGauge = metrics.NewRegisteredGauge("arb/l1gasprice/estimate", nil) + baseFeeGauge = metrics.NewRegisteredGauge("arb/block/basefee", nil) + blockGasUsedHistogram = metrics.NewRegisteredHistogram("arb/block/gasused", nil, metrics.NewBoundedHistogramSample()) + txCountHistogram = metrics.NewRegisteredHistogram("arb/block/transactions/count", nil, metrics.NewBoundedHistogramSample()) + txGasUsedHistogram = metrics.NewRegisteredHistogram("arb/block/transactions/gasused", nil, metrics.NewBoundedHistogramSample()) + gasUsedSinceStartupCounter = metrics.NewRegisteredCounter("arb/gas_used", nil) +) + +type L1PriceDataOfMsg struct { + callDataUnits uint64 + cummulativeCallDataUnits uint64 + l1GasCharged uint64 + cummulativeL1GasCharged uint64 +} + +type L1PriceData struct { + mutex sync.RWMutex + startOfL1PriceDataCache arbutil.MessageIndex + endOfL1PriceDataCache arbutil.MessageIndex + msgToL1PriceData []L1PriceDataOfMsg +} + +type ExecutionEngine struct { + stopwaiter.StopWaiter + + bc *core.BlockChain + consensus execution.FullConsensusClient + recorder *BlockRecorder + + resequenceChan chan 
[]*arbostypes.MessageWithMetadata + createBlocksMutex sync.Mutex + + newBlockNotifier chan struct{} + latestBlockMutex sync.Mutex + latestBlock *types.Block + + nextScheduledVersionCheck time.Time // protected by the createBlocksMutex + + reorgSequencing bool + + prefetchBlock bool + + cachedL1PriceData *L1PriceData +} + +func NewL1PriceData() *L1PriceData { + return &L1PriceData{ + msgToL1PriceData: []L1PriceDataOfMsg{}, + } +} + +func NewExecutionEngine(bc *core.BlockChain) (*ExecutionEngine, error) { + return &ExecutionEngine{ + bc: bc, + resequenceChan: make(chan []*arbostypes.MessageWithMetadata), + newBlockNotifier: make(chan struct{}, 1), + cachedL1PriceData: NewL1PriceData(), + }, nil +} + +func (s *ExecutionEngine) backlogCallDataUnits() uint64 { + s.cachedL1PriceData.mutex.RLock() + defer s.cachedL1PriceData.mutex.RUnlock() + + size := len(s.cachedL1PriceData.msgToL1PriceData) + if size == 0 { + return 0 + } + return (s.cachedL1PriceData.msgToL1PriceData[size-1].cummulativeCallDataUnits - + s.cachedL1PriceData.msgToL1PriceData[0].cummulativeCallDataUnits + + s.cachedL1PriceData.msgToL1PriceData[0].callDataUnits) +} + +func (s *ExecutionEngine) backlogL1GasCharged() uint64 { + s.cachedL1PriceData.mutex.RLock() + defer s.cachedL1PriceData.mutex.RUnlock() + + size := len(s.cachedL1PriceData.msgToL1PriceData) + if size == 0 { + return 0 + } + return (s.cachedL1PriceData.msgToL1PriceData[size-1].cummulativeL1GasCharged - + s.cachedL1PriceData.msgToL1PriceData[0].cummulativeL1GasCharged + + s.cachedL1PriceData.msgToL1PriceData[0].l1GasCharged) +} + +func (s *ExecutionEngine) MarkFeedStart(to arbutil.MessageIndex) { + s.cachedL1PriceData.mutex.Lock() + defer s.cachedL1PriceData.mutex.Unlock() + + if to < s.cachedL1PriceData.startOfL1PriceDataCache { + log.Info("trying to trim older cache which doesnt exist anymore") + } else if to >= s.cachedL1PriceData.endOfL1PriceDataCache { + s.cachedL1PriceData.startOfL1PriceDataCache = 0 + s.cachedL1PriceData.endOfL1PriceDataCache = 0 + s.cachedL1PriceData.msgToL1PriceData = []L1PriceDataOfMsg{} + } else { + newStart := to - s.cachedL1PriceData.startOfL1PriceDataCache + 1 + s.cachedL1PriceData.msgToL1PriceData = s.cachedL1PriceData.msgToL1PriceData[newStart:] + s.cachedL1PriceData.startOfL1PriceDataCache = to + 1 + } +} + +func (s *ExecutionEngine) Initialize(rustCacheSize uint32) { + if rustCacheSize != 0 { + programs.ResizeWasmLruCache(rustCacheSize) + } +} + +func (s *ExecutionEngine) SetRecorder(recorder *BlockRecorder) { + if s.Started() { + panic("trying to set recorder after start") + } + if s.recorder != nil { + panic("trying to set recorder policy when already set") + } + s.recorder = recorder +} + +func (s *ExecutionEngine) EnableReorgSequencing() { + if s.Started() { + panic("trying to enable reorg sequencing after start") + } + if s.reorgSequencing { + panic("trying to enable reorg sequencing when already set") + } + s.reorgSequencing = true +} + +func (s *ExecutionEngine) EnablePrefetchBlock() { + if s.Started() { + panic("trying to enable prefetch block after start") + } + if s.prefetchBlock { + panic("trying to enable prefetch block when already set") + } + s.prefetchBlock = true +} + +func (s *ExecutionEngine) SetConsensus(consensus execution.FullConsensusClient) { + if s.Started() { + panic("trying to set transaction consensus after start") + } + if s.consensus != nil { + panic("trying to set transaction consensus when already set") + } + s.consensus = consensus +} + +func (s *ExecutionEngine) GetBatchFetcher() 
execution.BatchFetcher { + return s.consensus +} + +func (s *ExecutionEngine) Reorg(count arbutil.MessageIndex, newMessages []arbostypes.MessageWithMetadataAndBlockHash, oldMessages []*arbostypes.MessageWithMetadata) ([]*execution.MessageResult, error) { + if count == 0 { + return nil, errors.New("cannot reorg out genesis") + } + s.createBlocksMutex.Lock() + resequencing := false + defer func() { + // if we are resequencing old messages - don't release the lock + // lock will be released by thread listening to resequenceChan + if !resequencing { + s.createBlocksMutex.Unlock() + } + }() + blockNum := s.MessageIndexToBlockNumber(count - 1) + // We can safely cast blockNum to a uint64 as it comes from MessageCountToBlockNumber + targetBlock := s.bc.GetBlockByNumber(uint64(blockNum)) + if targetBlock == nil { + log.Warn("reorg target block not found", "block", blockNum) + return nil, nil + } + + tag := s.bc.StateCache().WasmCacheTag() + // reorg Rust-side VM state + C.stylus_reorg_vm(C.uint64_t(blockNum), C.uint32_t(tag)) + + err := s.bc.ReorgToOldBlock(targetBlock) + if err != nil { + return nil, err + } + + newMessagesResults := make([]*execution.MessageResult, 0, len(oldMessages)) + for i := range newMessages { + var msgForPrefetch *arbostypes.MessageWithMetadata + if i < len(newMessages)-1 { + msgForPrefetch = &newMessages[i].MessageWithMeta + } + msgResult, err := s.digestMessageWithBlockMutex(count+arbutil.MessageIndex(i), &newMessages[i].MessageWithMeta, msgForPrefetch) + if err != nil { + return nil, err + } + newMessagesResults = append(newMessagesResults, msgResult) + } + if s.recorder != nil { + s.recorder.ReorgTo(targetBlock.Header()) + } + if len(oldMessages) > 0 { + s.resequenceChan <- oldMessages + resequencing = true + } + return newMessagesResults, nil +} + +func (s *ExecutionEngine) getCurrentHeader() (*types.Header, error) { + currentBlock := s.bc.CurrentBlock() + if currentBlock == nil { + return nil, errors.New("failed to get current block") + } + return currentBlock, nil +} + +func (s *ExecutionEngine) HeadMessageNumber() (arbutil.MessageIndex, error) { + currentHeader, err := s.getCurrentHeader() + if err != nil { + return 0, err + } + return s.BlockNumberToMessageIndex(currentHeader.Number.Uint64()) +} + +func (s *ExecutionEngine) HeadMessageNumberSync(t *testing.T) (arbutil.MessageIndex, error) { + s.createBlocksMutex.Lock() + defer s.createBlocksMutex.Unlock() + return s.HeadMessageNumber() +} + +func (s *ExecutionEngine) NextDelayedMessageNumber() (uint64, error) { + currentHeader, err := s.getCurrentHeader() + if err != nil { + return 0, err + } + return currentHeader.Nonce.Uint64(), nil +} + +func MessageFromTxes(header *arbostypes.L1IncomingMessageHeader, txes types.Transactions, hooks *arbos.SequencingHooks) (*arbostypes.L1IncomingMessage, error) { + var l2Message []byte + txErrors := hooks.TxErrors + + if len(txes) == 1 && txErrors[0] == nil { + data, err := MessageFromSignedTx(txes[0], hooks) + if err != nil || data == nil { + return nil, err + } + l2Message = append(l2Message, data...) + } else { + l2Message = append(l2Message, arbos.L2MessageKind_Batch) + txSizeBuf := make([]byte, 8) + for i, tx := range txes { + if txErrors[i] != nil { + continue + } + + txBytes, err := MessageFromSignedTx(tx, hooks) + if err != nil { + return nil, err + } + + // prepend transaction length + binary.BigEndian.PutUint64(txSizeBuf, uint64(len(txBytes))) + l2Message = append(l2Message, txSizeBuf...) + l2Message = append(l2Message, txBytes...) 
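+			// Each batch element is framed as an 8-byte big-endian length followed
+			// by the encoded tx; parseL2Message's L2MessageKind_Batch case reads
+			// this framing back via util.BytestringFromReader.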
+		}
+	}
+	if len(l2Message) > arbostypes.MaxL2MessageSize {
+		return nil, errors.New("l2message too long")
+	}
+	return &arbostypes.L1IncomingMessage{
+		Header: header,
+		L2msg:  l2Message,
+	}, nil
+}
+
+// MessageFromSignedTx returns a byte-serialization of a transaction.
+func MessageFromSignedTx(tx *types.Transaction, hooks *arbos.SequencingHooks) ([]byte, error) {
+	// The structure of the encoded message is:
+	// if there are decryption results associated with the transaction:
+	// 		encoded_tx = L2MessageKind_SignedDecryptionTx | decryption_results | L2MessageKind_SignedTx | tx_data
+	// 		Note: see SerializeTxDecryptRes for the formatting of decryption_results
+	// if there are no decryption results associated with the transaction:
+	// 		encoded_tx = L2MessageKind_SignedTx | tx_data
+	var data []byte
+
+	decryptResults, err := hooks.SerializeTxDecryptRes(tx)
+	if err != nil {
+		log.Error("error during serialization of decryption results", "err", err)
+		return nil, err
+	}
+
+	if decryptResults != nil {
+		log.Debug("decryption results found! adding to message", "tx", tx.Hash().Hex())
+		data = append(data, arbos.L2MessageKind_SignedDecryptionTx)
+		data = append(data, decryptResults...)
+	}
+
+	// Append transaction data
+	txBytes, err := tx.MarshalBinary()
+	if err != nil {
+		return nil, err
+	}
+	data = append(data, arbos.L2MessageKind_SignedTx)
+	data = append(data, txBytes...)
+
+	return data, nil
+}
+
+// The caller must hold the createBlocksMutex
+func (s *ExecutionEngine) resequenceReorgedMessages(messages []*arbostypes.MessageWithMetadata) {
+	if !s.reorgSequencing {
+		return
+	}
+
+	log.Info("Trying to resequence messages", "number", len(messages))
+	lastBlockHeader, err := s.getCurrentHeader()
+	if err != nil {
+		log.Error("block header not found during resequence", "err", err)
+		return
+	}
+
+	nextDelayedSeqNum := lastBlockHeader.Nonce.Uint64()
+
+	for _, msg := range messages {
+		// Check if the message is non-nil just to be safe
+		if msg == nil || msg.Message == nil || msg.Message.Header == nil {
+			continue
+		}
+		header := msg.Message.Header
+		if header.RequestId != nil {
+			delayedSeqNum := header.RequestId.Big().Uint64()
+			if delayedSeqNum != nextDelayedSeqNum {
+				log.Info("not resequencing delayed message due to unexpected index", "expected", nextDelayedSeqNum, "found", delayedSeqNum)
+				continue
+			}
+			_, err := s.sequenceDelayedMessageWithBlockMutex(msg.Message, delayedSeqNum)
+			if err != nil {
+				log.Error("failed to re-sequence old delayed message removed by reorg", "err", err)
+			}
+			nextDelayedSeqNum += 1
+			continue
+		}
+		if header.Kind != arbostypes.L1MessageType_L2Message || header.Poster != l1pricing.BatchPosterAddress {
+			// This shouldn't exist?
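+			// (only the batch poster is expected to post L2Message-kind sequencer messages)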
+ log.Warn("skipping non-standard sequencer message found from reorg", "header", header) + continue + } + // We don't need a batch fetcher as this is an L2 message + txes, err := arbos.ParseL2Transactions(msg.Message, s.bc.Config().ChainID, nil) + if err != nil { + log.Warn("failed to parse sequencer message found from reorg", "err", err) + continue + } + hooks := arbos.NoopSequencingHooks() + hooks.DiscardInvalidTxsEarly = true + _, err = s.sequenceTransactionsWithBlockMutex(msg.Message.Header, txes, hooks) + if err != nil { + log.Error("failed to re-sequence old user message removed by reorg", "err", err) + return + } + } +} + +func (s *ExecutionEngine) sequencerWrapper(sequencerFunc func() (*types.Block, error)) (*types.Block, error) { + attempts := 0 + for { + s.createBlocksMutex.Lock() + block, err := sequencerFunc() + s.createBlocksMutex.Unlock() + if !errors.Is(err, execution.ErrSequencerInsertLockTaken) { + return block, err + } + // We got SequencerInsertLockTaken + // option 1: there was a race, we are no longer main sequencer + chosenErr := s.consensus.ExpectChosenSequencer() + if chosenErr != nil { + return nil, chosenErr + } + // option 2: we are in a test without very orderly sequencer coordination + if !s.bc.Config().ArbitrumChainParams.AllowDebugPrecompiles { + // option 3: something weird. send warning + log.Warn("sequence transactions: insert lock takent", "attempts", attempts) + } + // options 2/3 fail after too many attempts + attempts++ + if attempts > 20 { + return nil, err + } + <-time.After(time.Millisecond * 100) + } +} + +func (s *ExecutionEngine) SequenceTransactions(header *arbostypes.L1IncomingMessageHeader, txes types.Transactions, hooks *arbos.SequencingHooks) (*types.Block, error) { + return s.sequencerWrapper(func() (*types.Block, error) { + hooks.TxErrors = nil + return s.sequenceTransactionsWithBlockMutex(header, txes, hooks) + }) +} + +// SequenceTransactionsWithProfiling runs SequenceTransactions with tracing and +// CPU profiling enabled. If the block creation takes longer than 2 seconds, it +// keeps both and prints out filenames in an error log line. 
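+// The profile and trace files are written to the OS temp directory by
+// writeAndLog.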
+func (s *ExecutionEngine) SequenceTransactionsWithProfiling(header *arbostypes.L1IncomingMessageHeader, txes types.Transactions, hooks *arbos.SequencingHooks) (*types.Block, error) { + pprofBuf, traceBuf := bytes.NewBuffer(nil), bytes.NewBuffer(nil) + if err := pprof.StartCPUProfile(pprofBuf); err != nil { + log.Error("Starting CPU profiling", "error", err) + } + if err := trace.Start(traceBuf); err != nil { + log.Error("Starting tracing", "error", err) + } + start := time.Now() + res, err := s.SequenceTransactions(header, txes, hooks) + elapsed := time.Since(start) + pprof.StopCPUProfile() + trace.Stop() + if elapsed > 2*time.Second { + writeAndLog(pprofBuf, traceBuf) + return res, err + } + return res, err +} + +func writeAndLog(pprof, trace *bytes.Buffer) { + id := uuid.NewString() + pprofFile := path.Join(os.TempDir(), id+".pprof") + if err := os.WriteFile(pprofFile, pprof.Bytes(), 0o600); err != nil { + log.Error("Creating temporary file for pprof", "fileName", pprofFile, "error", err) + return + } + traceFile := path.Join(os.TempDir(), id+".trace") + if err := os.WriteFile(traceFile, trace.Bytes(), 0o600); err != nil { + log.Error("Creating temporary file for trace", "fileName", traceFile, "error", err) + return + } + log.Info("Transactions sequencing took longer than 2 seconds, created pprof and trace files", "pprof", pprofFile, "traceFile", traceFile) +} + +func (s *ExecutionEngine) sequenceTransactionsWithBlockMutex(header *arbostypes.L1IncomingMessageHeader, txes types.Transactions, hooks *arbos.SequencingHooks) (*types.Block, error) { + lastBlockHeader, err := s.getCurrentHeader() + if err != nil { + return nil, err + } + + statedb, err := s.bc.StateAt(lastBlockHeader.Root) + if err != nil { + return nil, err + } + + delayedMessagesRead := lastBlockHeader.Nonce.Uint64() + + startTime := time.Now() + block, receipts, err := arbos.ProduceBlockAdvanced( + header, + txes, + delayedMessagesRead, + lastBlockHeader, + statedb, + s.bc, + s.bc.Config(), + hooks, + false, + ) + if err != nil { + return nil, err + } + blockCalcTime := time.Since(startTime) + if len(hooks.TxErrors) != len(txes) { + return nil, fmt.Errorf("unexpected number of error results: %v vs number of txes %v", len(hooks.TxErrors), len(txes)) + } + + if len(receipts) == 0 { + return nil, nil + } + + allTxsErrored := true + for _, err := range hooks.TxErrors { + if err == nil { + allTxsErrored = false + break + } + } + if allTxsErrored { + return nil, nil + } + + msg, err := MessageFromTxes(header, txes, hooks) + if err != nil { + return nil, err + } + + pos, err := s.BlockNumberToMessageIndex(lastBlockHeader.Number.Uint64() + 1) + if err != nil { + return nil, err + } + + msgWithMeta := arbostypes.MessageWithMetadata{ + Message: msg, + DelayedMessagesRead: delayedMessagesRead, + } + msgResult, err := s.resultFromHeader(block.Header()) + if err != nil { + return nil, err + } + + err = s.consensus.WriteMessageFromSequencer(pos, msgWithMeta, *msgResult) + if err != nil { + return nil, err + } + + // Only write the block after we've written the messages, so if the node dies in the middle of this, + // it will naturally recover on startup by regenerating the missing block. 
+ err = s.appendBlock(block, statedb, receipts, blockCalcTime) + if err != nil { + return nil, err + } + s.cacheL1PriceDataOfMsg(pos, receipts, block, false) + + return block, nil +} + +func (s *ExecutionEngine) SequenceDelayedMessage(message *arbostypes.L1IncomingMessage, delayedSeqNum uint64) error { + _, err := s.sequencerWrapper(func() (*types.Block, error) { + return s.sequenceDelayedMessageWithBlockMutex(message, delayedSeqNum) + }) + return err +} + +func (s *ExecutionEngine) sequenceDelayedMessageWithBlockMutex(message *arbostypes.L1IncomingMessage, delayedSeqNum uint64) (*types.Block, error) { + currentHeader, err := s.getCurrentHeader() + if err != nil { + return nil, err + } + + expectedDelayed := currentHeader.Nonce.Uint64() + + pos, err := s.BlockNumberToMessageIndex(currentHeader.Number.Uint64() + 1) + if err != nil { + return nil, err + } + + if expectedDelayed != delayedSeqNum { + return nil, fmt.Errorf("wrong delayed message sequenced got %d expected %d", delayedSeqNum, expectedDelayed) + } + + messageWithMeta := arbostypes.MessageWithMetadata{ + Message: message, + DelayedMessagesRead: delayedSeqNum + 1, + } + + startTime := time.Now() + block, statedb, receipts, err := s.createBlockFromNextMessage(&messageWithMeta, false) + if err != nil { + return nil, err + } + blockCalcTime := time.Since(startTime) + + msgResult, err := s.resultFromHeader(block.Header()) + if err != nil { + return nil, err + } + + err = s.consensus.WriteMessageFromSequencer(pos, messageWithMeta, *msgResult) + if err != nil { + return nil, err + } + + err = s.appendBlock(block, statedb, receipts, blockCalcTime) + if err != nil { + return nil, err + } + s.cacheL1PriceDataOfMsg(pos, receipts, block, true) + + log.Info("ExecutionEngine: Added DelayedMessages", "pos", pos, "delayed", delayedSeqNum, "block-header", block.Header()) + + return block, nil +} + +func (s *ExecutionEngine) GetGenesisBlockNumber() uint64 { + return s.bc.Config().ArbitrumChainParams.GenesisBlockNum +} + +func (s *ExecutionEngine) BlockNumberToMessageIndex(blockNum uint64) (arbutil.MessageIndex, error) { + genesis := s.GetGenesisBlockNumber() + if blockNum < genesis { + return 0, fmt.Errorf("blockNum %d < genesis %d", blockNum, genesis) + } + return arbutil.MessageIndex(blockNum - genesis), nil +} + +func (s *ExecutionEngine) MessageIndexToBlockNumber(messageNum arbutil.MessageIndex) uint64 { + return uint64(messageNum) + s.GetGenesisBlockNumber() +} + +// must hold createBlockMutex +func (s *ExecutionEngine) createBlockFromNextMessage(msg *arbostypes.MessageWithMetadata, isMsgForPrefetch bool) (*types.Block, *state.StateDB, types.Receipts, error) { + currentHeader := s.bc.CurrentBlock() + if currentHeader == nil { + return nil, nil, nil, errors.New("failed to get current block header") + } + + currentBlock := s.bc.GetBlock(currentHeader.Hash(), currentHeader.Number.Uint64()) + if currentBlock == nil { + return nil, nil, nil, errors.New("can't find block for current header") + } + + err := s.bc.RecoverState(currentBlock) + if err != nil { + return nil, nil, nil, fmt.Errorf("failed to recover block %v state: %w", currentBlock.Number(), err) + } + + statedb, err := s.bc.StateAt(currentHeader.Root) + if err != nil { + return nil, nil, nil, err + } + statedb.StartPrefetcher("TransactionStreamer") + defer statedb.StopPrefetcher() + + batchFetcher := func(num uint64) ([]byte, error) { + data, _, err := s.consensus.FetchBatch(s.GetContext(), num) + return data, err + } + + block, receipts, err := arbos.ProduceBlock( + msg.Message, + 
msg.DelayedMessagesRead, + currentHeader, + statedb, + s.bc, + s.bc.Config(), + batchFetcher, + isMsgForPrefetch, + ) + + return block, statedb, receipts, err +} + +// must hold createBlockMutex +func (s *ExecutionEngine) appendBlock(block *types.Block, statedb *state.StateDB, receipts types.Receipts, duration time.Duration) error { + var logs []*types.Log + for _, receipt := range receipts { + logs = append(logs, receipt.Logs...) + } + status, err := s.bc.WriteBlockAndSetHeadWithTime(block, receipts, logs, statedb, true, duration) + if err != nil { + return err + } + if status == core.SideStatTy { + return errors.New("geth rejected block as non-canonical") + } + baseFeeGauge.Update(block.BaseFee().Int64()) + txCountHistogram.Update(int64(len(block.Transactions()) - 1)) + var blockGasused uint64 + for i := 1; i < len(receipts); i++ { + val := arbmath.SaturatingUSub(receipts[i].GasUsed, receipts[i].GasUsedForL1) + txGasUsedHistogram.Update(int64(val)) + blockGasused += val + } + blockGasUsedHistogram.Update(int64(blockGasused)) + gasUsedSinceStartupCounter.Inc(int64(blockGasused)) + s.updateL1GasPriceEstimateMetric() + return nil +} + +func (s *ExecutionEngine) resultFromHeader(header *types.Header) (*execution.MessageResult, error) { + if header == nil { + return nil, fmt.Errorf("result not found") + } + info := types.DeserializeHeaderExtraInformation(header) + return &execution.MessageResult{ + BlockHash: header.Hash(), + SendRoot: info.SendRoot, + }, nil +} + +func (s *ExecutionEngine) ResultAtPos(pos arbutil.MessageIndex) (*execution.MessageResult, error) { + return s.resultFromHeader(s.bc.GetHeaderByNumber(s.MessageIndexToBlockNumber(pos))) +} + +func (s *ExecutionEngine) updateL1GasPriceEstimateMetric() { + bc := s.bc + latestHeader := bc.CurrentBlock() + latestState, err := bc.StateAt(latestHeader.Root) + if err != nil { + log.Error("error getting latest statedb while fetching l2 Estimate of L1 GasPrice") + return + } + arbState, err := arbosState.OpenSystemArbosState(latestState, nil, true) + if err != nil { + log.Error("error opening system arbos state while fetching l2 Estimate of L1 GasPrice") + return + } + l2EstimateL1GasPrice, err := arbState.L1PricingState().PricePerUnit() + if err != nil { + log.Error("error fetching l2 Estimate of L1 GasPrice") + return + } + l1GasPriceEstimateGauge.Update(l2EstimateL1GasPrice.Int64()) +} + +func (s *ExecutionEngine) getL1PricingSurplus() (int64, error) { + bc := s.bc + latestHeader := bc.CurrentBlock() + latestState, err := bc.StateAt(latestHeader.Root) + if err != nil { + return 0, errors.New("error getting latest statedb while fetching current L1 pricing surplus") + } + arbState, err := arbosState.OpenSystemArbosState(latestState, nil, true) + if err != nil { + return 0, errors.New("error opening system arbos state while fetching current L1 pricing surplus") + } + surplus, err := arbState.L1PricingState().GetL1PricingSurplus() + if err != nil { + return 0, errors.New("error fetching current L1 pricing surplus") + } + return surplus.Int64(), nil +} + +func (s *ExecutionEngine) cacheL1PriceDataOfMsg(seqNum arbutil.MessageIndex, receipts types.Receipts, block *types.Block, blockBuiltUsingDelayedMessage bool) { + var gasUsedForL1 uint64 + var callDataUnits uint64 + if !blockBuiltUsingDelayedMessage { + // s.cachedL1PriceData tracks L1 price data for messages posted by Nitro, + // so delayed messages should not update cummulative values kept on it. 
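+		// A zero-valued entry is still cached for such blocks below, keeping the
+		// cached message range contiguous.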
+ + // First transaction in every block is an Arbitrum internal transaction, + // so we skip it here. + for i := 1; i < len(receipts); i++ { + gasUsedForL1 += receipts[i].GasUsedForL1 + } + for _, tx := range block.Transactions() { + callDataUnits += tx.CalldataUnits + } + } + l1GasCharged := gasUsedForL1 * block.BaseFee().Uint64() + + s.cachedL1PriceData.mutex.Lock() + defer s.cachedL1PriceData.mutex.Unlock() + + resetCache := func() { + s.cachedL1PriceData.startOfL1PriceDataCache = seqNum + s.cachedL1PriceData.endOfL1PriceDataCache = seqNum + s.cachedL1PriceData.msgToL1PriceData = []L1PriceDataOfMsg{{ + callDataUnits: callDataUnits, + cummulativeCallDataUnits: callDataUnits, + l1GasCharged: l1GasCharged, + cummulativeL1GasCharged: l1GasCharged, + }} + } + size := len(s.cachedL1PriceData.msgToL1PriceData) + if size == 0 || + s.cachedL1PriceData.startOfL1PriceDataCache == 0 || + s.cachedL1PriceData.endOfL1PriceDataCache == 0 || + arbutil.MessageIndex(size) != s.cachedL1PriceData.endOfL1PriceDataCache-s.cachedL1PriceData.startOfL1PriceDataCache+1 { + resetCache() + return + } + if seqNum != s.cachedL1PriceData.endOfL1PriceDataCache+1 { + if seqNum > s.cachedL1PriceData.endOfL1PriceDataCache+1 { + log.Info("message position higher then current end of l1 price data cache, resetting cache to this message") + resetCache() + } else if seqNum < s.cachedL1PriceData.startOfL1PriceDataCache { + log.Info("message position lower than start of l1 price data cache, ignoring") + } else { + log.Info("message position already seen in l1 price data cache, ignoring") + } + } else { + cummulativeCallDataUnits := s.cachedL1PriceData.msgToL1PriceData[size-1].cummulativeCallDataUnits + cummulativeL1GasCharged := s.cachedL1PriceData.msgToL1PriceData[size-1].cummulativeL1GasCharged + s.cachedL1PriceData.msgToL1PriceData = append(s.cachedL1PriceData.msgToL1PriceData, L1PriceDataOfMsg{ + callDataUnits: callDataUnits, + cummulativeCallDataUnits: cummulativeCallDataUnits + callDataUnits, + l1GasCharged: l1GasCharged, + cummulativeL1GasCharged: cummulativeL1GasCharged + l1GasCharged, + }) + s.cachedL1PriceData.endOfL1PriceDataCache = seqNum + } +} + +// DigestMessage is used to create a block by executing msg against the latest state and storing it. +// Also, while creating a block by executing msg against the latest state, +// in parallel, creates a block by executing msgForPrefetch (msg+1) against the latest state +// but does not store the block. +// This helps in filling the cache, so that the next block creation is faster. 
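+// Errors from the prefetch execution are ignored: the prefetch exists only to
+// warm caches, and the message is executed for real afterwards.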
+func (s *ExecutionEngine) DigestMessage(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata, msgForPrefetch *arbostypes.MessageWithMetadata) (*execution.MessageResult, error) { + if !s.createBlocksMutex.TryLock() { + return nil, errors.New("createBlock mutex held") + } + defer s.createBlocksMutex.Unlock() + return s.digestMessageWithBlockMutex(num, msg, msgForPrefetch) +} + +func (s *ExecutionEngine) digestMessageWithBlockMutex(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata, msgForPrefetch *arbostypes.MessageWithMetadata) (*execution.MessageResult, error) { + currentHeader, err := s.getCurrentHeader() + if err != nil { + return nil, err + } + curMsg, err := s.BlockNumberToMessageIndex(currentHeader.Number.Uint64()) + if err != nil { + return nil, err + } + if curMsg+1 != num { + return nil, fmt.Errorf("wrong message number in digest got %d expected %d", num, curMsg+1) + } + + startTime := time.Now() + if s.prefetchBlock && msgForPrefetch != nil { + go func() { + _, _, _, err := s.createBlockFromNextMessage(msgForPrefetch, true) + if err != nil { + return + } + }() + } + + block, statedb, receipts, err := s.createBlockFromNextMessage(msg, false) + if err != nil { + return nil, err + } + + err = s.appendBlock(block, statedb, receipts, time.Since(startTime)) + if err != nil { + return nil, err + } + s.cacheL1PriceDataOfMsg(num, receipts, block, false) + + if time.Now().After(s.nextScheduledVersionCheck) { + s.nextScheduledVersionCheck = time.Now().Add(time.Minute) + arbState, err := arbosState.OpenSystemArbosState(statedb, nil, true) + if err != nil { + return nil, err + } + version, timestampInt, err := arbState.GetScheduledUpgrade() + if err != nil { + return nil, err + } + var timeUntilUpgrade time.Duration + var timestamp time.Time + if timestampInt == 0 { + // This upgrade will take effect in the next block + timestamp = time.Now() + } else { + // This upgrade is scheduled for the future + timestamp = time.Unix(int64(timestampInt), 0) + timeUntilUpgrade = time.Until(timestamp) + } + maxSupportedVersion := params.ArbitrumDevTestChainConfig().ArbitrumChainParams.InitialArbOSVersion + logLevel := log.Warn + if timeUntilUpgrade < time.Hour*24 { + logLevel = log.Error + } + if version > maxSupportedVersion { + logLevel( + "you need to update your node to the latest version before this scheduled ArbOS upgrade", + "timeUntilUpgrade", timeUntilUpgrade, + "upgradeScheduledFor", timestamp, + "maxSupportedArbosVersion", maxSupportedVersion, + "pendingArbosUpgradeVersion", version, + ) + } + } + + sharedmetrics.UpdateSequenceNumberInBlockGauge(num) + s.latestBlockMutex.Lock() + s.latestBlock = block + s.latestBlockMutex.Unlock() + select { + case s.newBlockNotifier <- struct{}{}: + default: + } + + msgResult, err := s.resultFromHeader(block.Header()) + if err != nil { + return nil, err + } + return msgResult, nil +} + +func (s *ExecutionEngine) ArbOSVersionForMessageNumber(messageNum arbutil.MessageIndex) (uint64, error) { + block := s.bc.GetBlockByNumber(s.MessageIndexToBlockNumber(messageNum)) + if block == nil { + return 0, fmt.Errorf("couldn't find block for message number %d", messageNum) + } + extra := types.DeserializeHeaderExtraInformation(block.Header()) + return extra.ArbOSFormatVersion, nil +} + +func (s *ExecutionEngine) Start(ctx_in context.Context) { + s.StopWaiter.Start(ctx_in, s) + s.LaunchThread(func(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case resequence := <-s.resequenceChan: + s.resequenceReorgedMessages(resequence) + 
s.createBlocksMutex.Unlock()
+			}
+		}
+	})
+	s.LaunchThread(func(ctx context.Context) {
+		var lastBlock *types.Block
+		for {
+			select {
+			case <-s.newBlockNotifier:
+			case <-ctx.Done():
+				return
+			}
+			s.latestBlockMutex.Lock()
+			block := s.latestBlock
+			s.latestBlockMutex.Unlock()
+			if block != nil && (lastBlock == nil || block.Hash() != lastBlock.Hash()) {
+				log.Info(
+					"created block",
+					"l2Block", block.Number(),
+					"l2BlockHash", block.Hash(),
+				)
+				lastBlock = block
+				select {
+				case <-time.After(time.Second):
+				case <-ctx.Done():
+					return
+				}
+			}
+		}
+	})
+}
diff --git a/nitro-overrides/execution/gethexec/sequencer.go b/nitro-overrides/execution/gethexec/sequencer.go
index 6b8ceec..2ad5aa1 100644
--- a/nitro-overrides/execution/gethexec/sequencer.go
+++ b/nitro-overrides/execution/gethexec/sequencer.go
@@ -4,7 +4,9 @@ package gethexec
 
 import (
+	"bytes"
 	"context"
+	"encoding/binary"
 	"errors"
 	"fmt"
 	"math"
@@ -16,7 +18,9 @@ import (
 	"sync/atomic"
 	"time"
 
+	fheos_state "github.com/fhenixprotocol/fheos/precompiles"
 	fheos "github.com/fhenixprotocol/fheos/precompiles/types"
+
 	"github.com/offchainlabs/nitro/arbutil"
 	"github.com/offchainlabs/nitro/execution"
 	"github.com/offchainlabs/nitro/util/arbmath"
@@ -576,11 +580,49 @@ func (s *Sequencer) notifyDecryptRes(decryptKey *fheos.PendingDecryption) error
 	return nil
 }
 
-func (s *Sequencer) onTxSuccess(tx *types.Transaction) {
+// SerializeTxDecryptRes returns a byte-serialization of all the decryption results associated with a transaction.
+func (s *Sequencer) SerializeTxDecryptRes(tx *types.Transaction) ([]byte, error) {
+	// The structure of the encoded message is:
+	// if there are decryption results associated with the transaction:
+	// 		encoded_results = results_count | result1 | result2 | ...
+	// See GetSerializedDecryptionResult for the serialization of each result
+
 	txHash := tx.Hash()
-	if _, ok := s.txOps.transactions[txHash]; ok {
-		s.txOps.ResolveTransaction(txHash)
+	if tx, ok := s.txOps.transactions[txHash]; ok {
+		if len(tx.pendingDecryptions) != 0 {
+			log.Error("attempted to serialize a tx with pending decryptions", "txhash", txHash)
+			return nil, errors.New("attempted serialization of tx with pending decryptions")
+		}
+
+		if len(tx.resolvedDecryptions) == 0 {
+			log.Warn("attempted to serialize the decryption results of a tx with no resolved decryptions", "txhash", txHash)
+			return nil, errors.New("attempted decryption results serialization of tx with no resolved decryptions")
+		}
+
+		var serializedData []byte
+
+		// Serialize the count of elements as int32
+		buf := new(bytes.Buffer)
+		elementCount := int32(len(tx.resolvedDecryptions))
+		if err := binary.Write(buf, binary.LittleEndian, elementCount); err != nil {
+			return nil, err
+		}
+		serializedData = append(serializedData, buf.Bytes()...)
+
+		// serialize each decryption result
+		for decrypt := range tx.resolvedDecryptions {
+			resultSerialized, err := fheos_state.GetSerializedDecryptionResult(decrypt)
+			if err != nil {
+				return nil, err
+			}
+
+			serializedData = append(serializedData, resultSerialized...)
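+			// Map iteration order is nondeterministic, which is safe here: every
+			// serialized result embeds its own key, and the loader re-inserts
+			// records by key (see LoadResolvedDecryption).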
+		}
+
+		return serializedData, nil
 	}
+
+	return nil, nil
 }
 
 func (s *Sequencer) CheckHealth(ctx context.Context) error {
@@ -718,7 +760,7 @@ func (s *Sequencer) makeSequencingHooks() *arbos.SequencingHooks {
 		PostTxFilter:            s.postTxFilter,
 		NotifyCt:                s.notifyCt,
 		NotifyDecryptRes:        s.notifyDecryptRes,
-		OnTxSuccess:             s.onTxSuccess,
+		SerializeTxDecryptRes:   s.SerializeTxDecryptRes,
 		DiscardInvalidTxsEarly:  true,
 		TxErrors:                []error{},
 		ConditionalOptionsForTx: nil,
@@ -1037,6 +1079,9 @@ func (s *Sequencer) createBlock(ctx context.Context) (returnValue bool) {
 	madeBlock := false
 	for i, err := range hooks.TxErrors {
 		if err == nil {
+			// If we're here, transactions with parallel decryptions were already fully dealt with and broadcast.
+			s.txOps.ResolveTransaction(txes[i].Hash())
+
 			madeBlock = true
 		}
 		queueItem := queueItems[i]
diff --git a/precompiles/state.go b/precompiles/state.go
index db4722f..723c9e1 100644
--- a/precompiles/state.go
+++ b/precompiles/state.go
@@ -1,15 +1,17 @@
 package precompiles
 
 import (
+	"encoding/binary"
 	"errors"
 	"fmt"
+	"github.com/fhenixprotocol/warp-drive/fhe-driver"
+	"io"
 	"os"
 	"time"
 
 	"github.com/ethereum/go-ethereum/metrics"
 	"github.com/fhenixprotocol/fheos/precompiles/types"
 	storage2 "github.com/fhenixprotocol/fheos/storage"
-	"github.com/fhenixprotocol/warp-drive/fhe-driver"
 )
 
 type FheosState struct {
@@ -107,3 +109,47 @@ func InitializeFheosState() error {
 
 	return nil
 }
+
+func GetSerializedDecryptionResult(key types.PendingDecryption) ([]byte, error) {
+	if State == nil {
+		return nil, errors.New("fheos state is not initialized")
+	}
+
+	if State.DecryptResults == nil {
+		return nil, errors.New("DecryptionResults is not initialized in fheos state")
+	}
+
+	return State.DecryptResults.GetSerializedDecryptionResult(key)
+}
+
+func LoadMultipleResolvedDecryptions(reader io.Reader) error {
+	// parse the number of resolved decryptions
+	var numDecryptions int32
+	err := binary.Read(reader, binary.LittleEndian, &numDecryptions)
+	if err != nil {
+		return err
+	}
+
+	logger.Debug("Loading resolved decryptions", "numDecryptions", numDecryptions)
+
+	for i := int32(0); i < numDecryptions; i++ {
+		err = LoadResolvedDecryption(reader)
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func LoadResolvedDecryption(reader io.Reader) error {
+	if State == nil {
+		return errors.New("fheos state is not initialized")
+	}
+
+	if State.DecryptResults == nil {
+		return errors.New("DecryptionResults is not initialized in fheos state")
+	}
+
+	return State.DecryptResults.LoadResolvedDecryption(reader)
+}
diff --git a/precompiles/state_test.go b/precompiles/state_test.go
new file mode 100644
index 0000000..1d237be
--- /dev/null
+++ b/precompiles/state_test.go
@@ -0,0 +1,66 @@
+package precompiles
+
+import (
+	"bytes"
+	"github.com/fhenixprotocol/warp-drive/fhe-driver"
+	"github.com/stretchr/testify/assert"
+	"math/big"
+	"testing"
+
+	"github.com/fhenixprotocol/fheos/precompiles/types"
+)
+
+func TestDecryptionResultsSerialization(t *testing.T) {
+	t.Run("Serialize Deserialize", func(t *testing.T) {
+		dr := types.NewDecryptionResultsMap()
+		dr2 := types.NewDecryptionResultsMap()
+
+		// Set SealOutput (the stored value must be a string; see assertCorrectValueType)
+		key := types.PendingDecryption{Hash: fhe.Hash{1, 2, 3}, Type: types.SealOutput}
+		dr.SetValue(key, "456")
+		drSerialized, err := dr.GetSerializedDecryptionResult(key)
+		assert.NoError(t, err)
+
+		err = dr2.LoadResolvedDecryption(bytes.NewReader(drSerialized))
+		assert.NoError(t, err)
+
+		record2, ok := dr2.Get(key)
+		assert.True(t, ok)
+
+		expected, ok := dr.Get(key)
+		assert.Equal(t, expected.Value, record2.Value)
+		assert.True(t, expected.Timestamp.Equal(record2.Timestamp))
+
+		// Set Require
+		keyRequire := types.PendingDecryption{Hash: fhe.Hash{4, 5, 6}, Type: types.Require}
+		dr.SetValue(keyRequire, true)
+		drSerialized, err = dr.GetSerializedDecryptionResult(keyRequire)
+		assert.NoError(t, err)
+
+		err = dr2.LoadResolvedDecryption(bytes.NewReader(drSerialized))
+		assert.NoError(t, err)
+
+		record2, ok = dr2.Get(keyRequire)
+		assert.True(t, ok)
+
+		expected, ok = dr.Get(keyRequire)
+		assert.Equal(t, expected.Value, record2.Value)
+		assert.True(t, expected.Timestamp.Equal(record2.Timestamp))
+
+		// Set Decrypt
+		keyDecrypt := types.PendingDecryption{Hash: fhe.Hash{7, 8, 9}, Type: types.Decrypt}
+		dr.SetValue(keyDecrypt, big.NewInt(123))
+		drSerialized, err = dr.GetSerializedDecryptionResult(keyDecrypt)
+		assert.NoError(t, err)
+
+		err = dr2.LoadResolvedDecryption(bytes.NewReader(drSerialized))
+		assert.NoError(t, err)
+
+		record2, ok = dr2.Get(keyDecrypt)
+		assert.True(t, ok)
+
+		expected, ok = dr.Get(keyDecrypt)
+		assert.Equal(t, expected.Value, record2.Value)
+		assert.True(t, expected.Timestamp.Equal(record2.Timestamp))
+	})
+}
diff --git a/precompiles/types/decryption_results.go b/precompiles/types/decryption_results.go
index 8f9c180..b4bd390 100644
--- a/precompiles/types/decryption_results.go
+++ b/precompiles/types/decryption_results.go
@@ -48,21 +48,9 @@ func (dr *DecryptionResults) SetValue(key PendingDecryption, value any) error {
 	dr.mu.Lock()
 	defer dr.mu.Unlock()
 
-	switch key.Type {
-	case SealOutput:
-		if _, ok := value.(string); !ok {
-			return fmt.Errorf("value for SealOutput must be string")
-		}
-	case Require:
-		if _, ok := value.(bool); !ok {
-			return fmt.Errorf("value for Require must be bool")
-		}
-	case Decrypt:
-		if _, ok := value.(*big.Int); !ok {
-			return fmt.Errorf("value for Decrypt must be *big.Int")
-		}
-	default:
-		return fmt.Errorf("unknown PrecompileName")
+	err := assertCorrectValueType(key.Type, value)
+	if err != nil {
+		return err
 	}
 
 	dr.data[key] = DecryptionRecord{Value: value, Timestamp: time.Now()}
@@ -81,6 +69,41 @@ func (dr *DecryptionResults) Get(key PendingDecryption) (DecryptionRecord, bool)
 	return record, true
 }
 
+// SetRecord is just like SetValue but sets the complete record, including its timestamp.
+// This way timestamps can be synchronized between different nodes.
+func (dr *DecryptionResults) SetRecord(key PendingDecryption, record DecryptionRecord) error {
+	err := assertCorrectValueType(key.Type, record.Value)
+	if err != nil {
+		return err
+	}
+
+	dr.mu.Lock()
+	defer dr.mu.Unlock()
+
+	dr.data[key] = record
+	return nil
+}
+
+func assertCorrectValueType(decryptionType PrecompileName, value any) error {
+	switch decryptionType {
+	case SealOutput:
+		if _, ok := value.(string); !ok {
+			return fmt.Errorf("value for SealOutput must be string")
+		}
+	case Require:
+		if _, ok := value.(bool); !ok {
+			return fmt.Errorf("value for Require must be bool")
+		}
+	case Decrypt:
+		if _, ok := value.(*big.Int); !ok {
+			return fmt.Errorf("value for Decrypt must be *big.Int")
+		}
+	default:
+		return fmt.Errorf("unknown PrecompileName")
+	}
+	return nil
+}
+
 func (dr *DecryptionResults) Remove(key PendingDecryption) {
 	dr.mu.Lock()
 	defer dr.mu.Unlock()
diff --git a/precompiles/types/decryption_results_serialize.go b/precompiles/types/decryption_results_serialize.go
new file mode 100644
index 0000000..e9dafb7
--- /dev/null
+++ b/precompiles/types/decryption_results_serialize.go
@@ -0,0 +1,196 @@
+package types
+
+import (
+	"bytes"
+	"encoding/binary"
+	"errors"
+	"fmt"
+	"io"
+	"math/big"
+	"time"
+)
+
+// GetSerializedDecryptionResult returns a byte-serialization of a decryption result.
+func (dr *DecryptionResults) GetSerializedDecryptionResult(key PendingDecryption) ([]byte, error) {
+	// The structure of the encoded message is:
+	// encoded_result = key | result_req OR result_seal OR result_decrypt
+	// see DecryptionRecord.Serialize for the format of each result type
+	result, ok := dr.Get(key)
+	if !ok {
+		return nil, errors.New("tried to serialize result of unknown decryption")
+	}
+
+	if result.Value == nil {
+		return nil, errors.New("tried to serialize result of decryption which is still pending")
+	}
+
+	serializedKey, err := key.Serialize()
+	if err != nil {
+		return nil, err
+	}
+	serializedResult, err := result.Serialize(key.Type)
+	if err != nil {
+		return nil, err
+	}
+
+	return append(serializedKey, serializedResult...), nil
+}
+
+func (dr *DecryptionResults) LoadResolvedDecryption(reader io.Reader) error {
+	var pendingDecryptionKey PendingDecryption
+	err := pendingDecryptionKey.Deserialize(reader)
+	if err != nil {
+		return err
+	}
+
+	var record DecryptionRecord
+	err = record.Deserialize(reader, pendingDecryptionKey.Type)
+	if err != nil {
+		return err
+	}
+
+	return dr.SetRecord(pendingDecryptionKey, record)
+}
+
+// Serialize the struct into binary
+func (p *PendingDecryption) Serialize() ([]byte, error) {
+	buf := new(bytes.Buffer)
+
+	// Write the Hash (32 bytes)
+	if err := binary.Write(buf, binary.LittleEndian, p.Hash); err != nil {
+		return nil, err
+	}
+
+	// Write the Type (int32)
+	if err := binary.Write(buf, binary.LittleEndian, int32(p.Type)); err != nil {
+		return nil, err
+	}
+
+	return buf.Bytes(), nil
+}
+
+// Deserialize binary data into the struct
+func (p *PendingDecryption) Deserialize(reader io.Reader) error {
+	// Read the Hash (32 bytes)
+	if err := binary.Read(reader, binary.LittleEndian, &p.Hash); err != nil {
+		return err
+	}
+
+	// Read the Type (int32)
+	var typeVal int32
+	if err := binary.Read(reader, binary.LittleEndian, &typeVal); err != nil {
+		return err
+	}
+	p.Type = PrecompileName(typeVal)
+
+	return nil
+}
+
+// Serialize a DecryptionRecord into binary, based on the resultType
+func (d *DecryptionRecord) Serialize(resultType PrecompileName) ([]byte, error) {
+	// The structure of the encoded message is:
+	// encoded_result = result | timestamp
+	// where result is:
+	// if resultType == SealOutput:
+	// 		len(result) | result (string)
+	// if resultType == Require:
+	// 		result (bool)
+	// if resultType == Decrypt:
+	// 		len(result) | result (byte slice)
+	buf := new(bytes.Buffer)
+
+	// Serialize the Value based on resultType
+	switch resultType {
+	case SealOutput:
+		value, ok := d.Value.(string)
+		if !ok {
+			return nil, fmt.Errorf("expected string for SealOutput")
+		}
+		// Convert the string to a byte slice
+		valueBytes := []byte(value)
+
+		// Write the length of the byte slice, then the slice itself
+		if err := binary.Write(buf, binary.LittleEndian, int32(len(valueBytes))); err != nil {
+			return nil, err
+		}
+		if err := binary.Write(buf, binary.LittleEndian, valueBytes); err != nil {
+			return nil, err
+		}
+	case Require:
+		value, ok := d.Value.(bool)
+		if !ok {
+			return nil, fmt.Errorf("expected bool for Require")
+		}
+		// Write the boolean value
+		if err := binary.Write(buf, binary.LittleEndian, value); err != nil {
+			return nil, err
+		}
+	case Decrypt:
+		value, ok := d.Value.(*big.Int)
+		if !ok {
+			return nil, fmt.Errorf("expected *big.Int for Decrypt")
+		}
+		// 
Write the big.Int as a byte slice + bigIntBytes := value.Bytes() + if err := binary.Write(buf, binary.LittleEndian, int32(len(bigIntBytes))); err != nil { + return nil, err + } + if err := binary.Write(buf, binary.LittleEndian, bigIntBytes); err != nil { + return nil, err + } + default: + return nil, fmt.Errorf("tried to serialize unsupported result type") + } + + // Serialize the Timestamp as int64 (UnixNano) + timestamp := d.Timestamp.UnixNano() + if err := binary.Write(buf, binary.LittleEndian, timestamp); err != nil { + return nil, err + } + + return buf.Bytes(), nil +} + +// Deserialize binary data into the struct based on the known resultType +func (d *DecryptionRecord) Deserialize(reader io.Reader, resultType PrecompileName) error { + // Deserialize the Value based on resultType + switch resultType { + case SealOutput: + var length int32 + if err := binary.Read(reader, binary.LittleEndian, &length); err != nil { + return err + } + byteSlice := make([]byte, length) + if err := binary.Read(reader, binary.LittleEndian, &byteSlice); err != nil { + return err + } + d.Value = string(byteSlice) + case Require: + var value bool + if err := binary.Read(reader, binary.LittleEndian, &value); err != nil { + return err + } + d.Value = value + case Decrypt: + var length int32 + if err := binary.Read(reader, binary.LittleEndian, &length); err != nil { + return err + } + bigIntBytes := make([]byte, length) + if err := binary.Read(reader, binary.LittleEndian, &bigIntBytes); err != nil { + return err + } + d.Value = new(big.Int).SetBytes(bigIntBytes) + default: + return fmt.Errorf("tried to deserialize unsupported type of result") + } + + // Deserialize the Timestamp as int64 and convert to time.Time + var timestamp int64 + if err := binary.Read(reader, binary.LittleEndian, ×tamp); err != nil { + return err + } + d.Timestamp = time.Unix(0, timestamp) + + return nil +}
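
A minimal round-trip sketch of the wire format added above: an int32 little-endian
result count, followed by one serialized result per entry (key | value | timestamp).
It assumes the types package API shown in this diff and exercises local
DecryptionResults maps rather than the global fheos State; the package main
scaffolding and sample values are illustrative only.

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"math/big"

	"github.com/fhenixprotocol/fheos/precompiles/types"
	fhe "github.com/fhenixprotocol/warp-drive/fhe-driver"
)

func main() {
	src := types.NewDecryptionResultsMap()
	dst := types.NewDecryptionResultsMap()

	keys := []types.PendingDecryption{
		{Hash: fhe.Hash{1}, Type: types.Decrypt},
		{Hash: fhe.Hash{2}, Type: types.Require},
	}
	_ = src.SetValue(keys[0], big.NewInt(42)) // Decrypt results are *big.Int
	_ = src.SetValue(keys[1], true)           // Require results are bool

	// Encode: int32 LE count, then each result, as in SerializeTxDecryptRes.
	buf := new(bytes.Buffer)
	if err := binary.Write(buf, binary.LittleEndian, int32(len(keys))); err != nil {
		panic(err)
	}
	for _, k := range keys {
		b, err := src.GetSerializedDecryptionResult(k)
		if err != nil {
			panic(err)
		}
		buf.Write(b)
	}

	// Decode: mirrors LoadMultipleResolvedDecryptions, but into a local map
	// instead of the global fheos State.
	var n int32
	if err := binary.Read(buf, binary.LittleEndian, &n); err != nil {
		panic(err)
	}
	for i := int32(0); i < n; i++ {
		if err := dst.LoadResolvedDecryption(buf); err != nil {
			panic(err)
		}
	}

	rec, ok := dst.Get(keys[0])
	fmt.Println(ok, rec.Value) // true 42
}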