From 12e73c9b70d4e49919a59d7fd965f53cc3ddb842 Mon Sep 17 00:00:00 2001 From: Jonas Bostoen Date: Sun, 12 Jun 2022 18:59:03 -0400 Subject: [PATCH] fix: contract events --- chainservice/cached_client.go | 6 +++--- chainservice/events.go | 5 +++-- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/chainservice/cached_client.go b/chainservice/cached_client.go index 79a4900..80ca96d 100644 --- a/chainservice/cached_client.go +++ b/chainservice/cached_client.go @@ -2,7 +2,6 @@ package chainservice import ( "context" - "fmt" "math/big" "time" @@ -120,7 +119,7 @@ func (c *CachedClient) FilterLogs(ctx context.Context, query ethereum.FilterQuer // my (big) machine almost crashed. A reasonable improvement we can make here is to use a small amount of concurrency, // e.g. 2 - 4 concurrent `eth_getLogs` requests. If this fails (context timeouts), we can both increase the block range (parts) // and decrease the number of concurrent requests. -func (c *CachedClient) SmartFilterLogs(ctx context.Context, topics [][]common.Hash, fromBlock, toBlock *big.Int) ([]types.Log, error) { +func (c *CachedClient) SmartFilterLogs(ctx context.Context, addresses []common.Address, topics [][]common.Hash, fromBlock, toBlock *big.Int) ([]types.Log, error) { var logs []types.Log parts := int64(50) @@ -138,6 +137,7 @@ func (c *CachedClient) SmartFilterLogs(ctx context.Context, topics [][]common.Ha } res, err := c.FilterLogs(ctx, ethereum.FilterQuery{ + Addresses: addresses, Topics: topics, FromBlock: big.NewInt(i), ToBlock: big.NewInt(i + chunk), @@ -161,7 +161,7 @@ func (c *CachedClient) SmartFilterLogs(ctx context.Context, topics [][]common.Ha c.logger.Debug().Int64("parts", parts).Msg("smart filter logs") logs, err := retry(parts) if err != nil { - fmt.Println(err) + c.logger.Debug().Msg(err.Error()) parts *= 2 c.logger.Debug().Msg("failed, retrying") diff --git a/chainservice/events.go b/chainservice/events.go index eb75205..c88f849 100644 --- a/chainservice/events.go +++ b/chainservice/events.go @@ -60,7 +60,7 @@ func (c ChainService) FilterEvents(query *dsl.Query, fromBlock, toBlock *big.Int ctx, cancel := context.WithTimeout(context.Background(), c.defaultTimeout) defer cancel() - logs, err := rlClient.SmartFilterLogs(ctx, [][]common.Hash{{topic}}, fromBlock, toBlock) + logs, err := rlClient.SmartFilterLogs(ctx, []common.Address{cs.Address()}, [][]common.Hash{{topic}}, fromBlock, toBlock) if err != nil { c.logger.Debug().Str("chain", string(query.Chain)).Err(err).Msg("getting logs from node") out <- apolloTypes.CallResult{ @@ -152,7 +152,7 @@ func (c ChainService) FilterGlobalEvents(query *dsl.Query, fromBlock, toBlock *b ctx, cancel := context.WithTimeout(context.Background(), c.defaultTimeout) defer cancel() - logs, err := rlClient.SmartFilterLogs(ctx, [][]common.Hash{{topic}}, fromBlock, toBlock) + logs, err := rlClient.SmartFilterLogs(ctx, nil, [][]common.Hash{{topic}}, fromBlock, toBlock) if err != nil { c.logger.Debug().Str("chain", string(query.Chain)).Err(err).Msg("getting logs from node") out <- apolloTypes.CallResult{ @@ -392,6 +392,7 @@ func (c ChainService) ListenForGlobalEvents(query *dsl.Query, res chan<- apolloT callResult := *aggregateCallResults(results...) callResult.QueryName = query.Name + callResult.Type = apolloTypes.GlobalEvent res <- callResult }(log) }