-
Notifications
You must be signed in to change notification settings - Fork 0
/
IntervalSource.cs
80 lines (69 loc) · 3.09 KB
/
IntervalSource.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
using System;
using System.Threading.Tasks;
using Akka;
using Akka.Streams.Dsl;
using Akka.Util;
using CirclesLand.BlockchainIndexer.Util;
using Dapper;
using Nethereum.BlockchainProcessing.BlockStorage.Entities.Mapping;
using Nethereum.Hex.HexTypes;
using Nethereum.Web3;
using Npgsql;
using Prometheus;
namespace CirclesLand.BlockchainIndexer.Sources
{
/// <summary>
/// Polls the chain head at a fixed interval and emits the block numbers that should be indexed.
/// While the "block" table is behind the chain head, the block that follows the last emitted
/// one is emitted immediately (catch up). Once caught up, the source waits for the configured
/// interval and emits the chain head as soon as it differs from the last emitted block.
/// </summary>
public static class IntervalSource
{
    /// <summary>
    /// Creates a source of block numbers driven by polling the RPC endpoint.
    /// </summary>
    /// <param name="intervalInMs">Delay in milliseconds between two polls once no catch up is required.</param>
    /// <param name="connectionString">PostgreSQL connection string used to read <c>max(number)</c> from the <c>block</c> table.</param>
    /// <param name="rpcUrl">URL of the RPC endpoint that is asked for the current block number.</param>
    /// <returns>A source that emits one block number per pull; the emitted number is also the unfold state.</returns>
    /// <remarks>Any exception raised while polling is logged and then rethrown.</remarks>
    public static Source<HexBigInteger, NotUsed> Create(int intervalInMs, string connectionString, string rpcUrl)
    {
        // The seed value 0 marks the state as "not initialised yet"; see the initialisation below.
        return Source.UnfoldAsync(new HexBigInteger(0), async lastBlock =>
        {
            await using var connection = new NpgsqlConnection(connectionString);
            // Open asynchronously so the calling thread is not blocked inside the async lambda.
            await connection.OpenAsync();

            var web3 = new Web3(rpcUrl);

            while (true)
            {
                try
                {
                    // Determine if we need to catch up (database old)
                    var currentBlock = await web3.Eth.Blocks.GetBlockNumber.SendRequestAsync();
                    var lastIndexedBlock =
                        await connection.QuerySingleOrDefaultAsync<long?>("select max(number) from block") ?? 0;

                    if (lastBlock.Value == 0)
                    {
                        // First run: resume from the database, or from the configured start block
                        // when the table is empty.
                        lastBlock = new HexBigInteger(lastIndexedBlock == 0 ? Settings.StartFromBlock : lastIndexedBlock);
                    }

                    if (currentBlock.ToLong() > lastIndexedBlock && currentBlock.Value > lastBlock.Value)
                    {
                        // Catch up one block at a time: emit the successor of the last emitted block.
                        var nextBlockToIndex = lastBlock.Value + 1;
                        Console.WriteLine($"Catching up block: {nextBlockToIndex}");

                        return new Option<(HexBigInteger, HexBigInteger)>((new HexBigInteger(nextBlockToIndex),
                            new HexBigInteger(nextBlockToIndex)));
                    }

                    await Task.Delay(intervalInMs);

                    // At this point we wait for a new block
                    currentBlock = await web3.Eth.Blocks.GetBlockNumber.SendRequestAsync();

                    // Compare by numeric value, consistent with the checks above; the freshly
                    // fetched HexBigInteger is a different object than the stored state.
                    if (currentBlock.Value == lastBlock.Value)
                    {
                        continue;
                    }

                    // NOTE(review): the head is emitted directly here, so if it advanced by more than
                    // one block during the delay, the numbers in between are not emitted by this
                    // branch - confirm that this is acceptable for consumers.
                    Console.WriteLine($"Got new block: {currentBlock}");
                    SourceMetrics.BlocksEmitted.WithLabels("interval").Inc();

                    return new Option<(HexBigInteger, HexBigInteger)>((currentBlock, currentBlock));
                }
                catch (Exception ex)
                {
                    Logger.LogError(ex.Message);
                    if (ex.StackTrace != null) Logger.LogError(ex.StackTrace);

                    throw;
                }
            }
        });
    }
}
}