Skip to content

Commit

Permalink
feat: non-blocking submiting batch
Browse files Browse the repository at this point in the history
Applying patch from celo-org/optimism celo-org#213
  • Loading branch information
Troublor committed Sep 1, 2024
1 parent 4797ddb commit 0616fcd
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 10 deletions.
26 changes: 17 additions & 9 deletions op-batcher/batcher/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ func (l *BatchSubmitter) loop() {

receiptsCh := make(chan txmgr.TxReceipt[txRef])
queue := txmgr.NewQueue[txRef](l.killCtx, l.Txmgr, l.Config.MaxPendingTransactions)
var daWaitGroup sync.WaitGroup

// start the receipt/result processing loop
receiptLoopDone := make(chan struct{})
Expand Down Expand Up @@ -334,9 +335,11 @@ func (l *BatchSubmitter) loop() {
defer ticker.Stop()

publishAndWait := func() {
l.publishStateToL1(queue, receiptsCh)
l.publishStateToL1(queue, receiptsCh, &daWaitGroup)
if !l.Txmgr.IsClosed() {
queue.Wait()
l.Log.Info("Wait for pure DA writes, not L1 txs")
daWaitGroup.Wait()
l.Log.Info("Wait for L1 writes (blobs or DA commitments)")
} else {
l.Log.Info("Txmgr is closed, remaining channel data won't be sent")
}
Expand Down Expand Up @@ -369,7 +372,7 @@ func (l *BatchSubmitter) loop() {
l.clearState(l.shutdownCtx)
continue
}
l.publishStateToL1(queue, receiptsCh)
l.publishStateToL1(queue, receiptsCh, &daWaitGroup)
case <-l.shutdownCtx.Done():
if l.Txmgr.IsClosed() {
l.Log.Info("Txmgr is closed, remaining channel data won't be sent")
Expand Down Expand Up @@ -426,14 +429,14 @@ func (l *BatchSubmitter) waitNodeSync() error {

// publishStateToL1 queues up all pending TxData to be published to the L1, returning when there is
// no more data to queue for publishing or if there was an error queing the data.
func (l *BatchSubmitter) publishStateToL1(queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef]) {
func (l *BatchSubmitter) publishStateToL1(queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef], daWaitGroup *sync.WaitGroup) {
for {
// if the txmgr is closed, we stop the transaction sending
if l.Txmgr.IsClosed() {
l.Log.Info("Txmgr is closed, aborting state publishing")
return
}
err := l.publishTxToL1(l.killCtx, queue, receiptsCh)
err := l.publishTxToL1(l.killCtx, queue, receiptsCh, daWaitGroup)
if err != nil {
if err != io.EOF {
l.Log.Error("Error publishing tx to l1", "err", err)
Expand Down Expand Up @@ -483,7 +486,7 @@ func (l *BatchSubmitter) clearState(ctx context.Context) {
}

// publishTxToL1 submits a single state tx to the L1
func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef]) error {
func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef], waitGroup *sync.WaitGroup) error {
// send all available transactions
l1tip, err := l.l1Tip(ctx)
if err != nil {
Expand All @@ -503,9 +506,14 @@ func (l *BatchSubmitter) publishTxToL1(ctx context.Context, queue *txmgr.Queue[t
return err
}

if err = l.sendTransaction(ctx, txdata, queue, receiptsCh); err != nil {
return fmt.Errorf("BatchSubmitter.sendTransaction failed: %w", err)
}
waitGroup.Add(1)
go func() {
defer waitGroup.Done()
err := l.sendTransaction(ctx, txdata, queue, receiptsCh)
if err != nil {
l.Log.Warn("BatchSubmitter.sendTransaction failed: %w", "err", err)
}
}()
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion op-node/rollup/derive/frame.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
// Frames cannot be larger than 1 MB.
// Data transactions that carry frames are generally not larger than 128 KB due to L1 network conditions,
// but we leave space to grow larger anyway (gas limit allows for more data).
const MaxFrameLen = 1_000_000
const MaxFrameLen = 4_000_000

// Data Format
//
Expand Down

0 comments on commit 0616fcd

Please sign in to comment.