Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

attributes publisher #4

Merged
merged 29 commits into from
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
03f6065
attributes publisher
jinmel Jul 1, 2024
9ce7434
add l1OriginSelector and attrBuilder to driver
jinmel Jul 1, 2024
56f6a75
remove unused field
jinmel Jul 1, 2024
25103a7
docker compose file for testing
jinmel Jul 1, 2024
8853c0e
move attribute publishing to CLsync
jinmel Jul 1, 2024
c1e2a79
wsutils
jinmel Jul 16, 2024
2cb9441
improve broadcasting
jinmel Jul 16, 2024
0712ce3
add sse server options
jinmel Aug 1, 2024
35980e0
remove wss implementation
jinmel Aug 1, 2024
03d1b5e
event server with native feeds
jinmel Aug 1, 2024
8a26c80
remove ReadTimeout and IdleTimeout
jinmel Aug 2, 2024
821cd34
switch back to CL sync
jinmel Aug 5, 2024
1640fad
verbose logging for builder-op-geth
jinmel Aug 6, 2024
31b35dc
remove event server
jinmel Aug 9, 2024
f3f43fd
add PublishPayloadAttributes flag
jinmel Aug 9, 2024
403fb23
rename SequencerPublishPayloadAttributes to SequencerPublishAttributes
jinmel Aug 12, 2024
2ba4f88
typo fix
jinmel Aug 12, 2024
9a0e248
use flags to control publish attributes
jinmel Aug 12, 2024
4c7f750
adds unit test for clsync
jinmel Aug 12, 2024
9b66f1d
cleanup debug logs
jinmel Aug 13, 2024
e1c790a
cleanup logs in clsync
jinmel Aug 13, 2024
fd07061
return error when UnmarshalBinary fails
jinmel Aug 13, 2024
27754df
set NoTxPool
jinmel Aug 13, 2024
c965a08
remove debug logs
jinmel Aug 13, 2024
807badf
restore plasma.da-service arg
jinmel Aug 13, 2024
072acf5
remove wsutil package
jinmel Aug 13, 2024
b853aaf
remove events server
jinmel Aug 13, 2024
69f4388
Add EventStreamConfig and flags
jinmel Aug 13, 2024
76b8f69
Merge remote-tracking branch 'flashbots/fjord' into fb-op-node
jinmel Aug 14, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions bedrock-devnet/devnet/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,10 @@ def devnet_deploy(paths):
log.info('Bringing up `da-server`, `sentinel`.') # TODO(10141): We don't have public sentinel images yet
run_command(['docker', 'compose', 'up', '-d', 'da-server'], cwd=paths.ops_bedrock_dir, env=docker_env)

log.info('Bringing up `builders`.')
run_command(['docker', 'compose', 'up', '-d', 'builder-op-node', 'builder-op-geth'],
cwd=paths.ops_bedrock_dir,
env=docker_env)
# Fin.
log.info('Devnet ready.')

