Skip to content

Commit

Permalink
chore: Better abstract secondary storage - observe secondary storage …
Browse files Browse the repository at this point in the history
…via metrics - cleanups
  • Loading branch information
epociask committed Oct 13, 2024
1 parent 3c3271d commit bb9b433
Show file tree
Hide file tree
Showing 11 changed files with 224 additions and 113 deletions.
2 changes: 1 addition & 1 deletion commitments/mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
type CommitmentMeta struct {
Mode CommitmentMode
// CertVersion is shared for all modes and denotes version of the EigenDA certificate
CertVersion byte
CertVersion uint8
}

type CommitmentMode string
Expand Down
32 changes: 23 additions & 9 deletions e2e/optimism_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"testing"

"github.com/Layr-Labs/eigenda-proxy/e2e"
"github.com/Layr-Labs/eigenda-proxy/metrics"
altda "github.com/ethereum-optimism/optimism/op-alt-da"
"github.com/ethereum-optimism/optimism/op-e2e/actions"
"github.com/ethereum-optimism/optimism/op-e2e/config"
Expand Down Expand Up @@ -166,11 +167,18 @@ func TestOptimismKeccak256Commitment(gt *testing.T) {
optimism.sequencer.ActL2PipelineFull(t)
optimism.ActL1Finalized(t)

// assert that the EigenDA proxy was written to and read from
// stat := proxyTS.Server.GetS3Stats()
// assert that keccak256 primary store was written and read from
labels := metrics.BuildServerRPCLabels("put", "", "optimism_keccak256", "0")
delete(labels, "method")

ms, err := proxyTS.MetricPoller.PollMetricsWithRetry(metrics.ServerRPCStatuses, labels, 5)
require.NoError(t, err)
require.NotEmpty(t, ms)
require.Len(t, ms, 2)

require.True(t, ms[0].Count > 0)
require.True(t, ms[1].Count > 0)

// require.Equal(t, 1, stat.Entries)
// require.Equal(t, 1, stat.Reads)
}

