-
Notifications
You must be signed in to change notification settings - Fork 677
/
neon_node.rs
4384 lines (4010 loc) · 179 KB
/
neon_node.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// Copyright (C) 2013-2020 Blockstack PBC, a public benefit corporation
// Copyright (C) 2020 Stacks Open Internet Foundation
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
/// Main body of code for the Stacks node and miner.
///
/// System schematic.
/// Legend:
/// |------| Thread
/// /------\ Shared memory
/// @------@ Database
/// .------. Code module
///
///
/// |------------------|
/// | RunLoop thread | [1,7]
/// | .----------. |--------------------------------------.
/// | .StacksNode. | |
/// |---.----------.---| |
/// [1,12] | | | [1] |
/// .----------------* | *---------------. |
/// | [3] | | |
/// V | V V
/// |----------------| | [9,10] |---------------| [11] |--------------------------|
/// .--- | Relayer thread | <-----------|-----------> | P2P Thread | <--- | ChainsCoordinator thread | <--.
/// | |----------------| V |---------------| |--------------------------| |
/// | | | /-------------\ [2,3] | | | | |
/// | [1] | *--------> / Globals \ <-----------*----|--------------* | [4] |
/// | | [2,3,7] /-------------\ | | |
/// | V V [5] V |
/// | |----------------| @--------------@ @------------------@ |
/// | | Miner thread | <------------------------------ @ Mempool DB @ @ Chainstate DBs @ |
/// | |----------------| [6] @--------------@ @------------------@ |
/// | ^ |
/// | [8] | |
/// *----------------------------------------------------------------------------------------* |
/// | [7] |
/// *--------------------------------------------------------------------------------------------------------*
///
/// [1] Spawns
/// [2] Synchronize unconfirmed state
/// [3] Enable/disable miner
/// [4] Processes block data
/// [5] Stores unconfirmed transactions
/// [6] Reads unconfirmed transactions
/// [7] Signals block arrival
/// [8] Store blocks and microblocks
/// [9] Pushes retrieved blocks and microblocks
/// [10] Broadcasts new blocks, microblocks, and transactions
/// [11] Notifies about new transaction attachment events
/// [12] Signals VRF key registration
///
/// When the node is running, there are 4-5 active threads at once. They are:
///
/// * **RunLoop Thread**: This is the main thread, whose code body lives in src/run_loop/neon.rs.
/// This thread is responsible for:
/// * Bootup
/// * Running the burnchain indexer
/// * Notifying the ChainsCoordinator thread when there are new burnchain blocks to process
///
/// * **Relayer Thread**: This is the thread that stores and relays blocks and microblocks. Both
/// it and the ChainsCoordinator thread are very I/O-heavy threads, and care has been taken to
/// ensure that neither one attempts to acquire a write-lock in the underlying databases.
/// Specifically, this thread directs the ChainsCoordinator thread when to process new Stacks
/// blocks, and it directs the miner thread (if running) to stop when either it or the
/// ChainsCoordinator thread needs to acquire the write-lock.
/// This thread is responsible for:
/// * Receiving new blocks and microblocks from the P2P thread via a shared channel
/// * (Sychronously) requesting the CoordinatorThread to process newly-stored Stacks blocks and
/// microblocks
/// * Building up the node's unconfirmed microblock stream state, and sharing it with the P2P
/// thread so it can answer queries about the unconfirmed microblock chain
/// * Pushing newly-discovered blocks and microblocks to the P2P thread for broadcast
/// * Registering the VRF public key for the miner
/// * Spawning the block and microblock miner threads, and stopping them if their continued
/// execution would inhibit block or microblock storage or processing.
/// * Submitting the burnchain operation to commit to a freshly-mined block
///
/// * **Miner thread**: This is the thread that actually produces new blocks and microblocks. It
/// is spawned only by the Relayer thread to carry out mining activity when the underlying
/// chainstate is not needed by either the Relayer or ChainsCoordinator threeads.
/// This thread does the following:
/// * Walk the mempool DB to build a new block or microblock
/// * Return the block or microblock to the Relayer thread
///
/// * **P2P Thread**: This is the thread that communicates with the rest of the p2p network, and
/// handles RPC requests. It is meant to do as little storage-write I/O as possible to avoid lock
/// contention with the Miner, Relayer, and ChainsCoordinator threads. In particular, it forwards
/// data it receives from the p2p thread to the Relayer thread for I/O-bound processing. At the
/// time of this writing, it still requires holding a write-lock to handle some RPC request, but
/// future work will remove this so that this thread's execution will not interfere with the
/// others. This is the only thread that does socket I/O.
/// This thread runs the PeerNetwork state machines, which include the following:
/// * Learning the node's public IP address
/// * Discovering neighbor nodes
/// * Forwarding newly-discovered blocks, microblocks, and transactions from the Relayer thread to
/// other neighbors
/// * Synchronizing block and microblock inventory state with other neighbors
/// * Downloading blocks and microblocks, and passing them to the Relayer for storage and processing
/// * Downloading transaction attachments as their hashes are discovered during block processing
/// * Synchronizing the local mempool database with other neighbors
/// (notifications for new attachments come from a shared channel in the ChainsCoordinator thread)
/// * Handling HTTP requests
///
/// * **ChainsCoordinator Thread**: This thread process sortitions and Stacks blocks and
/// microblocks, and handles PoX reorgs should they occur (this mainly happens in boot-up). It,
/// like the Relayer thread, is a very I/O-heavy thread, and it will hold a write-lock on the
/// chainstate DBs while it works. Its actions are controlled by a CoordinatorComms structure in
/// the Globals shared state, which the Relayer thread and RunLoop thread both drive (the former
/// drives Stacks blocks processing, the latter sortitions).
/// This thread is responsible for:
/// * Responding to requests from other threads to process sortitions
/// * Responding to requests from other threads to process Stacks blocks and microblocks
/// * Processing PoX chain reorgs, should they ever happen
/// * Detecting attachment creation events, and informing the P2P thread of them so it can go
/// and download them
///
/// In addition to the mempool and chainstate databases, these threads share access to a Globals
/// singleton that contains soft state shared between threads. Mainly, the Globals struct is meant
/// to store inter-thread shared singleton communication media all in one convenient struct. Each
/// thread has a handle to the struct's shared state handles. Global state includes:
/// * The global flag as to whether or not the miner thread can be running
/// * The global shutdown flag that, when set, causes all threads to terminate
/// * Sender channel endpoints that can be shared between threads
/// * Metrics about the node's behavior (e.g. number of blocks processed, etc.)
///
/// This file may be refactored in the future into a full-fledged module.
use std::cmp;
use std::collections::HashMap;
use std::collections::{HashSet, VecDeque};
use std::convert::{TryFrom, TryInto};
use std::default::Default;
use std::mem;
use std::net::SocketAddr;
use std::sync::mpsc::{Receiver, SyncSender, TrySendError};
use std::sync::{atomic::AtomicBool, atomic::Ordering, Arc, Mutex};
use std::time::Duration;
use std::{thread, thread::JoinHandle};
use stacks::burnchains::{db::BurnchainHeaderReader, Burnchain, BurnchainParameters, Txid};
use stacks::chainstate::burn::db::sortdb::SortitionDB;
use stacks::chainstate::burn::operations::{
leader_block_commit::{RewardSetInfo, BURN_BLOCK_MINED_AT_MODULUS},
BlockstackOperationType, LeaderBlockCommitOp, LeaderKeyRegisterOp,
};
use stacks::chainstate::burn::BlockSnapshot;
use stacks::chainstate::burn::ConsensusHash;
use stacks::chainstate::coordinator::comm::CoordinatorChannels;
use stacks::chainstate::coordinator::{get_next_recipients, OnChainRewardSetProvider};
use stacks::chainstate::stacks::address::PoxAddress;
use stacks::chainstate::stacks::db::unconfirmed::UnconfirmedTxMap;
use stacks::chainstate::stacks::db::StacksHeaderInfo;
use stacks::chainstate::stacks::db::{StacksChainState, MINER_REWARD_MATURITY};
use stacks::chainstate::stacks::Error as ChainstateError;
use stacks::chainstate::stacks::StacksPublicKey;
use stacks::chainstate::stacks::{
miner::get_mining_spend_amount, miner::signal_mining_blocked, miner::signal_mining_ready,
miner::BlockBuilderSettings, miner::MinerStatus, miner::StacksMicroblockBuilder,
StacksBlockBuilder, StacksBlockHeader,
};
use stacks::chainstate::stacks::{
CoinbasePayload, StacksBlock, StacksMicroblock, StacksTransaction, StacksTransactionSigner,
TransactionAnchorMode, TransactionPayload, TransactionVersion,
};
use stacks::codec::StacksMessageCodec;
use stacks::core::mempool::MemPoolDB;
use stacks::core::FIRST_BURNCHAIN_CONSENSUS_HASH;
use stacks::core::STACKS_EPOCH_2_1_MARKER;
use stacks::cost_estimates::metrics::CostMetric;
use stacks::cost_estimates::metrics::UnitMetric;
use stacks::cost_estimates::UnitEstimator;
use stacks::cost_estimates::{CostEstimator, FeeEstimator};
use stacks::monitoring::{increment_stx_blocks_mined_counter, update_active_miners_count_gauge};
use stacks::net::{
atlas::{AtlasConfig, AtlasDB, AttachmentInstance},
db::{LocalPeer, PeerDB},
dns::DNSClient,
dns::DNSResolver,
p2p::PeerNetwork,
relay::Relayer,
rpc::RPCHandlerArgs,
Error as NetError, NetworkResult, PeerAddress, ServiceFlags,
};
use stacks::types::chainstate::{
BlockHeaderHash, BurnchainHeaderHash, SortitionId, StacksAddress, VRFSeed,
};
use stacks::types::StacksEpochId;
use stacks::util::get_epoch_time_ms;
use stacks::util::get_epoch_time_secs;
use stacks::util::hash::{to_hex, Hash160, Sha256Sum};
use stacks::util::secp256k1::Secp256k1PrivateKey;
use stacks::util::vrf::VRFPublicKey;
use stacks::util_lib::strings::{UrlString, VecDisplay};
use stacks::vm::costs::ExecutionCost;
use crate::burnchains::bitcoin_regtest_controller::BitcoinRegtestController;
use crate::burnchains::bitcoin_regtest_controller::OngoingBlockCommit;
use crate::burnchains::make_bitcoin_indexer;
use crate::run_loop::neon::Counters;
use crate::run_loop::neon::RunLoop;
use crate::run_loop::RegisteredKey;
use crate::ChainTip;
use super::{BurnchainController, Config, EventDispatcher, Keychain};
use crate::syncctl::PoxSyncWatchdogComms;
use stacks::monitoring;
use stacks_common::types::chainstate::StacksBlockId;
use stacks_common::types::chainstate::StacksPrivateKey;
use stacks_common::util::vrf::VRFProof;
use clarity::vm::ast::ASTRules;
use clarity::vm::types::PrincipalData;
pub const RELAYER_MAX_BUFFER: usize = 100;
const VRF_MOCK_MINER_KEY: u64 = 1;
pub const BLOCK_PROCESSOR_STACK_SIZE: usize = 32 * 1024 * 1024; // 32 MB
type MinedBlocks = HashMap<BlockHeaderHash, (AssembledAnchorBlock, Secp256k1PrivateKey)>;
/// Result of running the miner thread. It could produce a Stacks block or a microblock.
enum MinerThreadResult {
Block(
AssembledAnchorBlock,
Secp256k1PrivateKey,
Option<OngoingBlockCommit>,
),
Microblock(
Result<Option<(StacksMicroblock, ExecutionCost)>, NetError>,
MinerTip,
),
}
/// Fully-assembled Stacks anchored, block as well as some extra metadata pertaining to how it was
/// linked to the burnchain and what view(s) the miner had of the burnchain before and after
/// completing the block.
#[derive(Clone)]
struct AssembledAnchorBlock {
/// Consensus hash of the parent Stacks block
parent_consensus_hash: ConsensusHash,
/// Burnchain tip's block hash when we finished mining
my_burn_hash: BurnchainHeaderHash,
/// Burnchain tip's block height when we finished mining
my_block_height: u64,
/// Burnchain tip's block hash when we started mining (could be different)
orig_burn_hash: BurnchainHeaderHash,
/// The block we produced
anchored_block: StacksBlock,
/// The attempt count of this block (multiple blocks will be attempted per burnchain block)
attempt: u64,
/// Epoch timestamp in milliseconds when we started producing the block.
tenure_begin: u128,
}
/// Command types for the relayer thread, issued to it by other threads
pub enum RelayerDirective {
/// Handle some new data that arrived on the network (such as blocks, transactions, and
/// microblocks)
HandleNetResult(NetworkResult),
/// Announce a new sortition. Process and broadcast the block if we won.
ProcessTenure(ConsensusHash, BurnchainHeaderHash, BlockHeaderHash),
/// Try to mine a block
RunTenure(RegisteredKey, BlockSnapshot, u128), // (vrf key, chain tip, time of issuance in ms)
/// Try to register a VRF public key
RegisterKey(BlockSnapshot),
/// Stop the relayer thread
Exit,
}
/// Inter-thread communication structure, shared between threads
#[derive(Clone)]
pub struct Globals {
/// Last sortition processed
last_sortition: Arc<Mutex<Option<BlockSnapshot>>>,
/// Status of the miner
miner_status: Arc<Mutex<MinerStatus>>,
/// Communication link to the coordinator thread
coord_comms: CoordinatorChannels,
/// Unconfirmed transactions (shared between the relayer and p2p threads)
unconfirmed_txs: Arc<Mutex<UnconfirmedTxMap>>,
/// Writer endpoint to the relayer thread
relay_send: SyncSender<RelayerDirective>,
/// Cointer state in the main thread
counters: Counters,
/// Connection to the PoX sync watchdog
sync_comms: PoxSyncWatchdogComms,
/// Global flag to see if we should keep running
pub should_keep_running: Arc<AtomicBool>,
/// Status of our VRF key registration state (shared between the main thread and the relayer)
leader_key_registration_state: Arc<Mutex<LeaderKeyRegistrationState>>,
}
/// Miner chain tip, on top of which to build microblocks
#[derive(Debug, Clone, PartialEq)]
pub struct MinerTip {
/// tip's consensus hash
consensus_hash: ConsensusHash,
/// tip's Stacks block header hash
block_hash: BlockHeaderHash,
/// Microblock private key to use to sign microblocks
microblock_privkey: Secp256k1PrivateKey,
/// Stacks height
stacks_height: u64,
/// burnchain height
burn_height: u64,
}
impl MinerTip {
pub fn new(
ch: ConsensusHash,
bh: BlockHeaderHash,
pk: Secp256k1PrivateKey,
stacks_height: u64,
burn_height: u64,
) -> MinerTip {
MinerTip {
consensus_hash: ch,
block_hash: bh,
microblock_privkey: pk,
stacks_height,
burn_height,
}
}
}
impl Globals {
pub fn new(
coord_comms: CoordinatorChannels,
miner_status: Arc<Mutex<MinerStatus>>,
relay_send: SyncSender<RelayerDirective>,
counters: Counters,
sync_comms: PoxSyncWatchdogComms,
should_keep_running: Arc<AtomicBool>,
) -> Globals {
Globals {
last_sortition: Arc::new(Mutex::new(None)),
miner_status,
coord_comms,
unconfirmed_txs: Arc::new(Mutex::new(UnconfirmedTxMap::new())),
relay_send,
counters,
sync_comms,
should_keep_running,
leader_key_registration_state: Arc::new(Mutex::new(
LeaderKeyRegistrationState::Inactive,
)),
}
}
/// Get the last sortition processed by the relayer thread
pub fn get_last_sortition(&self) -> Option<BlockSnapshot> {
match self.last_sortition.lock() {
Ok(sort_opt) => sort_opt.clone(),
Err(_) => {
error!("Sortition mutex poisoned!");
panic!();
}
}
}
/// Set the last sortition processed
pub fn set_last_sortition(&self, block_snapshot: BlockSnapshot) {
match self.last_sortition.lock() {
Ok(mut sortition_opt) => {
sortition_opt.replace(block_snapshot);
}
Err(_) => {
error!("Sortition mutex poisoned!");
panic!();
}
};
}
/// Get the status of the miner (blocked or ready)
pub fn get_miner_status(&self) -> Arc<Mutex<MinerStatus>> {
self.miner_status.clone()
}
/// Get the main thread's counters
pub fn get_counters(&self) -> Counters {
self.counters.clone()
}
/// Called by the relayer to pass unconfirmed txs to the p2p thread, so the p2p thread doesn't
/// need to do the disk I/O needed to instantiate the unconfirmed state trie they represent.
/// Clears the unconfirmed transactions, and replaces them with the chainstate's.
pub fn send_unconfirmed_txs(&self, chainstate: &StacksChainState) {
if let Some(ref unconfirmed) = chainstate.unconfirmed_state {
match self.unconfirmed_txs.lock() {
Ok(mut txs) => {
txs.clear();
txs.extend(unconfirmed.mined_txs.clone());
}
Err(e) => {
// can only happen due to a thread panic in the relayer
error!("FATAL: unconfirmed tx arc mutex is poisoned: {:?}", &e);
panic!();
}
};
}
}
/// Called by the p2p thread to accept the unconfirmed tx state processed by the relayer.
/// Puts the shared unconfirmed transactions to chainstate.
pub fn recv_unconfirmed_txs(&self, chainstate: &mut StacksChainState) {
if let Some(ref mut unconfirmed) = chainstate.unconfirmed_state {
match self.unconfirmed_txs.lock() {
Ok(txs) => {
unconfirmed.mined_txs.clear();
unconfirmed.mined_txs.extend(txs.clone());
}
Err(e) => {
// can only happen due to a thread panic in the relayer
error!("FATAL: unconfirmed arc mutex is poisoned: {:?}", &e);
panic!();
}
};
}
}
/// Signal system-wide stop
pub fn signal_stop(&self) {
self.should_keep_running.store(false, Ordering::SeqCst);
}
/// Should we keep running?
pub fn keep_running(&self) -> bool {
self.should_keep_running.load(Ordering::SeqCst)
}
/// Get the handle to the coordinator
pub fn coord(&self) -> &CoordinatorChannels {
&self.coord_comms
}
/// Get the current leader key registration state.
/// Called from the runloop thread and relayer thread.
fn get_leader_key_registration_state(&self) -> LeaderKeyRegistrationState {
match self.leader_key_registration_state.lock() {
Ok(state) => (*state).clone(),
Err(e) => {
// can only happen due to a thread panic in the relayer
error!("FATAL: leader key registration mutex is poisoned: {:?}", &e);
panic!();
}
}
}
/// Set the initial leader key registration state.
/// Called from the runloop thread when booting up.
fn set_initial_leader_key_registration_state(&self, new_state: LeaderKeyRegistrationState) {
match self.leader_key_registration_state.lock() {
Ok(mut state) => {
*state = new_state;
}
Err(e) => {
// can only happen due to a thread panic in the relayer
error!("FATAL: leader key registration mutex is poisoned: {:?}", &e);
panic!();
}
}
}
/// Advance the leader key registration state to pending, given a txid we just sent.
/// Only the relayer thread calls this.
fn set_pending_leader_key_registration(&self, target_block_height: u64, txid: Txid) {
match self.leader_key_registration_state.lock() {
Ok(ref mut leader_key_registration_state) => {
**leader_key_registration_state =
LeaderKeyRegistrationState::Pending(target_block_height, txid);
}
Err(_e) => {
error!("FATAL: failed to lock leader key registration state mutex");
panic!();
}
}
}
/// Advance the leader key registration state to active, given the VRF key registration ops
/// we've discovered in a given snapshot.
/// The runloop thread calls this whenever it processes a sortition.
pub fn try_activate_leader_key_registration(
&self,
burn_block_height: u64,
key_registers: Vec<LeaderKeyRegisterOp>,
) -> bool {
let mut activated = false;
match self.leader_key_registration_state.lock() {
Ok(ref mut leader_key_registration_state) => {
for op in key_registers.into_iter() {
if let LeaderKeyRegistrationState::Pending(target_block_height, txid) =
**leader_key_registration_state
{
info!(
"Received burnchain block #{} including key_register_op - {}",
burn_block_height, txid
);
if txid == op.txid {
**leader_key_registration_state =
LeaderKeyRegistrationState::Active(RegisteredKey {
target_block_height,
vrf_public_key: op.public_key,
block_height: op.block_height as u64,
op_vtxindex: op.vtxindex as u32,
});
activated = true;
} else {
debug!(
"key_register_op {} does not match our pending op {}",
txid, &op.txid
);
}
}
}
}
Err(_e) => {
error!("FATAL: failed to lock leader key registration state mutex");
panic!();
}
}
activated
}
}
/// Node implementation for both miners and followers.
/// This struct is used to set up the node proper and launch the p2p thread and relayer thread.
/// It is further used by the main thread to communicate with these two threads.
pub struct StacksNode {
/// Atlas network configuration
pub atlas_config: AtlasConfig,
/// Global inter-thread communication handle
pub globals: Globals,
/// True if we're a miner
is_miner: bool,
/// handle to the p2p thread
pub p2p_thread_handle: JoinHandle<()>,
/// handle to the relayer thread
pub relayer_thread_handle: JoinHandle<()>,
}
/// Fault injection logic to artificially increase the length of a tenure.
/// Only used in testing
#[cfg(test)]
fn fault_injection_long_tenure() {
// simulated slow block
match std::env::var("STX_TEST_SLOW_TENURE") {
Ok(tenure_str) => match tenure_str.parse::<u64>() {
Ok(tenure_time) => {
info!(
"Fault injection: sleeping for {} milliseconds to simulate a long tenure",
tenure_time
);
stacks::util::sleep_ms(tenure_time);
}
Err(_) => {
error!("Parse error for STX_TEST_SLOW_TENURE");
panic!();
}
},
_ => {}
}
}
#[cfg(not(test))]
fn fault_injection_long_tenure() {}
/// Fault injection to skip mining in this bitcoin block height
/// Only used in testing
#[cfg(test)]
fn fault_injection_skip_mining(rpc_bind: &str, target_burn_height: u64) -> bool {
match std::env::var("STACKS_DISABLE_MINER") {
Ok(disable_heights) => {
let disable_schedule: serde_json::Value =
serde_json::from_str(&disable_heights).unwrap();
let disable_schedule = disable_schedule.as_array().unwrap();
for disabled in disable_schedule {
let target_miner_rpc_bind = disabled
.get("rpc_bind")
.unwrap()
.as_str()
.unwrap()
.to_string();
if target_miner_rpc_bind != rpc_bind {
continue;
}
let target_block_heights = disabled.get("blocks").unwrap().as_array().unwrap();
for target_block_value in target_block_heights {
let target_block = target_block_value.as_i64().unwrap() as u64;
if target_block == target_burn_height {
return true;
}
}
}
return false;
}
Err(_) => {
return false;
}
}
}
#[cfg(not(test))]
fn fault_injection_skip_mining(_rpc_bind: &str, _target_burn_height: u64) -> bool {
false
}
/// Open the chainstate, and inject faults from the config file
fn open_chainstate_with_faults(config: &Config) -> Result<StacksChainState, ChainstateError> {
let stacks_chainstate_path = config.get_chainstate_path_str();
let (mut chainstate, _) = StacksChainState::open(
config.is_mainnet(),
config.burnchain.chain_id,
&stacks_chainstate_path,
Some(config.node.get_marf_opts()),
)?;
chainstate.fault_injection.hide_blocks = config.node.fault_injection_hide_blocks;
Ok(chainstate)
}
/// Types of errors that can arise during mining
enum Error {
/// Can't find the header record for the chain tip
HeaderNotFoundForChainTip,
/// Can't find the stacks block's offset in the burnchain block
WinningVtxNotFoundForChainTip,
/// Can't find the block sortition snapshot for the chain tip
SnapshotNotFoundForChainTip,
/// The burnchain tip changed while this operation was in progress
BurnchainTipChanged,
/// The coordinator channel closed
CoordinatorClosed,
}
/// Metadata required for beginning a new tenure
struct ParentStacksBlockInfo {
/// Header metadata for the Stacks block we're going to build on top of
stacks_parent_header: StacksHeaderInfo,
/// the consensus hash of the sortition that selected the Stacks block parent
parent_consensus_hash: ConsensusHash,
/// the burn block height of the sortition that selected the Stacks block parent
parent_block_burn_height: u64,
/// the total amount burned in the sortition that selected the Stacks block parent
parent_block_total_burn: u64,
/// offset in the burnchain block where the parent's block-commit was
parent_winning_vtxindex: u16,
/// nonce to use for this new block's coinbase transaction
coinbase_nonce: u64,
}
#[derive(Clone)]
enum LeaderKeyRegistrationState {
/// Not started yet
Inactive,
/// Waiting for burnchain confirmation
/// `u64` is the target block height in which we intend this key to land
/// `txid` is the burnchain transaction ID
Pending(u64, Txid),
/// Ready to go!
Active(RegisteredKey),
}
/// Relayer thread
/// * accepts network results and stores blocks and microblocks
/// * forwards new blocks, microblocks, and transactions to the p2p thread
/// * processes burnchain state
/// * if mining, runs the miner and broadcasts blocks (via a subordinate MinerThread)
pub struct RelayerThread {
/// Node config
config: Config,
/// Handle to the sortition DB (optional so we can take/replace it)
sortdb: Option<SortitionDB>,
/// Handle to the chainstate DB (optional so we can take/replace it)
chainstate: Option<StacksChainState>,
/// Handle to the mempool DB (optional so we can take/replace it)
mempool: Option<MemPoolDB>,
/// Handle to global state and inter-thread communication channels
globals: Globals,
/// Authoritative copy of the keychain state
keychain: Keychain,
/// Burnchian configuration
burnchain: Burnchain,
/// height of last VRF key registration request
last_vrf_key_burn_height: u64,
/// Set of blocks that we have mined, but are still potentially-broadcastable
last_mined_blocks: MinedBlocks,
/// client to the burnchain (used only for sending block-commits)
bitcoin_controller: BitcoinRegtestController,
/// client to the event dispatcher
event_dispatcher: EventDispatcher,
/// copy of the local peer state
local_peer: LocalPeer,
/// last time we tried to mine a block (in millis)
last_tenure_issue_time: u128,
/// last observed burnchain block height from the p2p thread (obtained from network results)
last_network_block_height: u64,
/// time at which we observed a change in the network block height (epoch time in millis)
last_network_block_height_ts: u128,
/// last observed number of downloader state-machine passes from the p2p thread (obtained from
/// network results)
last_network_download_passes: u64,
/// last observed number of inventory state-machine passes from the p2p thread (obtained from
/// network results)
last_network_inv_passes: u64,
/// minimum number of downloader state-machine passes that must take place before mining (this
/// is used to ensure that the p2p thread attempts to download new Stacks block data before
/// this thread tries to mine a block)
min_network_download_passes: u64,
/// minimum number of inventory state-machine passes that must take place before mining (this
/// is used to ensure that the p2p thread attempts to download new Stacks block data before
/// this thread tries to mine a block)
min_network_inv_passes: u64,
/// consensus hash of the last sortition we saw, even if we weren't the winner
last_tenure_consensus_hash: Option<ConsensusHash>,
/// tip of last tenure we won (used for mining microblocks)
miner_tip: Option<MinerTip>,
/// last time we mined a microblock, in millis
last_microblock_tenure_time: u128,
/// when should we run the next microblock tenure, in millis
microblock_deadline: u128,
/// cost of the last-produced microblock stream
microblock_stream_cost: ExecutionCost,
/// Inner relayer instance for forwarding broadcasted data back to the p2p thread for dispatch
/// to neighbors
relayer: Relayer,
/// handle to the subordinate miner thread
miner_thread: Option<JoinHandle<Option<MinerThreadResult>>>,
/// if true, then the last time the miner thread was launched, it was used to mine a Stacks
/// block (used to alternate between mining microblocks and Stacks blocks that confirm them)
mined_stacks_block: bool,
}
struct BlockMinerThread {
/// node config struct
config: Config,
/// handle to global state
globals: Globals,
/// copy of the node's keychain
keychain: Keychain,
/// burnchain configuration
burnchain: Burnchain,
/// Set of blocks that we have mined, but are still potentially-broadcastable
/// (copied from RelayerThread since we need the info to determine the strategy for mining the
/// next block during this tenure).
last_mined_blocks: MinedBlocks,
/// Copy of the node's last ongoing block commit from the last time this thread was run
ongoing_commit: Option<OngoingBlockCommit>,
/// Copy of the node's registered VRF key
registered_key: RegisteredKey,
/// Burnchain block snapshot at the time this thread was initialized
burn_block: BlockSnapshot,
/// Handle to the node's event dispatcher
event_dispatcher: EventDispatcher,
}
/// State representing the microblock miner.
struct MicroblockMinerThread {
/// handle to global state
globals: Globals,
/// handle to chainstate DB (optional so we can take/replace it)
chainstate: Option<StacksChainState>,
/// handle to sortition DB (optional so we can take/replace it)
sortdb: Option<SortitionDB>,
/// handle to mempool DB (optional so we can take/replace it)
mempool: Option<MemPoolDB>,
/// Handle to the node's event dispatcher
event_dispatcher: EventDispatcher,
/// Parent Stacks block's sortition's consensus hash
parent_consensus_hash: ConsensusHash,
/// Parent Stacks block's hash
parent_block_hash: BlockHeaderHash,
/// Microblock signing key
miner_key: Secp256k1PrivateKey,
/// How often to make microblocks, in milliseconds
frequency: u64,
/// Epoch timestamp, in milliseconds, when the last microblock was produced
last_mined: u128,
/// How many microblocks produced so far
quantity: u64,
/// Block budget consumed so far by this tenure (initialized to the cost of the Stacks block
/// itself; microblocks fill up the remaining budget)
cost_so_far: ExecutionCost,
/// Block builder settings for the microblock miner.
settings: BlockBuilderSettings,
}
impl MicroblockMinerThread {
/// Instantiate the miner thread state from the relayer thread.
/// May fail if:
/// * we didn't win the last sortition
/// * we couldn't open or read the DBs for some reason
/// * we couldn't find the anchored block (i.e. it's not processed yet)
pub fn from_relayer_thread(relayer_thread: &RelayerThread) -> Option<MicroblockMinerThread> {
let globals = relayer_thread.globals.clone();
let config = relayer_thread.config.clone();
let burnchain = relayer_thread.burnchain.clone();
let miner_tip = match relayer_thread.miner_tip.clone() {
Some(tip) => tip,
None => {
debug!("Relayer: cannot instantiate microblock miner: did not win Stacks tip sortition");
return None;
}
};
let stacks_chainstate_path = config.get_chainstate_path_str();
let burn_db_path = config.get_burn_db_file_path();
let cost_estimator = config
.make_cost_estimator()
.unwrap_or_else(|| Box::new(UnitEstimator));
let metric = config
.make_cost_metric()
.unwrap_or_else(|| Box::new(UnitMetric));
// NOTE: read-write access is needed in order to be able to query the recipient set.
// This is an artifact of the way the MARF is built (see #1449)
let sortdb = SortitionDB::open(&burn_db_path, true, burnchain.pox_constants.clone())
.map_err(|e| {
error!(
"Relayer: Could not open sortdb '{}' ({:?}); skipping tenure",
&burn_db_path, &e
);
e
})
.ok()?;
let mut chainstate = open_chainstate_with_faults(&config)
.map_err(|e| {
error!(
"Relayer: Could not open chainstate '{}' ({:?}); skipping microblock tenure",
&stacks_chainstate_path, &e
);
e
})
.ok()?;
let mempool = MemPoolDB::open(
config.is_mainnet(),
config.burnchain.chain_id,
&stacks_chainstate_path,
cost_estimator,
metric,
)
.expect("Database failure opening mempool");
let MinerTip {
consensus_hash: ch,
block_hash: bhh,
microblock_privkey: miner_key,
..
} = miner_tip;
debug!(
"Relayer: Instantiate microblock mining state off of {}/{}",
&ch, &bhh
);
// we won a block! proceed to build a microblock tail if we've stored it
match StacksChainState::get_anchored_block_header_info(chainstate.db(), &ch, &bhh) {
Ok(Some(_)) => {
let parent_index_hash = StacksBlockHeader::make_index_block_hash(&ch, &bhh);
let cost_so_far = if relayer_thread.microblock_stream_cost == ExecutionCost::zero()
{
// unknown cost, or this is idempotent.
StacksChainState::get_stacks_block_anchored_cost(
chainstate.db(),
&parent_index_hash,
)
.expect("FATAL: failed to get anchored block cost")
.expect("FATAL: no anchored block cost stored for processed anchored block")
} else {
relayer_thread.microblock_stream_cost.clone()
};
let frequency = config.node.microblock_frequency;
let settings =
config.make_block_builder_settings(0, true, globals.get_miner_status());
// port over unconfirmed state to this thread
chainstate.unconfirmed_state = if let Some(unconfirmed_state) =
relayer_thread.chainstate_ref().unconfirmed_state.as_ref()
{
Some(unconfirmed_state.make_readonly_owned().ok()?)
} else {
None
};
Some(MicroblockMinerThread {
globals,
chainstate: Some(chainstate),
sortdb: Some(sortdb),
mempool: Some(mempool),
event_dispatcher: relayer_thread.event_dispatcher.clone(),
parent_consensus_hash: ch.clone(),
parent_block_hash: bhh.clone(),
miner_key,
frequency,
last_mined: 0,
quantity: 0,
cost_so_far: cost_so_far,
settings,
})
}
Ok(None) => {
warn!(
"Relayer: No such anchored block: {}/{}. Cannot mine microblocks",
ch, bhh
);
None
}
Err(e) => {
warn!(
"Relayer: Failed to get anchored block cost for {}/{}: {:?}",
ch, bhh, &e
);
None
}
}
}
/// Do something with the inner chainstate DBs (borrowed mutably).
/// Used to fool the borrow-checker.
/// NOT COMPOSIBLE - WILL PANIC IF CALLED FROM WITHIN ITSELF.
fn with_chainstate<F, R>(&mut self, func: F) -> R
where
F: FnOnce(&mut Self, &mut SortitionDB, &mut StacksChainState, &mut MemPoolDB) -> R,
{
let mut sortdb = self.sortdb.take().expect("FATAL: already took sortdb");
let mut chainstate = self
.chainstate
.take()
.expect("FATAL: already took chainstate");
let mut mempool = self.mempool.take().expect("FATAL: already took mempool");
let res = func(self, &mut sortdb, &mut chainstate, &mut mempool);
self.sortdb = Some(sortdb);
self.chainstate = Some(chainstate);
self.mempool = Some(mempool);
res
}
/// Unconditionally mine one microblock.
/// Can fail if the miner thread gets cancelled (most likely cause), or if there's some kind of
/// DB error.
fn inner_mine_one_microblock(
&mut self,
sortdb: &SortitionDB,
chainstate: &mut StacksChainState,
mempool: &mut MemPoolDB,
) -> Result<StacksMicroblock, ChainstateError> {
debug!(
"Try to mine one microblock off of {}/{} (total: {})",
&self.parent_consensus_hash,
&self.parent_block_hash,
chainstate
.unconfirmed_state
.as_ref()
.map(|us| us.num_microblocks())
.unwrap_or(0)
);
let burn_height =
SortitionDB::get_block_snapshot_consensus(sortdb.conn(), &self.parent_consensus_hash)
.map_err(|e| {
error!("Failed to find block snapshot for mined block: {}", e);
e
})?
.ok_or_else(|| {
error!("Failed to find block snapshot for mined block");
ChainstateError::NoSuchBlockError
})?
.block_height;
let ast_rules = SortitionDB::get_ast_rules(sortdb.conn(), burn_height).map_err(|e| {
error!("Failed to get AST rules for microblock: {}", e);
e
})?;
let epoch_id = SortitionDB::get_stacks_epoch(sortdb.conn(), burn_height)
.map_err(|e| {
error!("Failed to get epoch for microblock: {}", e);
e
})?
.expect("FATAL: no epoch defined")