Commit 2463714: WIP

Theodus committed Dec 16, 2024
1 parent 556f0b0
Showing 3 changed files with 144 additions and 10 deletions.
src/config.rs (1 addition, 1 deletion)
@@ -140,7 +140,7 @@ pub enum ExchangeRateProvider {
 /// Kafka configuration.
 ///
 /// See [`Config`]'s [`kafka`](struct.Config.html#structfield.kafka).
-#[derive(Deserialize)]
+#[derive(Clone, Deserialize)]
 pub struct KafkaConfig(BTreeMap<String, String>);

 impl Default for KafkaConfig {
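The new `Clone` derive is what lets `main` hand a copy of the Kafka settings to the blocklist consumer while the gateway keeps the original. Since `KafkaConfig` is a newtype over `BTreeMap<String, String>`, serde deserializes it transparently from any string-to-string map. A minimal illustration (the keys and values below are hypothetical examples, not part of this commit):

    // Deserializes transparently from a map of raw librdkafka settings.
    let kafka: KafkaConfig = serde_json::from_str(
        r#"{"bootstrap.servers": "localhost:9092", "group.id": "gateway"}"#,
    )
    .expect("valid kafka config");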
src/main.rs (2 additions, 1 deletion)
@@ -109,7 +109,8 @@ async fn main() {
         }
         None => Default::default(),
     };
-    let indexer_blocklist = indexer_blocklist::Blocklist::spawn(conf.blocklist);
+    let indexer_blocklist =
+        indexer_blocklist::Blocklist::spawn(conf.blocklist, conf.kafka.clone().into());
     let mut network = network::service::spawn(
         http_client.clone(),
         network_subgraph_client,
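Note that `conf.kafka.clone().into()` relies on a conversion from `KafkaConfig` into `rdkafka::ClientConfig` that is not part of this diff, so it presumably exists elsewhere in the crate. A plausible sketch of that impl, assuming it simply forwards the raw settings map to librdkafka:

    impl From<KafkaConfig> for rdkafka::ClientConfig {
        fn from(config: KafkaConfig) -> Self {
            let mut client_config = rdkafka::ClientConfig::new();
            // Pass every raw `key = value` setting straight through to librdkafka.
            for (key, value) in config.0 {
                client_config.set(key, value);
            }
            client_config
        }
    }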
src/network/indexer_blocklist.rs (141 additions, 8 deletions)
@@ -1,5 +1,14 @@
-use std::collections::{HashMap, HashSet};
+use std::{
+    collections::{HashMap, HashSet},
+    time::Duration,
+};

+use anyhow::{anyhow, Context};
+use futures::StreamExt;
+use rdkafka::{
+    consumer::{Consumer, StreamConsumer},
+    Message, TopicPartitionList,
+};
 use thegraph_core::{alloy::primitives::Address, DeploymentId, ProofOfIndexing};
 use tokio::sync::watch;

@@ -13,7 +22,7 @@ pub struct Blocklist {
 }

 impl Blocklist {
-    pub fn spawn(init: Vec<BlocklistEntry>) -> Self {
+    pub fn spawn(init: Vec<BlocklistEntry>, kafka_config: rdkafka::ClientConfig) -> Self {
         let (blocklist_tx, blocklist_rx) = watch::channel(Default::default());
         let (poi_tx, poi_rx) = watch::channel(Default::default());
         let (indexer_tx, indexer_rx) = watch::channel(Default::default());
@@ -26,7 +35,7 @@ impl Blocklist {
             actor.add_entry(entry);
         }
         tokio::spawn(async move {
-            actor.run().await;
+            actor.run(kafka_config.create().unwrap()).await;
         });
         Self {
             blocklist: blocklist_rx,
@@ -37,14 +46,57 @@ impl Blocklist {
 }

 struct Actor {
-    pub blocklist: watch::Sender<Vec<BlocklistEntry>>,
-    pub poi: watch::Sender<HashMap<DeploymentId, Vec<(u64, ProofOfIndexing)>>>,
-    pub indexer: watch::Sender<HashMap<Address, HashSet<DeploymentId>>>,
+    blocklist: watch::Sender<Vec<BlocklistEntry>>,
+    poi: watch::Sender<HashMap<DeploymentId, Vec<(u64, ProofOfIndexing)>>>,
+    indexer: watch::Sender<HashMap<Address, HashSet<DeploymentId>>>,
 }

 impl Actor {
-    async fn run(&mut self) {
-        todo!();
+    async fn run(&mut self, consumer: StreamConsumer) {
+        let topic = "gateway_blocklist";
+        if let Err(blocklist_err) = consumer.subscribe(&[topic]) {
+            tracing::error!(%blocklist_err);
+            return;
+        }
+        if let Err(blocklist_err) = assign_partitions(&consumer, topic).await {
+            tracing::error!(%blocklist_err);
+            return;
+        }
+
+        let mut records: HashMap<String, BlocklistEntry> = Default::default();
+        let mut stream = consumer.stream();
+        while let Some(msg) = stream.next().await {
+            let msg = match msg {
+                Ok(msg) => msg,
+                Err(blocklist_recv_error) => {
+                    tracing::error!(%blocklist_recv_error);
+                    continue;
+                }
+            };
+            let key = match msg.key_view::<str>() {
+                Some(Ok(key)) => key,
+                result => {
+                    tracing::error!("invalid key: {result:?}");
+                    continue;
+                }
+            };
+            match msg.payload().map(serde_json::from_slice::<BlocklistEntry>) {
+                Some(Ok(entry)) => {
+                    records.insert(key.to_string(), entry.clone());
+                    self.add_entry(entry);
+                }
+                None => {
+                    let entry = records.remove(key);
+                    if let Some(entry) = entry {
+                        self.remove_entry(&entry);
+                    }
+                }
+                Some(Err(blocklist_deserialize_err)) => {
+                    tracing::error!(%blocklist_deserialize_err);
+                }
+            };
+        }
+        tracing::error!("blocklist consumer stopped");
     }

     fn add_entry(&mut self, entry: BlocklistEntry) {
@@ -75,4 +127,85 @@ impl Actor {
         self.blocklist
             .send_modify(move |blocklist| blocklist.push(entry));
     }
+
+    fn remove_entry(&mut self, entry: &BlocklistEntry) {
+        match entry {
+            BlocklistEntry::Poi {
+                deployment,
+                block,
+                public_poi,
+                ..
+            } => {
+                self.poi.send_modify(|blocklist| {
+                    if let Some(entry) = blocklist.get_mut(deployment) {
+                        entry.retain(|value| &(*block, (*public_poi).into()) != value);
+                    }
+                });
+            }
+            BlocklistEntry::Other {
+                deployment,
+                indexer,
+                ..
+            } => {
+                self.indexer.send_modify(|blocklist| {
+                    if let Some(entry) = blocklist.get_mut(indexer) {
+                        entry.remove(deployment);
+                    }
+                });
+            }
+        };
+        fn matching(a: &BlocklistEntry, b: &BlocklistEntry) -> bool {
+            match (a, b) {
+                (
+                    BlocklistEntry::Poi {
+                        deployment,
+                        public_poi,
+                        block,
+                        info: _,
+                    },
+                    BlocklistEntry::Poi {
+                        deployment: deployment_,
+                        public_poi: public_poi_,
+                        block: block_,
+                        info: _,
+                    },
+                ) => {
+                    (deployment == deployment_) && (public_poi == public_poi_) && (block == block_)
+                }
+                (
+                    BlocklistEntry::Other {
+                        indexer,
+                        deployment,
+                        info: _,
+                    },
+                    BlocklistEntry::Other {
+                        indexer: indexer_,
+                        deployment: deployment_,
+                        info: _,
+                    },
+                ) => (indexer == indexer_) && (deployment == deployment_),
+                _ => false,
+            }
+        }
+        self.blocklist
+            .send_modify(|blocklist| blocklist.retain(|value| !matching(entry, value)));
+    }
 }
+
+async fn assign_partitions(
+    consumer: &StreamConsumer,
+    topic: &str,
+) -> anyhow::Result<TopicPartitionList> {
+    let metadata = consumer
+        .fetch_metadata(Some(topic), Duration::from_secs(30))
+        .with_context(|| anyhow!("fetch {topic} metadata"))?;
+    anyhow::ensure!(!metadata.topics().is_empty());
+    let topic_info = &metadata.topics()[0];
+    let mut assignment = TopicPartitionList::new();
+    for partition in topic_info.partitions() {
+        assignment.add_partition_offset(topic, partition.id(), rdkafka::Offset::Beginning)?;
+    }
+    tracing::debug!(?assignment);
+    consumer.assign(&assignment)?;
+    Ok(assignment)
+}
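The consumer loop above implies a simple keyed protocol on the `gateway_blocklist` topic: each record's payload is a JSON-encoded `BlocklistEntry`, and a tombstone (a record with the same key and no payload) retracts it, which is what the `None` arm handles. `assign_partitions` seeks every partition to `Offset::Beginning`, so the consumer rebuilds the full blocklist state from the topic on startup, with the `records` map mirroring the topic's live key space. No producer is included in this commit; a hedged sketch of what the corresponding write side could look like (`publish_entry` and its parameters are illustrative, and it assumes `BlocklistEntry` also derives `Serialize`):

    use std::time::Duration;

    use rdkafka::producer::{FutureProducer, FutureRecord};

    /// Hypothetical write side: publish an entry under `key`, or pass `None`
    /// to emit a tombstone that retracts the entry stored under that key.
    async fn publish_entry(
        producer: &FutureProducer,
        key: &str,
        entry: Option<&BlocklistEntry>,
    ) -> anyhow::Result<()> {
        let payload = entry.map(serde_json::to_vec).transpose()?;
        let mut record = FutureRecord::<str, Vec<u8>>::to("gateway_blocklist").key(key);
        if let Some(payload) = &payload {
            record = record.payload(payload);
        }
        producer
            .send(record, Duration::from_secs(10))
            .await
            .map_err(|(kafka_err, _)| anyhow::anyhow!("send blocklist record: {kafka_err}"))?;
        Ok(())
    }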