func TestOptimismGenericCommitment(gt *testing.T) {
Expand Down Expand Up @@ -222,9 +230,15 @@ func TestOptimismGenericCommitment(gt *testing.T) {

// assert that the EigenDA proxy was written to and read from

if useMemory() {
stat := proxyTS.Server.GetEigenDAStats()
require.Equal(t, 1, stat.Entries)
require.Equal(t, 1, stat.Reads)
}
// assert that EigenDA's primary store was written and read from
labels := metrics.BuildServerRPCLabels("put", "", "optimism_generic", "0")
delete(labels, "method")

ms, err := proxyTS.MetricPoller.PollMetricsWithRetry(metrics.ServerRPCStatuses, labels, 5)
require.NoError(t, err)
require.NotEmpty(t, ms)
require.Len(t, ms, 2)

require.True(t, ms[0].Count > 0)
require.True(t, ms[1].Count > 0)
}
35 changes: 24 additions & 11 deletions e2e/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"time"

"github.com/Layr-Labs/eigenda-proxy/client"
"github.com/Layr-Labs/eigenda-proxy/metrics"
"github.com/Layr-Labs/eigenda-proxy/store"

"github.com/Layr-Labs/eigenda-proxy/e2e"
altda "github.com/ethereum-optimism/optimism/op-alt-da"
Expand Down Expand Up @@ -349,10 +351,13 @@ func TestProxyServerCaching(t *testing.T) {
require.Equal(t, testPreimage, preimage)

// ensure that read was from cache
val, err := ts.MetricPoller.Poll("secondary.requests_total")
labels := metrics.BuildSecondaryCountLabels(store.S3BackendType.String(), http.MethodGet, "success")

ms, err := ts.MetricPoller.PollMetricsWithRetry(metrics.SecondaryRequestStatuses, labels, 5)
require.NoError(t, err)
require.Len(t, ms, 1)

println(val)
require.True(t, ms[0].Count > 0)

if useMemory() { // ensure that eigenda was not read from
memStats := ts.Server.GetEigenDAStats()
Expand Down Expand Up @@ -393,12 +398,14 @@ func TestProxyServerCachingWithRedis(t *testing.T) {
require.Equal(t, testPreimage, preimage)

// ensure that read was from cache
// redStats, err := ts.Server.GetStoreStats(store.RedisBackendType)
// require.NoError(t, err)

// require.Equal(t, 1, redStats.Reads)
// require.Equal(t, 1, redStats.Entries)
labels := metrics.BuildSecondaryCountLabels(store.RedisBackendType.String(), http.MethodGet, "success")
ms, err := ts.MetricPoller.PollMetrics(metrics.SecondaryRequestStatuses, labels)
require.NoError(t, err)
require.NotEmpty(t, ms)
require.Len(t, ms, 1)
require.True(t, ms[0].Count >= 1)

// TODO: Add metrics for EigenDA dispersal/retrieval
if useMemory() { // ensure that eigenda was not read from
memStats := ts.Server.GetEigenDAStats()
require.Equal(t, 0, memStats.Reads)
Expand All @@ -420,6 +427,7 @@ func TestProxyServerReadFallback(t *testing.T) {

t.Parallel()

// setup server with S3 as a fallback option
testCfg := e2e.TestConfig(useMemory())
testCfg.UseS3Fallback = true
testCfg.Expiration = time.Millisecond * 1
Expand Down Expand Up @@ -447,11 +455,16 @@ func TestProxyServerReadFallback(t *testing.T) {
require.NoError(t, err)
require.Equal(t, testPreimage, preimage)

// ensure that read was from fallback target location (i.e., S3 for this test)
// s3Stats := ts.Server.GetS3Stats()
// require.Equal(t, 1, s3Stats.Reads)
// require.Equal(t, 1, s3Stats.Entries)
labels := metrics.BuildSecondaryCountLabels(store.S3BackendType.String(), http.MethodGet, "success")

ms, err := ts.MetricPoller.PollMetricsWithRetry(metrics.SecondaryRequestStatuses, labels, 5)
require.NoError(t, err)
require.NotEmpty(t, ms)
require.Len(t, ms, 1)

require.True(t, ms[0].Count > 0)

// TODO - remove this in favor of metrics sampling
if useMemory() { // ensure that an eigenda read was attempted with zero data available
memStats := ts.Server.GetEigenDAStats()
require.Equal(t, 1, memStats.Reads)
Expand Down
5 changes: 2 additions & 3 deletions e2e/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ type TestSuite struct {
Ctx context.Context
Log log.Logger
Server *server.Server
MetricPoller *metrics.MetricsPoller
MetricPoller *metrics.PollerClient
MetricSvr *httputil.HTTPServer
}

Expand All @@ -198,7 +198,6 @@ func CreateTestSuite(t *testing.T, testSuiteCfg server.CLIConfig) (TestSuite, fu
m,
)


require.NoError(t, err)
proxySvr := server.NewServer(host, 0, store, log, m)

Expand Down Expand Up @@ -226,7 +225,7 @@ func CreateTestSuite(t *testing.T, testSuiteCfg server.CLIConfig) (TestSuite, fu
Ctx: ctx,
Log: log,
Server: proxySvr,
MetricPoller: metrics.NewPoller(fmt.Sprintf("http://%s", m.Address())),
MetricPoller: metrics.NewPoller(fmt.Sprintf("http://%s", metricsSvr.Addr().String())),
MetricSvr: metricsSvr,
}, kill
}
Expand Down
46 changes: 6 additions & 40 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,6 @@ type Metrics struct {

registry *prometheus.Registry
factory metrics.Factory

address string
}

var _ Metricer = (*Metrics)(nil)
Expand Down Expand Up @@ -91,7 +89,7 @@ func NewMetrics(subsystem string) *Metrics {
Name: "requests_total",
Help: "Total requests to the HTTP server",
}, []string{
"method", "status", "commitment_mode", "DA_cert_version",
"method", "status", "commitment_mode", "cert_version",
}),
HTTPServerBadRequestHeader: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Expand Down Expand Up @@ -149,7 +147,7 @@ func (m *Metrics) RecordUp() {
// RecordRPCServerRequest is a helper method to record an incoming HTTP request.
// It bumps the requests metric, and tracks how long it takes to serve a response,
// including the HTTP status code.
func (m *Metrics) RecordRPCServerRequest(method string) func(status string, mode string, ver string) {
func (m *Metrics) RecordRPCServerRequest(method string) func(status, mode, ver string) {
// we don't want to track the status code on the histogram because that would
// create a huge number of labels, and cost a lot on cloud hosted services
timer := prometheus.NewTimer(m.HTTPServerRequestDurationSeconds.WithLabelValues(method))
Expand All @@ -159,10 +157,6 @@ func (m *Metrics) RecordRPCServerRequest(method string) func(status string, mode
}
}

func (m *Metrics) Address() string {
return m.address
}

// RecordSecondaryRequest records a secondary put/get operation.
func (m *Metrics) RecordSecondaryRequest(bt string, method string) func(status string) {
timer := prometheus.NewTimer(m.SecondaryRequestDurationSec.WithLabelValues(bt))
Expand All @@ -171,50 +165,22 @@ func (m *Metrics) RecordSecondaryRequest(bt string, method string) func(status s
m.SecondaryRequestsTotal.WithLabelValues(bt, method, status).Inc()
timer.ObserveDuration()
}

}

// FindRandomOpenPort reports a TCP port on 127.0.0.1 that was free at the
// moment of the call.
//
// It binds a throwaway listener to port 0 and reads the port back from the
// listener's address. The listener is closed before the function returns,
// so the port is not reserved for the caller.
//
// On failure it returns 0 and an error wrapping the underlying listen error.
func FindRandomOpenPort() (int, error) {
	// Port 0 in the listen address requests an automatically chosen port.
	ln, listenErr := net.Listen("tcp", "127.0.0.1:0")
	if listenErr != nil {
		return 0, fmt.Errorf("failed to find open port: %w", listenErr)
	}
	// Release the socket once the port number has been read.
	defer ln.Close()

	tcpAddr := ln.Addr().(*net.TCPAddr)
	return tcpAddr.Port, nil
}

// StartServer starts the metrics server on the given hostname and port.
// StartServer starts the metrics server on the given hostname and port.
// If port is 0, it automatically assigns an available port and returns the actual port.
func (m *Metrics) StartServer(hostname string, port int) (*ophttp.HTTPServer, error) {
// Create a listener with the provided host and port. If port is 0, the system will assign one.
if port == 0 {
randomPort, err := FindRandomOpenPort()
if err != nil {
return nil, fmt.Errorf("failed to find open port: %w", err)
}
port = randomPort
}
m.address = net.JoinHostPort(hostname, strconv.Itoa(port))

address := net.JoinHostPort(hostname, strconv.Itoa(port))

// Set up Prometheus metrics handler
h := promhttp.InstrumentMetricHandler(
m.registry, promhttp.HandlerFor(m.registry, promhttp.HandlerOpts{}),
)

// Start the HTTP server using the listener, so we can control the actual port
server, err := ophttp.StartHTTPServer(m.address, h)

server, err := ophttp.StartHTTPServer(address, h)
if err != nil {
return nil, fmt.Errorf("failed to start HTTP server: %v", err)
return nil, fmt.Errorf("failed to start HTTP server: %w", err)
}

// Return the started HTTP server
return server, nil
}

Expand Down
Loading

0 comments on commit bb9b433

Please sign in to comment.