daser.go
package das

import (
	"context"
	"errors"
	"fmt"
	"sync/atomic"

	"github.com/ipfs/go-datastore"
	logging "github.com/ipfs/go-log/v2"

	"github.com/celestiaorg/go-fraud"
	libhead "github.com/celestiaorg/go-header"

	"github.com/celestiaorg/celestia-node/header"
	"github.com/celestiaorg/celestia-node/share"
	"github.com/celestiaorg/celestia-node/share/eds/byzantine"
	"github.com/celestiaorg/celestia-node/share/p2p/shrexsub"
)
var log = logging.Logger("das")

// DASer continuously validates availability of data committed to headers.
type DASer struct {
	params Parameters

	da     share.Availability
	bcast  fraud.Broadcaster
	hsub   libhead.Subscriber[*header.ExtendedHeader] // listens for new headers in the network
	getter libhead.Getter[*header.ExtendedHeader]     // retrieves past headers

	sampler    *samplingCoordinator
	store      checkpointStore
	subscriber subscriber

	cancel         context.CancelFunc
	subscriberDone chan struct{}
	running        int32
}

type listenFn func(context.Context, *header.ExtendedHeader)
type sampleFn func(context.Context, *header.ExtendedHeader) error
// NewDASer creates a new DASer.
func NewDASer(
	da share.Availability,
	hsub libhead.Subscriber[*header.ExtendedHeader],
	getter libhead.Getter[*header.ExtendedHeader],
	dstore datastore.Datastore,
	bcast fraud.Broadcaster,
	shrexBroadcast shrexsub.BroadcastFn,
	options ...Option,
) (*DASer, error) {
	d := &DASer{
		params:         DefaultParameters(),
		da:             da,
		bcast:          bcast,
		hsub:           hsub,
		getter:         getter,
		store:          newCheckpointStore(dstore),
		subscriber:     newSubscriber(),
		subscriberDone: make(chan struct{}),
	}

	for _, applyOpt := range options {
		applyOpt(d)
	}

	err := d.params.Validate()
	if err != nil {
		return nil, err
	}

	d.sampler = newSamplingCoordinator(d.params, getter, d.sample, shrexBroadcast)

	return d, nil
}
// Start initiates subscription for new ExtendedHeaders and spawns a sampling routine.
func (d *DASer) Start(ctx context.Context) error {
	if !atomic.CompareAndSwapInt32(&d.running, 0, 1) {
		return fmt.Errorf("da: DASer already started")
	}

	sub, err := d.hsub.Subscribe()
	if err != nil {
		return err
	}

	// load latest DASed checkpoint
	cp, err := d.store.load(ctx)
	if err != nil {
		log.Warnw("checkpoint not found, initializing with height 1")

		cp = checkpoint{
			SampleFrom:  d.params.SampleFrom,
			NetworkHead: d.params.SampleFrom,
		}
		// Attempt to get the current head. The error can be ignored: once started,
		// the DASer will learn the new head from the subscriber.
		if h, err := d.getter.Head(ctx); err == nil {
			cp.NetworkHead = uint64(h.Height())
		}
	}
	log.Info("starting DASer from checkpoint: ", cp.String())

	runCtx, cancel := context.WithCancel(context.Background())
	d.cancel = cancel

	go d.sampler.run(runCtx, cp)
	go d.subscriber.run(runCtx, sub, d.sampler.listen)
	go d.store.runBackgroundStore(runCtx, d.params.BackgroundStoreInterval, d.sampler.getCheckpoint)

	return nil
}
// Stop stops sampling.
func (d *DASer) Stop(ctx context.Context) error {
	if !atomic.CompareAndSwapInt32(&d.running, 1, 0) {
		return nil
	}

	// try to store checkpoint without waiting for coordinator and workers to stop
	cp, err := d.sampler.getCheckpoint(ctx)
	if err != nil {
		log.Error("DASer coordinator checkpoint is unavailable")
	}

	if err = d.store.store(ctx, cp); err != nil {
		log.Errorw("storing checkpoint to disk", "err", err)
	}

	d.cancel()
	if err = d.sampler.wait(ctx); err != nil {
		return fmt.Errorf("DASer force quit: %w", err)
	}

	// save updated checkpoint after sampler and all workers are shut down
	if err = d.store.store(ctx, newCheckpoint(d.sampler.state.unsafeStats())); err != nil {
		log.Errorw("storing checkpoint to disk", "err", err)
	}

	if err = d.store.wait(ctx); err != nil {
		return fmt.Errorf("DASer force quit with err: %w", err)
	}
	return d.subscriber.wait(ctx)
}
func (d *DASer) sample(ctx context.Context, h *header.ExtendedHeader) error {
	err := d.da.SharesAvailable(ctx, h.DAH)
	if err != nil {
		var byzantineErr *byzantine.ErrByzantine
		if errors.As(err, &byzantineErr) {
			log.Warn("Propagating proof...")
			sendErr := d.bcast.Broadcast(ctx,
				byzantine.CreateBadEncodingProof(h.Hash(), uint64(h.Height()), byzantineErr))
			if sendErr != nil {
				log.Errorw("fraud proof propagating failed", "err", sendErr)
			}
		}

		return err
	}

	return nil
}
// SamplingStats returns the current statistics over the DA sampling process.
func (d *DASer) SamplingStats(ctx context.Context) (SamplingStats, error) {
	return d.sampler.stats(ctx)
}

// WaitCatchUp waits for the DASer to indicate that catch-up is done.
func (d *DASer) WaitCatchUp(ctx context.Context) error {
	return d.sampler.state.waitCatchUp(ctx)
}
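
The exported surface above implies a simple lifecycle: construct the DASer with its dependencies, Start it, optionally block on WaitCatchUp, read SamplingStats, and Stop on shutdown so the checkpoint is persisted. Below is a minimal caller-side sketch of that order using only the API defined in this file; it assumes the dependencies (availability, header subscriber and getter, datastore, fraud broadcaster, shrex-sub broadcast function) were already constructed elsewhere, and the package name dasexample and helper runDAS are illustrative, not part of celestia-node.

// Hypothetical caller-side sketch; not part of daser.go.
package dasexample

import (
	"context"
	"fmt"

	"github.com/ipfs/go-datastore"

	"github.com/celestiaorg/go-fraud"
	libhead "github.com/celestiaorg/go-header"

	"github.com/celestiaorg/celestia-node/das"
	"github.com/celestiaorg/celestia-node/header"
	"github.com/celestiaorg/celestia-node/share"
	"github.com/celestiaorg/celestia-node/share/p2p/shrexsub"
)

// runDAS drives a DASer through its lifecycle. The caller is assumed to have
// built all dependencies already (normally wired by the node's DI setup).
func runDAS(
	ctx context.Context,
	avail share.Availability,
	hsub libhead.Subscriber[*header.ExtendedHeader],
	getter libhead.Getter[*header.ExtendedHeader],
	ds datastore.Datastore,
	bcast fraud.Broadcaster,
	broadcast shrexsub.BroadcastFn,
) error {
	d, err := das.NewDASer(avail, hsub, getter, ds, bcast, broadcast)
	if err != nil {
		return err
	}
	if err := d.Start(ctx); err != nil {
		return err
	}

	// Block until the sampler has caught up with the known network head.
	if err := d.WaitCatchUp(ctx); err != nil {
		_ = d.Stop(context.Background()) // best-effort shutdown; Stop persists the checkpoint
		return err
	}

	// Inspect sampling progress once caught up.
	if stats, err := d.SamplingStats(ctx); err == nil {
		fmt.Printf("DAS stats after catch-up: %+v\n", stats)
	}

	// Stop stores the final checkpoint and waits for the workers to exit.
	return d.Stop(ctx)
}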