Expand Down
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ require (
github.com/google/go-cmp v0.6.0
github.com/google/gofuzz v1.2.1-0.20220503160820-4a35382e8fc8
github.com/google/uuid v1.6.0
github.com/gorilla/websocket v1.5.1
github.com/hashicorp/go-multierror v1.1.1
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/hashicorp/raft v1.6.1
Expand All @@ -46,6 +47,7 @@ require (
github.com/pkg/errors v0.9.1
github.com/pkg/profile v1.7.0
github.com/prometheus/client_golang v1.19.1
github.com/r3labs/sse v0.0.0-20210224172625-26fe804710bc
github.com/stretchr/testify v1.9.0
github.com/urfave/cli/v2 v2.27.1
golang.org/x/crypto v0.23.0
Expand Down Expand Up @@ -113,7 +115,6 @@ require (
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/gopacket v1.1.19 // indirect
github.com/google/pprof v0.0.0-20240207164012-fb44976bdcd5 // indirect
github.com/gorilla/websocket v1.5.1 // indirect
github.com/graph-gophers/graphql-go v1.3.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-bexpr v0.1.11 // indirect
Expand Down Expand Up @@ -237,6 +238,7 @@ require (
golang.org/x/tools v0.21.0 // indirect
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect
google.golang.org/protobuf v1.34.1 // indirect
gopkg.in/cenkalti/backoff.v1 v1.1.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
Expand Down
5 changes: 5 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -766,6 +766,8 @@ github.com/quic-go/quic-go v0.44.0 h1:So5wOr7jyO4vzL2sd8/pD9Kesciv91zSk8BoFngItQ
github.com/quic-go/quic-go v0.44.0/go.mod h1:z4cx/9Ny9UtGITIPzmPTXh1ULfOyWh4qGQlpnPcWmek=
github.com/quic-go/webtransport-go v0.8.0 h1:HxSrwun11U+LlmwpgM1kEqIqH90IT4N8auv/cD7QFJg=
github.com/quic-go/webtransport-go v0.8.0/go.mod h1:N99tjprW432Ut5ONql/aUhSLT0YVSlwHohQsuac9WaM=
github.com/r3labs/sse v0.0.0-20210224172625-26fe804710bc h1:zAsgcP8MhzAbhMnB1QQ2O7ZhWYVGYSR2iVcjzQuPV+o=
github.com/r3labs/sse v0.0.0-20210224172625-26fe804710bc/go.mod h1:S8xSOnV3CgpNrWd0GQ/OoQfMtlg2uPRSuTzcSGrzwK8=
github.com/raulk/go-watchdog v1.3.0 h1:oUmdlHxdkXRJlwfG0O9omj8ukerm8MEQavSiDTEtBsk=
github.com/raulk/go-watchdog v1.3.0/go.mod h1:fIvOnLbF0b0ZwkB9YU4mOW9Did//4vPZtDqv66NfsMU=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
Expand Down Expand Up @@ -974,6 +976,7 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn
golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20191116160921-f9c825593386/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200813134508-3edf25e44fcc/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
Expand Down Expand Up @@ -1168,6 +1171,8 @@ google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ
google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg=
google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/cenkalti/backoff.v1 v1.1.0 h1:Arh75ttbsvlpVA7WtVpH4u9h6Zl46xuptxqLxPiSo4Y=
gopkg.in/cenkalti/backoff.v1 v1.1.0/go.mod h1:J6Vskwqd+OMVJl8C33mmtxTBs2gyzfv7UDAkHu8BrjI=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
21 changes: 21 additions & 0 deletions op-node/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,20 @@ var (
EnvVars: prefixEnvVars("RPC_ADMIN_STATE"),
Category: OperationsCategory,
}
EventStreamListenAddr = &cli.StringFlag{
Name: "eventstream.addr",
Usage: "Event stream listening address",
EnvVars: prefixEnvVars("EVENTSTREAM_ADDR"),
Value: "127.0.0.1",
Category: OperationsCategory,
}
EventStreamListenPort = &cli.IntFlag{
Name: "eventstream.port",
Usage: "Event stream listening port",
EnvVars: prefixEnvVars("EVENTSTREAM_PORT"),
Value: 9546,
Category: OperationsCategory,
}
L1TrustRPC = &cli.BoolFlag{
Name: "l1.trustrpc",
Usage: "Trust the L1 RPC, sync faster at risk of malicious/buggy RPC providing bad or inconsistent L1 data",
Expand Down Expand Up @@ -225,6 +239,12 @@ var (
Value: 4,
Category: SequencerCategory,
}
SequencerPublishAttributesFlag = &cli.BoolFlag{
Name: "sequencer.publish-attributes",
Usage: "Publish payload attributes to the event feed",
EnvVars: prefixEnvVars("SEQUENCER_PUBLISH_ATTRIBUTES"),
Value: false,
}
L1EpochPollIntervalFlag = &cli.DurationFlag{
Name: "l1.epoch-poll-interval",
Usage: "Poll interval for retrieving new L1 epoch updates such as safe and finalized block changes. Disabled if 0 or negative.",
Expand Down Expand Up @@ -412,6 +432,7 @@ var optionalFlags = []cli.Flag{
SequencerStoppedFlag,
SequencerMaxSafeLagFlag,
SequencerL1Confs,
SequencerPublishAttributesFlag,
L1EpochPollIntervalFlag,
RuntimeConfigReloadIntervalFlag,
RPCEnableAdmin,
Expand Down
10 changes: 10 additions & 0 deletions op-node/node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ type Config struct {

RPC RPCConfig

EventStream EventStreamConfig

P2P p2p.SetupP2P

Metrics MetricsConfig
Expand Down Expand Up @@ -81,6 +83,9 @@ type Config struct {

// Plasma DA config
Plasma plasma.CLIConfig

// Publish Attributes
PublishPayloadAttributes bool
}

type RPCConfig struct {
Expand All @@ -89,6 +94,11 @@ type RPCConfig struct {
EnableAdmin bool
}

type EventStreamConfig struct {
ListenAddr string
ListenPort int
}

func (cfg *RPCConfig) HttpEndpoint() string {
return fmt.Sprintf("http://%s:%d", cfg.ListenAddr, cfg.ListenPort)
}
Expand Down
59 changes: 59 additions & 0 deletions op-node/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@ package node

import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net"
"net/http"
"strconv"
"sync/atomic"
"time"

Expand All @@ -13,6 +17,7 @@ import (
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
plasma "github.com/ethereum-optimism/optimism/op-plasma"
"github.com/ethereum-optimism/optimism/op-service/httputil"
"github.com/r3labs/sse"

"github.com/hashicorp/go-multierror"
"github.com/libp2p/go-libp2p/core/peer"
Expand Down Expand Up @@ -83,6 +88,9 @@ type OpNode struct {
// cancels execution prematurely, e.g. to halt. This may be nil.
cancel context.CancelCauseFunc
halted atomic.Bool

httpEventStream *sse.Server
httpEventStreamServer *httputil.HTTPServer
}

// The OpNode handles incoming gossip
Expand Down Expand Up @@ -148,6 +156,10 @@ func (n *OpNode) init(ctx context.Context, cfg *Config, snapshotLog log.Logger)
if err := n.initMetricsServer(cfg); err != nil {
return fmt.Errorf("failed to init the metrics server: %w", err)
}
if err := n.initHTTPEventStreamServer(cfg); err != nil {
return fmt.Errorf("failed to init the HTTP event stream server: %w", err)
}

n.metrics.RecordInfo(n.appVersion)
n.metrics.RecordUp()
n.initHeartbeat(cfg)
Expand Down Expand Up @@ -432,6 +444,32 @@ func (n *OpNode) initRPCServer(cfg *Config) error {
return nil
}

func (n *OpNode) initHTTPEventStreamServer(cfg *Config) error {
eventStream := sse.New()
eventStream.AutoReplay = false
eventStream.CreateStream("payload_attributes")

mux := http.NewServeMux()
mux.HandleFunc("/events", eventStream.HTTPHandler)
addr := net.JoinHostPort(cfg.EventStream.ListenAddr, strconv.Itoa(cfg.EventStream.ListenPort))

var err error
timeouts := httputil.HTTPTimeouts{
ReadTimeout: httputil.DefaultTimeouts.ReadTimeout,
ReadHeaderTimeout: httputil.DefaultTimeouts.ReadHeaderTimeout,
WriteTimeout: 0,
IdleTimeout: 0,
}
n.httpEventStreamServer, err = httputil.StartHTTPServer(addr, mux, httputil.WithTimeouts(timeouts))
if err != nil {
return fmt.Errorf("failed to start http event stream server: %w", err)
}
n.log.Info("Started HTTP event stream server", "addr", addr)
n.httpEventStream = eventStream

return nil
}

func (n *OpNode) initMetricsServer(cfg *Config) error {
if !cfg.Metrics.Enabled {
n.log.Info("metrics disabled")
Expand Down Expand Up @@ -581,6 +619,22 @@ func (n *OpNode) PublishL2Payload(ctx context.Context, envelope *eth.ExecutionPa
return nil
}

func (n *OpNode) PublishL2Attributes(ctx context.Context, attrs *derive.AttributesWithParent) error {
builderAttrs, err := attrs.ToBuilderPayloadAttributes()
if err != nil {
n.log.Warn("failed to convert attributes to builder attributes", "err", err)
return err
}
jsonBytes, err := json.Marshal(builderAttrs)
if err != nil {
n.log.Warn("failed to marshal payload attributes", "err", err)
return err
}
n.log.Debug("Publishing execution payload attributes on event stream", "attrs", builderAttrs, "json", string(jsonBytes))
n.httpEventStream.Publish("payload_attributes", &sse.Event{Data: jsonBytes})
return nil
}

func (n *OpNode) OnUnsafeL2Payload(ctx context.Context, from peer.ID, envelope *eth.ExecutionPayloadEnvelope) error {
// ignore if it's from ourselves
if n.p2pNode != nil && from == n.p2pNode.Host().ID() {
Expand Down Expand Up @@ -723,6 +777,11 @@ func (n *OpNode) Stop(ctx context.Context) error {
result = multierror.Append(result, fmt.Errorf("failed to close metrics server: %w", err))
}
}
if n.httpEventStreamServer != nil {
if err := n.httpEventStreamServer.Stop(ctx); err != nil {
result = multierror.Append(result, fmt.Errorf("failed to close http event stream server: %w", err))
}
}

return result.ErrorOrNil()
}
Expand Down
76 changes: 65 additions & 11 deletions op-node/rollup/clsync/clsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package clsync
import (
"context"
"errors"
"fmt"
"io"
"time"

"github.com/ethereum/go-ethereum/log"

Expand All @@ -24,23 +26,39 @@ type Engine interface {
InsertUnsafePayload(ctx context.Context, payload *eth.ExecutionPayloadEnvelope, ref eth.L2BlockRef) error
}

type Network interface {
PublishL2Attributes(ctx context.Context, attrs *derive.AttributesWithParent) error
}

type L1OriginSelector interface {
FindL1Origin(ctx context.Context, l2Head eth.L2BlockRef) (eth.L1BlockRef, error)
}

// CLSync holds on to a queue of received unsafe payloads,
// and tries to apply them to the tip of the chain when requested to.
type CLSync struct {
log log.Logger
cfg *rollup.Config
metrics Metrics
ec Engine
unsafePayloads *PayloadsQueue // queue of unsafe payloads, ordered by ascending block number, may have gaps and duplicates
log log.Logger
cfg *rollup.Config
metrics Metrics
ec Engine
n Network
l1OriginSelector L1OriginSelector
attrBuilder derive.AttributesBuilder
unsafePayloads *PayloadsQueue // queue of unsafe payloads, ordered by ascending block number, may have gaps and duplicates
publishAttributes bool
}

func NewCLSync(log log.Logger, cfg *rollup.Config, metrics Metrics, ec Engine) *CLSync {
func NewCLSync(log log.Logger, cfg *rollup.Config, metrics Metrics, ec Engine, n Network, l1Origin L1OriginSelector, attrBuilder derive.AttributesBuilder, publishAttributes bool) *CLSync {
return &CLSync{
log: log,
cfg: cfg,
metrics: metrics,
ec: ec,
unsafePayloads: NewPayloadsQueue(log, maxUnsafePayloadsMemory, payloadMemSize),
log: log,
cfg: cfg,
metrics: metrics,
ec: ec,
n: n,
l1OriginSelector: l1Origin,
attrBuilder: attrBuilder,
unsafePayloads: NewPayloadsQueue(log, maxUnsafePayloadsMemory, payloadMemSize),
publishAttributes: publishAttributes,
}
}

Expand Down Expand Up @@ -117,7 +135,43 @@ func (eq *CLSync) Proceed(ctx context.Context) error {
eq.unsafePayloads.Pop()
return err
}

if eq.publishAttributes {
err = eq.PublishAttributes(ctx, ref)
if err != nil {
eq.log.Warn("Error publishing L2 attributes", "err", err)
}
}

eq.unsafePayloads.Pop()
eq.log.Trace("Executed unsafe payload", "hash", ref.Hash, "number", ref.Number, "timestamp", ref.Time, "l1Origin", ref.L1Origin)
return nil
}

func (eq *CLSync) PublishAttributes(ctx context.Context, l2head eth.L2BlockRef) error {
l1Origin, err := eq.l1OriginSelector.FindL1Origin(ctx, l2head)
if err != nil {
return fmt.Errorf("error finding next L1 Origin: %w", err)
}

fetchCtx, cancel := context.WithTimeout(ctx, time.Millisecond*500)
defer cancel()

attrs, err := eq.attrBuilder.PreparePayloadAttributes(fetchCtx, l2head, l1Origin.ID())
if err != nil {
return fmt.Errorf("error preparing payload attributes: %w", err)
}

withParent := &derive.AttributesWithParent{
Attributes: attrs,
Parent: l2head,
IsLastInSpan: false,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is IsLastInSpan used for?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the derivation pipeline inserts blocks in batch which they call it a span. If it is the last block in span, it sets that to the safe head.

if attributes.IsLastInSpan {

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since we are inserting unsafe payloads it should be set to false

}

err = eq.n.PublishL2Attributes(ctx, withParent)
if err != nil {
return err
}

return nil
}
Loading