Skip to content

Commit

Permalink
Added deduplication of WalletClosed events
Browse files Browse the repository at this point in the history
  • Loading branch information
tomaszslabon committed Jun 10, 2024
1 parent 5a27be6 commit 73da53d
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 3 deletions.
26 changes: 26 additions & 0 deletions pkg/tbtc/deduplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ const (
// DKGResultHashCachePeriod is the time period the cache maintains
// the given DKG result hash.
DKGResultHashCachePeriod = 7 * 24 * time.Hour
// WalletClosedCachePeriod is the time period the cache maintains
// the given wallet closed hash.
WalletClosedCachePeriod = 7 * 24 * time.Hour
)

// deduplicator decides whether the given event should be handled by the
Expand All @@ -31,15 +34,18 @@ const (
// Those events are supported:
// - DKG started
// - DKG result submitted
// - Wallet closed
type deduplicator struct {
dkgSeedCache *cache.TimeCache
dkgResultHashCache *cache.TimeCache
walletClosedCache *cache.TimeCache
}

func newDeduplicator() *deduplicator {
return &deduplicator{
dkgSeedCache: cache.NewTimeCache(DKGSeedCachePeriod),
dkgResultHashCache: cache.NewTimeCache(DKGResultHashCachePeriod),
walletClosedCache: cache.NewTimeCache(WalletClosedCachePeriod),
}
}

Expand Down Expand Up @@ -90,3 +96,23 @@ func (d *deduplicator) notifyDKGResultSubmitted(
// proceed with the execution.
return false
}

func (d *deduplicator) notifyWalletClosed(
WalletID [32]byte,
) bool {
d.walletClosedCache.Sweep()

// Use wallet ID converted to string as the cache key.
cacheKey := hex.EncodeToString(WalletID[:])

// If the key is not in the cache, that means the wallet closure was not
// handled yet and the client should proceed with the execution.
if !d.walletClosedCache.Has(cacheKey) {
d.walletClosedCache.Add(cacheKey)
return true
}

// Otherwise, the wallet closure is a duplicate and the client should not
// proceed with the execution.
return false
}
43 changes: 41 additions & 2 deletions pkg/tbtc/deduplicator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@ import (
"github.com/keep-network/keep-common/pkg/cache"
)

const testDKGSeedCachePeriod = 1 * time.Second
const testDKGResultHashCachePeriod = 1 * time.Second
const (
testDKGSeedCachePeriod = 1 * time.Second
testDKGResultHashCachePeriod = 1 * time.Second
testWalletClosedCachePeriod = 1 * time.Second
)

func TestNotifyDKGStarted(t *testing.T) {
deduplicator := deduplicator{
Expand Down Expand Up @@ -112,3 +115,39 @@ func TestNotifyDKGResultSubmitted(t *testing.T) {
t.Fatal("should be allowed to process")
}
}

func TestNotifyWalletClosed(t *testing.T) {
deduplicator := deduplicator{
walletClosedCache: cache.NewTimeCache(testWalletClosedCachePeriod),
}

wallet1 := [32]byte{1}
wallet2 := [32]byte{2}

// Add the first wallet ID.
canProcess := deduplicator.notifyWalletClosed(wallet1)
if !canProcess {
t.Fatal("should be allowed to process")
}

// Add the second wallet ID.
canProcess = deduplicator.notifyWalletClosed(wallet2)
if !canProcess {
t.Fatal("should be allowed to process")
}

// Add the first wallet ID before caching period elapses.
canProcess = deduplicator.notifyWalletClosed(wallet1)
if canProcess {
t.Fatal("should not be allowed to process")
}

// Wait until caching period elapses.
time.Sleep(testWalletClosedCachePeriod)

// Add the first wallet ID again.
canProcess = deduplicator.notifyWalletClosed(wallet1)
if !canProcess {
t.Fatal("should be allowed to process")
}
}
12 changes: 11 additions & 1 deletion pkg/tbtc/tbtc.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,17 @@ func Initialize(

_ = chain.OnWalletClosed(func(event *WalletClosedEvent) {
go func() {
// TODO: Most likely event deduplication is needed.
if ok := deduplicator.notifyWalletClosed(
event.WalletID,
); !ok {
logger.Warnf(
"Wallet closure for wallet with ID [0x%x] at block [%v] has"+
"been already processed",
event.WalletID,
event.BlockNumber,
)
return
}

logger.Infof(
"Wallet with ID [0x%x] has been closed at block [%v]",
Expand Down

0 comments on commit 73da53d

Please sign in to comment.