Skip to content

Commit

Permalink
fix: remove Scalar support
Browse files Browse the repository at this point in the history
  • Loading branch information
Theodus committed Dec 2, 2024
1 parent 4cc5c63 commit 90d5fd0
Show file tree
Hide file tree
Showing 12 changed files with 38 additions and 639 deletions.
45 changes: 1 addition & 44 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 6 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,25 +33,27 @@ lazy_static = "1.4.0"
ordered-float = "4.2.0"
parking_lot = "0.12.3"
pin-project = "1.1.5"
primitive-types = "0.12.2"
prometheus = { version = "0.13", default-features = false }
prost = "0.13.1"
rand = { version = "0.8", features = ["small_rng"] }
rdkafka = { version = "0.36.2", features = ["gssapi", "tracing"] }
receipts = { git = "https://github.com/edgeandnode/receipts", rev = "e94e0f1" }
reqwest = { version = "0.12", default-features = false, features = [
"json",
"default-tls",
"gzip",
] }
secp256k1 = { version = "0.29", default-features = false }
semver = { version = "1.0", features = ["serde"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0.116", features = ["raw_value"] }
serde_with = "3.8.1"
snmalloc-rs = "0.3"
tap_core = { git = "https://github.com/semiotic-ai/timeline-aggregation-protocol", rev = "f680f4c" }
thegraph-core = { version = "0.8.5", features = ["alloy-contract", "alloy-signer-local", "attestation", "serde"] }
thegraph-core = { version = "0.8.5", features = [
"alloy-contract",
"alloy-signer-local",
"attestation",
"serde",
] }
thegraph-graphql-http = { version = "0.2.1", features = [
"http-client-reqwest",
] }
Expand Down
38 changes: 6 additions & 32 deletions src/client_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ use crate::{
metrics::{with_metric, METRICS},
middleware::RequestId,
network::{self, DeploymentError, Indexing, IndexingId, ResolvedSubgraphInfo, SubgraphError},
receipts::ReceiptStatus,
reports,
};

Expand Down Expand Up @@ -289,26 +288,20 @@ async fn run_indexer_queries(
let largest_allocation = selection.data.largest_allocation;
let url = selection.data.url.clone();
let seconds_behind = selection.seconds_behind;
let legacy_scalar = !selection.data.tap_support;
let subgraph_chain = subgraph.chain.clone();

// over-pay indexers to hit target
let min_fee = *(min_fee.0 * grt_per_usd * one_grt) / selections.len() as f64;
let indexer_fee = selection.fee.as_f64() * budget as f64;
let fee = indexer_fee.max(min_fee) as u128;
let receipt = match if legacy_scalar {
ctx.receipt_signer
.create_legacy_receipt(largest_allocation, fee)
} else {
ctx.receipt_signer.create_receipt(largest_allocation, fee)
} {
let receipt = match ctx.receipt_signer.create_receipt(largest_allocation, fee) {
Ok(receipt) => receipt,
Err(err) => {
tracing::error!(?indexer, %deployment, error=?err, "failed to create receipt");
continue;
}
};
debug_assert!(fee == receipt.grt_value());
debug_assert!(fee == receipt.value());

let blocks_behind = blocks_behind(seconds_behind, blocks_per_minute);
let indexer_client = ctx.indexer_client.clone();
Expand All @@ -328,7 +321,6 @@ async fn run_indexer_queries(
let report = reports::IndexerRequest {
indexer,
deployment,
largest_allocation,
url: url.to_string(),
receipt,
subgraph_chain,
Expand Down Expand Up @@ -358,17 +350,6 @@ async fn run_indexer_queries(
}
}

let receipt_status = match &report.result {
Ok(_) => ReceiptStatus::Success,
Err(IndexerError::Timeout) => ReceiptStatus::Unknown,
Err(_) => ReceiptStatus::Failure,
};
ctx.receipt_signer.record_receipt(
&report.largest_allocation,
&report.receipt,
receipt_status,
);

indexer_requests.push(report);
}

Expand Down Expand Up @@ -399,7 +380,7 @@ async fn run_indexer_queries(

let total_fees_grt: f64 = indexer_requests
.iter()
.map(|i| i.receipt.grt_value() as f64 * 1e-18)
.map(|i| i.receipt.value() as f64 * 1e-18)
.sum();
let total_fees_usd = USD(NotNan::new(total_fees_grt / *grt_per_usd).unwrap());
let _ = ctx.budgeter.feedback.send(total_fees_usd);
Expand Down Expand Up @@ -445,7 +426,7 @@ async fn run_indexer_queries(
result = ?indexer_request.result.as_ref().map(|_| ()),
response_time_ms = indexer_request.response_time_ms,
seconds_behind = indexer_request.seconds_behind,
fee = indexer_request.receipt.grt_value() as f64 * 1e-18,
fee = indexer_request.receipt.value() as f64 * 1e-18,
"indexer_request"
);
tracing::trace!(indexer_request = indexer_request.request);
Expand Down Expand Up @@ -485,7 +466,6 @@ struct CandidateMetadata {
#[debug(with = std::fmt::Display::fmt)]
url: Url,
largest_allocation: AllocationId,
tap_support: bool,
}

/// Given a list of indexings, build a list of candidates that are within the required block range
Expand Down Expand Up @@ -594,7 +574,6 @@ fn build_candidates_list(
deployment,
url: indexing.indexer.url.clone(),
largest_allocation: indexing.largest_allocation,
tap_support: indexing.indexer.tap_support,
},
perf: perf.response,
fee: Normalized::new(indexing.fee as f64 / budget as f64).unwrap_or(Normalized::ONE),
Expand Down Expand Up @@ -733,11 +712,7 @@ pub async fn handle_indexer_query(
let fee = *(ctx.budgeter.query_fees_target.0 * grt_per_usd * one_grt) as u128;

let allocation = indexing.largest_allocation;
let receipt = match if indexing.indexer.tap_support {
ctx.receipt_signer.create_receipt(allocation, fee)
} else {
ctx.receipt_signer.create_legacy_receipt(allocation, fee)
} {
let receipt = match ctx.receipt_signer.create_receipt(allocation, fee) {
Ok(receipt) => receipt,
Err(err) => {
return Err(Error::Internal(anyhow!("failed to create receipt: {err}")));
Expand All @@ -762,7 +737,6 @@ pub async fn handle_indexer_query(
let indexer_request = reports::IndexerRequest {
indexer: indexing_id.indexer,
deployment: indexing_id.deployment,
largest_allocation: allocation,
url: indexing.indexer.url.to_string(),
receipt,
subgraph_chain: subgraph.chain,
Expand Down Expand Up @@ -805,7 +779,7 @@ pub async fn handle_indexer_query(
result = ?indexer_request.result.as_ref().map(|_| ()),
response_time_ms = indexer_request.response_time_ms,
seconds_behind = indexer_request.seconds_behind,
fee = indexer_request.receipt.grt_value() as f64 * 1e-18,
fee = indexer_request.receipt.value() as f64 * 1e-18,
"indexer_request"
);

Expand Down
2 changes: 0 additions & 2 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,6 @@ impl From<KafkaConfig> for rdkafka::config::ClientConfig {
pub struct Receipts {
/// TAP verifier contract chain
pub chain_id: U256,
/// Secret key for legacy voucher signing (Scalar)
pub legacy_signer: Option<B256>,
/// TAP signer key
pub signer: B256,
/// TAP verifier contract address
Expand Down
5 changes: 3 additions & 2 deletions src/indexer_client.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use http::header::CONTENT_TYPE;
use reqwest::header::AUTHORIZATION;
use serde::{Deserialize, Serialize};
use thegraph_core::{
Expand Down Expand Up @@ -47,14 +48,14 @@ impl IndexerClient {
query: &str,
) -> Result<IndexerResponse, IndexerError> {
let (auth_key, auth_value) = match auth {
IndexerAuth::Paid(receipt, _) => (receipt.header_name(), receipt.serialize()),
IndexerAuth::Paid(receipt, _) => ("Tap-Receipt", receipt.serialize()),
IndexerAuth::Free(token) => (AUTHORIZATION.as_str(), format!("Bearer {token}")),
};

let result = self
.client
.post(deployment_url)
.header("Content-Type", "application/json")
.header(CONTENT_TYPE.as_str(), "application/json")
.header(auth_key, auth_value)
.body(query.to_string())
.send()
Expand Down
20 changes: 0 additions & 20 deletions src/json.rs

This file was deleted.

30 changes: 0 additions & 30 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ mod http_ext;
mod indexer_client;
mod indexers;
mod indexing_performance;
mod json;
mod metrics;
mod middleware;
mod network;
Expand All @@ -25,7 +24,6 @@ mod time;
#[allow(dead_code)]
mod ttl_hash_map;
mod unattestable_errors;
mod vouchers;

use std::{
collections::HashSet,
Expand All @@ -39,7 +37,6 @@ use std::{

use auth::AuthContext;
use axum::{
extract::DefaultBodyLimit,
http::{self, status::StatusCode},
routing, Router,
};
Expand Down Expand Up @@ -128,21 +125,10 @@ async fn main() {
let indexing_perf = IndexingPerformance::new(network.clone());
network.wait_until_ready().await;

let legacy_signer: &'static secp256k1::SecretKey = Box::leak(Box::new(
secp256k1::SecretKey::from_slice(
conf.receipts
.legacy_signer
.as_ref()
.map(|s| s.0.as_slice())
.unwrap_or(receipt_signer.to_bytes().as_slice()),
)
.expect("invalid legacy signer key"),
));
let receipt_signer: &'static ReceiptSigner = Box::leak(Box::new(ReceiptSigner::new(
receipt_signer,
conf.receipts.chain_id,
conf.receipts.verifier,
legacy_signer,
)));

// Initialize the auth service
Expand Down Expand Up @@ -241,22 +227,6 @@ async fn main() {
.route("/", routing::get(|| async { "Ready to roll!" }))
// This path is required by NGINX ingress controller
.route("/ready", routing::get(|| async { "Ready" }))
.route(
"/collect-receipts",
routing::post(vouchers::handle_collect_receipts)
.with_state(legacy_signer)
.layer(DefaultBodyLimit::max(3_000_000)),
)
.route(
"/partial-voucher",
routing::post(vouchers::handle_partial_voucher)
.with_state(legacy_signer)
.layer(DefaultBodyLimit::max(3_000_000)),
)
.route(
"/voucher",
routing::post(vouchers::handle_voucher).with_state(legacy_signer),
)
.route(
"/blocklist",
routing::get(move || async move {
Expand Down
9 changes: 0 additions & 9 deletions src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,6 @@ pub struct Metrics {
pub client_query: ResponseMetrics,
pub avg_query_fees: Gauge,
pub indexer_query: ResponseMetricVecs,
pub collect_receipts: ResponseMetrics,
pub partial_voucher: ResponseMetrics,
pub voucher: ResponseMetrics,
pub blocks_per_minute: IntGaugeVec,
}

Expand All @@ -34,12 +31,6 @@ impl Metrics {
"indexer query",
&["deployment", "indexer"],
),
collect_receipts: ResponseMetrics::new(
"gw_collect_receipts",
"collect-receipts request",
),
partial_voucher: ResponseMetrics::new("gw_partial_voucher", "partial-voucher request"),
voucher: ResponseMetrics::new("gw_voucher", "requests for voucher"),
blocks_per_minute: register_int_gauge_vec!(
"gw_blocks_per_minute",
"chain blocks per minute",
Expand Down
Loading

0 comments on commit 90d5fd0

Please sign in to comment.