-
Notifications
You must be signed in to change notification settings - Fork 4
/
manager.rs
167 lines (144 loc) · 5.37 KB
/
manager.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
use std::collections::HashMap;
use crate::{
address::Address,
agent::{
agent_channel::{AgentChannelReceiver, AgentChannelSender},
Agent,
},
state::{
update::{EvmStateUpdate, EvmStateUpdateOutput, Update, WorldStateUpdate},
State, StateError, World,
},
summarizer::{Summarizer, SummaryGenerator},
time_policy::TimePolicy,
};
/// Shorthand for a `Result` whose error type is [`SimManagerError`].
type SimManagerResult<T> = Result<T, SimManagerError>;
/// Errors surfaced by [`SimManager`] operations.
#[derive(thiserror::Error, Debug)]
pub enum SimManagerError {
/// An agent with the carried address is already registered with the manager
/// (returned by `SimManager::activate_agent`).
#[error("Address collision inserting new agent: {0:?}.")]
AddressCollision(Address),
/// A call into the simulation state failed; wraps the underlying [`StateError`].
/// The `#[from]` attribute lets `?` convert a `StateError` into this variant.
#[error("Error interacting with state.")]
StateError(#[from] StateError),
}
/// Owns the pieces of a simulation run: the time policy, the registered agents,
/// the simulation state and the summarizer.
///
/// `U` is the world-update type and `W` is the world whose `WorldUpdate` is `U`.
pub struct SimManager<U, W>
where
U: Update,
W: World<WorldUpdate = U>,
{
/// Queried for the initial time environment, for whether the simulation is
/// still active, and stepped once per simulation iteration.
pub time_policy: Box<dyn TimePolicy>,
/// Registered agents, keyed by the address each agent reports.
pub agents: HashMap<Address, Box<dyn Agent<U, W>>>,
/// Simulation state that agents read and that queued updates are applied to.
pub state: State<U, W>,
/// Asked to output summaries of the state after every simulation iteration.
pub summarizer: Summarizer<U, W>,
}
impl<U, W> SimManager<U, W>
where
    U: Update,
    W: World<WorldUpdate = U>,
{
    /// Creates a manager around the given state, time policy and summarizer.
    ///
    /// The manager starts with no agents; add them with [`Self::activate_agent`].
    pub fn new(
        state: State<U, W>,
        time_policy: Box<dyn TimePolicy>,
        summarizer: Summarizer<U, W>,
    ) -> Self {
        Self {
            time_policy,
            agents: HashMap::new(),
            state,
            summarizer,
        }
    }

    /// Returns a mutable reference to the simulation state.
    pub fn get_state_mut(&mut self) -> &mut State<U, W> {
        &mut self.state
    }

    /// Returns a shared reference to the simulation state.
    pub fn get_state(&self) -> &State<U, W> {
        &self.state
    }

    /// Run the time policy and agents to update the simulation environment.
    ///
    /// The state's time is first set from the time policy's current time
    /// environment. Then, for as long as the time policy reports itself active,
    /// each iteration:
    ///
    /// 1. builds a fresh set of channels for every registered agent;
    /// 2. calls `step` on every agent inside a rayon scope, giving each agent
    ///    shared access to the state and its own channel sender;
    /// 3. drains the queued EVM updates and then the queued world updates,
    ///    applying each one to the state on the calling thread;
    /// 4. calls `resolve_step` on every agent inside a second rayon scope,
    ///    giving each agent its own channel receiver;
    /// 5. outputs summaries and advances the state's time with the time
    ///    policy's next step.
    ///
    /// # Errors
    ///
    /// Returns [`SimManagerError::StateError`] as soon as applying a queued
    /// update fails. The remainder of that iteration (including
    /// `resolve_step`) is skipped.
    ///
    /// # Panics
    ///
    /// Panics if an agent has no entry in the per-iteration channel maps. The
    /// maps are built from the same agent set immediately beforehand, so this
    /// indicates a broken invariant rather than a recoverable condition.
    pub fn run_sim(&mut self) -> SimManagerResult<()> {
        // Initiate block time policy.
        self.state.update_time(self.time_policy.current_time_env());

        while self.time_policy.is_active() {
            // Shared queues that every agent pushes its requested updates into.
            let (evm_update_sender, evm_update_receiver) =
                crossbeam_channel::unbounded::<EvmStateUpdate>();
            let (world_update_sender, world_update_receiver) =
                crossbeam_channel::unbounded::<WorldStateUpdate<U>>();

            // One sender/receiver pair per agent; the agent count is known, so
            // size the maps up front.
            let mut agent_channel_receivers = HashMap::with_capacity(self.agents.len());
            let mut agent_channel_senders = HashMap::with_capacity(self.agents.len());

            // Only the addresses are needed here, so borrow the keys immutably.
            for addr in self.agents.keys() {
                let (evm_result_sender, evm_result_receiver) =
                    crossbeam_channel::unbounded::<EvmStateUpdateOutput>();
                let (world_result_sender, world_result_receiver) =
                    crossbeam_channel::unbounded::<U>();

                let channel_sender = AgentChannelSender::new(
                    *addr,
                    evm_update_sender.clone(),
                    evm_result_sender,
                    world_update_sender.clone(),
                    world_result_sender,
                );
                let channel_receiver =
                    AgentChannelReceiver::new(evm_result_receiver, world_result_receiver);

                agent_channel_senders.insert(*addr, channel_sender);
                agent_channel_receivers.insert(*addr, channel_receiver);
            }

            // Let every agent take its step. The scope returns only after all
            // spawned tasks have finished, so the queues are complete below.
            rayon::scope(|s| {
                for (agent_id, agent) in self.agents.iter_mut() {
                    s.spawn(|_| {
                        agent.step(
                            &self.state,
                            agent_channel_senders
                                .get(agent_id)
                                .expect("Agent id must be in map."),
                        );
                    });
                }
            });

            // Apply the queued updates: EVM updates first, then world updates.
            // `?` converts a `StateError` via the `#[from]` impl.
            while let Ok(update) = evm_update_receiver.try_recv() {
                self.state.execute_evm_tx_state_update(update)?;
            }
            while let Ok(update) = world_update_receiver.try_recv() {
                self.state.execute_world_state_update(update)?;
            }

            // Let every agent resolve its step using its own receiver.
            rayon::scope(|s| {
                for (addr, agent) in self.agents.iter_mut() {
                    s.spawn(|_| {
                        agent.resolve_step(
                            &self.state,
                            agent_channel_receivers
                                .get(addr)
                                .expect("Agent id must be in map."),
                        );
                    })
                }
            });

            // Run summarizers.
            self.summarizer.output_summaries(&self.state);

            // Update time policy.
            self.state.update_time(self.time_policy.step());
        }

        Ok(())
    }

    /// Adds and activates an agent to be put in the collection of agents under the manager's control.
    ///
    /// The agent's account is added to the state, its activation step is run
    /// with mutable access to the state, and it is then stored under its address.
    ///
    /// # Errors
    ///
    /// * [`SimManagerError::AddressCollision`] if an agent with the same address
    ///   is already registered; the state is not touched in that case.
    /// * [`SimManagerError::StateError`] if adding the account to the state
    ///   fails; the agent is not activated or stored.
    pub fn activate_agent(&mut self, mut new_agent: Box<dyn Agent<U, W>>) -> SimManagerResult<()> {
        let addr = new_agent.address();
        if self.agents.contains_key(&addr) {
            return Err(SimManagerError::AddressCollision(addr));
        }

        // Register agent account info.
        self.state.add_account(addr, new_agent.account_info())?;

        // Run agent's activation step.
        new_agent.activation_step(&mut self.state);

        // Adds agent to local map.
        self.agents.insert(addr, new_agent);
        Ok(())
    }

    /// Registers a summary generator by forwarding it to the manager's summarizer.
    pub fn register_summary_generator(&mut self, generator: Box<dyn SummaryGenerator<U, W>>) {
        self.summarizer.register_summary_generator(generator)
    }
}