Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Beacon-api: save atts to pool #12345

Merged
merged 9 commits into from
Apr 27, 2023
34 changes: 19 additions & 15 deletions beacon-chain/rpc/eth/beacon/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,23 +83,17 @@ func (bs *Server) SubmitAttestations(ctx context.Context, req *ethpbv1.SubmitAtt

// Broadcast the unaggregated attestation on a feed to notify other services in the beacon node
// of a received unaggregated attestation.
bs.OperationNotifier.OperationFeed().Send(&feed.Event{
Type: operation.UnaggregatedAttReceived,
Data: &operation.UnAggregatedAttReceivedData{
Attestation: att,
},
})
// Note we can't send for aggregated att because we don't have selection proof.
if !corehelpers.IsAggregated(att) {
bs.OperationNotifier.OperationFeed().Send(&feed.Event{
Type: operation.UnaggregatedAttReceived,
Data: &operation.UnAggregatedAttReceivedData{
Attestation: att,
},
})
}

validAttestations = append(validAttestations, att)

go func() {
ctx = trace.NewContext(context.Background(), trace.FromContext(ctx))
attCopy := ethpbalpha.CopyAttestation(att)
if err := bs.AttestationsPool.SaveUnaggregatedAttestation(attCopy); err != nil {
log.WithError(err).Error("Could not handle attestation in operations service")
return
}
}()
}

broadcastFailed := false
Expand All @@ -115,6 +109,16 @@ func (bs *Server) SubmitAttestations(ctx context.Context, req *ethpbv1.SubmitAtt
if err := bs.Broadcaster.BroadcastAttestation(ctx, subnet, att); err != nil {
broadcastFailed = true
}

if corehelpers.IsAggregated(att) {
if err := bs.AttestationsPool.SaveAggregatedAttestation(att); err != nil {
log.WithError(err).Error("could not save aggregated att")
}
} else {
if err := bs.AttestationsPool.SaveUnaggregatedAttestation(att); err != nil {
log.WithError(err).Error("could not save unaggregated att")
}
}
Comment on lines +113 to +121
Copy link
Contributor

@rkapka rkapka Apr 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already save in the goroutine on line 95. But we assume it's unaggregated. We also do the below assuming the attestation is unaggregated. I don't know why this assumption was made. This is probably the actual thing to fix.

bs.OperationNotifier.OperationFeed().Send(&feed.Event{
		Type: operation.UnaggregatedAttReceived,
		Data: &operation.UnAggregatedAttReceivedData{
			Attestation: att,
		},
})

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah I missed that, I'll amend to that, thanks!

}
if broadcastFailed {
return nil, status.Errorf(
Expand Down
4 changes: 4 additions & 0 deletions beacon-chain/rpc/eth/beacon/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1007,6 +1007,8 @@ func TestServer_SubmitAttestations_Ok(t *testing.T) {
for _, r := range [][32]byte{actualAtt1, actualAtt2} {
assert.Equal(t, true, reflect.DeepEqual(expectedAtt1, r) || reflect.DeepEqual(expectedAtt2, r))
}

require.Equal(t, 2, s.AttestationsPool.UnaggregatedAttestationCount())
}

func TestServer_SubmitAttestations_ValidAttestationSubmitted(t *testing.T) {
Expand Down Expand Up @@ -1107,6 +1109,8 @@ func TestServer_SubmitAttestations_ValidAttestationSubmitted(t *testing.T) {
broadcastRoot, err := broadcaster.BroadcastAttestations[0].HashTreeRoot()
require.NoError(t, err)
require.DeepEqual(t, expectedAtt, broadcastRoot)

require.Equal(t, 1, s.AttestationsPool.UnaggregatedAttestationCount())
}

func TestServer_SubmitAttestations_InvalidAttestationGRPCHeader(t *testing.T) {
Expand Down