diff --git a/pkg/tbtc/deduplicator.go b/pkg/tbtc/deduplicator.go index a674cea370..be16d990d0 100644 --- a/pkg/tbtc/deduplicator.go +++ b/pkg/tbtc/deduplicator.go @@ -16,6 +16,9 @@ const ( // DKGResultHashCachePeriod is the time period the cache maintains // the given DKG result hash. DKGResultHashCachePeriod = 7 * 24 * time.Hour + // WalletClosedCachePeriod is the time period the cache maintains + // the given wallet closed hash. + WalletClosedCachePeriod = 7 * 24 * time.Hour ) // deduplicator decides whether the given event should be handled by the @@ -31,15 +34,18 @@ const ( // Those events are supported: // - DKG started // - DKG result submitted +// - Wallet closed type deduplicator struct { dkgSeedCache *cache.TimeCache dkgResultHashCache *cache.TimeCache + walletClosedCache *cache.TimeCache } func newDeduplicator() *deduplicator { return &deduplicator{ dkgSeedCache: cache.NewTimeCache(DKGSeedCachePeriod), dkgResultHashCache: cache.NewTimeCache(DKGResultHashCachePeriod), + walletClosedCache: cache.NewTimeCache(WalletClosedCachePeriod), } } @@ -90,3 +96,23 @@ func (d *deduplicator) notifyDKGResultSubmitted( // proceed with the execution. return false } + +func (d *deduplicator) notifyWalletClosed( + WalletID [32]byte, +) bool { + d.walletClosedCache.Sweep() + + // Use wallet ID converted to string as the cache key. + cacheKey := hex.EncodeToString(WalletID[:]) + + // If the key is not in the cache, that means the wallet closure was not + // handled yet and the client should proceed with the execution. + if !d.walletClosedCache.Has(cacheKey) { + d.walletClosedCache.Add(cacheKey) + return true + } + + // Otherwise, the wallet closure is a duplicate and the client should not + // proceed with the execution. + return false +} diff --git a/pkg/tbtc/deduplicator_test.go b/pkg/tbtc/deduplicator_test.go index 4fdeb5ec7c..b75432a8c0 100644 --- a/pkg/tbtc/deduplicator_test.go +++ b/pkg/tbtc/deduplicator_test.go @@ -9,8 +9,11 @@ import ( "github.com/keep-network/keep-common/pkg/cache" ) -const testDKGSeedCachePeriod = 1 * time.Second -const testDKGResultHashCachePeriod = 1 * time.Second +const ( + testDKGSeedCachePeriod = 1 * time.Second + testDKGResultHashCachePeriod = 1 * time.Second + testWalletClosedCachePeriod = 1 * time.Second +) func TestNotifyDKGStarted(t *testing.T) { deduplicator := deduplicator{ @@ -112,3 +115,39 @@ func TestNotifyDKGResultSubmitted(t *testing.T) { t.Fatal("should be allowed to process") } } + +func TestNotifyWalletClosed(t *testing.T) { + deduplicator := deduplicator{ + walletClosedCache: cache.NewTimeCache(testWalletClosedCachePeriod), + } + + wallet1 := [32]byte{1} + wallet2 := [32]byte{2} + + // Add the first wallet ID. + canProcess := deduplicator.notifyWalletClosed(wallet1) + if !canProcess { + t.Fatal("should be allowed to process") + } + + // Add the second wallet ID. + canProcess = deduplicator.notifyWalletClosed(wallet2) + if !canProcess { + t.Fatal("should be allowed to process") + } + + // Add the first wallet ID before caching period elapses. + canProcess = deduplicator.notifyWalletClosed(wallet1) + if canProcess { + t.Fatal("should not be allowed to process") + } + + // Wait until caching period elapses. + time.Sleep(testWalletClosedCachePeriod) + + // Add the first wallet ID again. + canProcess = deduplicator.notifyWalletClosed(wallet1) + if !canProcess { + t.Fatal("should be allowed to process") + } +} diff --git a/pkg/tbtc/tbtc.go b/pkg/tbtc/tbtc.go index 937851e98a..a341bf24f0 100644 --- a/pkg/tbtc/tbtc.go +++ b/pkg/tbtc/tbtc.go @@ -266,7 +266,17 @@ func Initialize( _ = chain.OnWalletClosed(func(event *WalletClosedEvent) { go func() { - // TODO: Most likely event deduplication is needed. + if ok := deduplicator.notifyWalletClosed( + event.WalletID, + ); !ok { + logger.Warnf( + "Wallet closure for wallet with ID [0x%x] at block [%v] has"+ + "been already processed", + event.WalletID, + event.BlockNumber, + ) + return + } logger.Infof( "Wallet with ID [0x%x] has been closed at block [%v]",