-
Notifications
You must be signed in to change notification settings - Fork 5
/
indexer_client.rs
251 lines (233 loc) · 8.12 KB
/
indexer_client.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
use http::header::CONTENT_TYPE;
use reqwest::header::AUTHORIZATION;
use serde::{Deserialize, Serialize};
use thegraph_core::{
alloy::{
dyn_abi::Eip712Domain,
primitives::{BlockHash, BlockNumber},
},
attestation::{self, Attestation},
};
use thegraph_graphql_http::http::response::Error as GQLError;
use url::Url;
use crate::{
blocks::Block,
errors::{
IndexerError::{self, *},
MissingBlockError, UnavailableReason,
},
receipts::Receipt,
unattestable_errors::miscategorized_unattestable,
};
#[derive(Clone, Debug)]
pub struct IndexerResponse {
pub original_response: String,
pub attestation: Option<Attestation>,
pub client_response: String,
pub errors: Vec<String>,
pub probe_block: Option<Block>,
}
#[derive(Clone)]
pub struct IndexerClient {
pub client: reqwest::Client,
}
pub enum IndexerAuth<'a> {
Paid(&'a Receipt, &'a Eip712Domain),
Free(&'a str),
}
impl IndexerClient {
pub async fn query_indexer<'a>(
&self,
deployment_url: Url,
auth: IndexerAuth<'a>,
query: &str,
) -> Result<IndexerResponse, IndexerError> {
let (auth_key, auth_value) = match auth {
IndexerAuth::Paid(receipt, _) => ("Tap-Receipt", receipt.serialize()),
IndexerAuth::Free(token) => (AUTHORIZATION.as_str(), format!("Bearer {token}")),
};
let result = self
.client
.post(deployment_url)
.header(CONTENT_TYPE.as_str(), "application/json")
.header(auth_key, auth_value)
.body(query.to_string())
.send()
.await;
let response = match result.and_then(|r| r.error_for_status()) {
Ok(response) => response,
Err(err) if err.is_timeout() => return Err(Timeout),
Err(err) => {
return match err.status() {
Some(status) => Err(BadResponse(status.as_u16().to_string())),
_ if err.is_connect() => Err(BadResponse("failed to connect".to_string())),
_ => Err(BadResponse(err.to_string())),
}
}
};
#[derive(Debug, Deserialize)]
pub struct IndexerResponsePayload {
#[serde(rename = "graphQLResponse")]
pub graphql_response: Option<String>,
pub attestation: Option<Attestation>,
pub error: Option<String>,
}
let payload = response
.json::<IndexerResponsePayload>()
.await
.map_err(|err| BadResponse(err.to_string()))?;
if let Some(err) = payload.error {
return Err(BadResponse(err));
}
let original_response = payload
.graphql_response
.ok_or_else(|| BadResponse("missing response".into()))?;
let (client_response, errors, probe_block) = rewrite_response(&original_response)?;
let errors: Vec<String> = errors.into_iter().map(|err| err.message).collect();
errors
.iter()
.try_for_each(|err| check_block_error(err))
.map_err(|err| Unavailable(UnavailableReason::MissingBlock(err)))?;
if let Some(error) = errors
.iter()
.find(|error| miscategorized_unattestable(error))
{
return Err(BadResponse(format!("unattestable response: {error}")));
}
if let IndexerAuth::Paid(receipt, attestation_domain) = auth {
match &payload.attestation {
Some(attestation) => {
let allocation = receipt.allocation();
if let Err(err) = attestation::verify(
attestation_domain,
attestation,
&allocation,
query,
&original_response,
) {
return Err(BadResponse(format!("bad attestation: {err}")));
}
}
None => {
let message = if !errors.is_empty() {
format!(
"no attestation: {}",
errors
.iter()
.map(|err| err.as_str())
.collect::<Vec<&str>>()
.join("; ")
)
} else {
"no attestation".to_string()
};
return Err(BadResponse(message));
}
};
}
Ok(IndexerResponse {
original_response,
attestation: payload.attestation,
client_response,
errors,
probe_block,
})
}
}
fn rewrite_response(
response: &str,
) -> Result<(String, Vec<GQLError>, Option<Block>), IndexerError> {
#[derive(Deserialize, Serialize)]
struct Response {
data: Option<ProbedData>,
#[serde(default)]
#[serde(skip_serializing_if = "Vec::is_empty")]
errors: Vec<GQLError>,
// indexer-service sometimes returns errors in this form, which isn't ideal
#[serde(default)]
#[serde(skip_serializing_if = "Option::is_none")]
error: Option<String>,
}
#[derive(Deserialize, Serialize)]
struct ProbedData {
#[serde(rename = "_gateway_probe_", skip_serializing)]
probe: Option<Meta>,
#[serde(flatten)]
data: serde_json::Value,
}
#[derive(Deserialize)]
struct Meta {
block: MaybeBlock,
}
#[derive(Deserialize)]
struct MaybeBlock {
number: BlockNumber,
hash: BlockHash,
timestamp: Option<u64>,
}
let mut payload: Response =
serde_json::from_str(response).map_err(|err| BadResponse(err.to_string()))?;
if let Some(err) = payload.error.take() {
payload.errors.push(GQLError {
message: err,
locations: Default::default(),
path: Default::default(),
});
}
// Avoid processing oversized errors.
for err in &mut payload.errors {
err.message.truncate(256);
err.message.shrink_to_fit();
}
let block = payload
.data
.as_mut()
.and_then(|data| data.probe.take())
.and_then(|meta| {
Some(Block {
number: meta.block.number,
hash: meta.block.hash,
timestamp: meta.block.timestamp?,
})
});
let client_response = serde_json::to_string(&payload).unwrap();
Ok((client_response, payload.errors, block))
}
fn check_block_error(err: &str) -> Result<(), MissingBlockError> {
// TODO: indexers should *always* report their block status in a header on every query. This
// will significantly reduce how brittle this feedback is, and also give a stronger basis for
// prediction in the happy path.
if !err.contains("Failed to decode `block") {
return Ok(());
}
let extract_block_number = |prefix: &str| -> Option<u64> {
let start = err.find(prefix)? + prefix.len();
let str = err.split_at(start).1.split_once(' ')?.0;
str.parse::<u64>().ok()
};
Err(MissingBlockError {
missing: extract_block_number("and data for block number "),
latest: extract_block_number("has only indexed up to block number "),
})
}
#[cfg(test)]
mod tests {
use crate::errors::MissingBlockError;
#[test]
fn check_block_error() {
let tests = [
("", Ok(())),
("Failed to decode `block.number` value: `subgraph QmQqLJVgZLcRduoszARzRi12qGheUTWAHFf3ixMeGm2xML has only indexed up to block number 133239690 and data for block number 133239697 is therefore not yet available", Err(MissingBlockError {
missing: Some(133239697),
latest: Some(133239690),
})),
("Failed to decode `block.hash` value", Err(MissingBlockError {
missing: None,
latest: None,
})),
];
for (input, expected) in tests {
assert_eq!(super::check_block_error(input), expected);
}
}
}