From d09394d305a5ceb26a17bc9802d749097eab8066 Mon Sep 17 00:00:00 2001 From: Ashok Menon Date: Mon, 6 May 2024 00:57:52 +0100 Subject: [PATCH] [GraphQL] Leverage `objects_version` table. (#17543) ## Description Use the `objects_version` table to speed up point look-ups (via data loaders) for historical objects (ID + version), and dynamic fields (object look-up bounding version by parent ID). With this change, the restriction of accessing dynamic fields only within the available range is dropped. ## Test plan ``` sui$ cargo nextest run -p sui-graphql-rpc sui$ cargo nextest run -p sui-graphql-e2e-tests --features pg_integration. ``` Perform a query that involves fetching a large number of dynamic fields, which should now be fast. The following example, fetching dynamic fields on a deepbook pool loads 50 dynamic fields in about 5s from cold (which also requires loading packages for resolution), and then 2s from there: ``` query { owner( address: "0x029170bfa0a1677054263424fe4f9960c7cf05d359f6241333994c8830772bdb" ) { dynamicFields { pageInfo { hasNextPage endCursor } nodes { name { type { repr } json } value { ... on MoveValue { type { repr } json } ... on MoveObject { contents { json type { repr } } } } } } } } ``` ## Stack - #17686 - #17687 - #17688 - #17689 - #17691 - #17694 - #17695 - #17542 - #17726 --- ## Release notes Check each box that your changes affect. If none of the boxes relate to your changes, release notes aren't required. For each box you select, include information after the relevant heading that describes the impact of your changes that a user might notice and any actions they must take to implement updates. - [ ] Protocol: - [ ] Nodes (Validators and Full nodes): - [ ] Indexer: - [ ] JSON-RPC: - [x] GraphQL: Dynamic fields can now be looked up on any historical object (not just objects in the available range). - [ ] CLI: - [ ] Rust SDK: --- .../dynamic_fields/dynamic_fields.exp | 12 +- .../src/types/dynamic_field.rs | 9 +- crates/sui-graphql-rpc/src/types/object.rs | 226 +++++++++++++----- crates/sui-graphql-rpc/src/types/owner.rs | 7 +- 4 files changed, 186 insertions(+), 68 deletions(-) diff --git a/crates/sui-graphql-e2e-tests/tests/consistency/dynamic_fields/dynamic_fields.exp b/crates/sui-graphql-e2e-tests/tests/consistency/dynamic_fields/dynamic_fields.exp index afbfbb96ca3a13..51b7b407d25ad7 100644 --- a/crates/sui-graphql-e2e-tests/tests/consistency/dynamic_fields/dynamic_fields.exp +++ b/crates/sui-graphql-e2e-tests/tests/consistency/dynamic_fields/dynamic_fields.exp @@ -1166,7 +1166,17 @@ task 34, lines 497-528: Response: { "data": { "parent_version_4": { - "dfAtParentVersion4_outside_range": null + "dfAtParentVersion4_outside_range": { + "name": { + "bcs": "A2RmMQ==", + "type": { + "repr": "0x0000000000000000000000000000000000000000000000000000000000000001::string::String" + } + }, + "value": { + "json": "df1" + } + } }, "parent_version_6": { "dfAtParentVersion6": null diff --git a/crates/sui-graphql-rpc/src/types/dynamic_field.rs b/crates/sui-graphql-rpc/src/types/dynamic_field.rs index ebbc86df505a48..60e05ed610512d 100644 --- a/crates/sui-graphql-rpc/src/types/dynamic_field.rs +++ b/crates/sui-graphql-rpc/src/types/dynamic_field.rs @@ -10,7 +10,7 @@ use sui_types::dynamic_field::{derive_dynamic_field_id, DynamicFieldInfo, Dynami use super::available_range::AvailableRange; use super::cursor::{Page, Target}; -use super::object::{self, deserialize_move_struct, Object, ObjectKind, ObjectLookup}; +use super::object::{self, deserialize_move_struct, Object, ObjectKind}; use super::type_filter::ExactTypeFilter; use super::{ base64::Base64, move_object::MoveObject, move_value::MoveValue, sui_address::SuiAddress, @@ -170,9 +170,10 @@ impl DynamicField { let super_ = MoveObject::query( ctx, SuiAddress::from(field_id), - ObjectLookup::LatestAt { - parent_version, - checkpoint_viewed_at, + if let Some(parent_version) = parent_version { + Object::under_parent(parent_version, checkpoint_viewed_at) + } else { + Object::latest_at(checkpoint_viewed_at) }, ) .await?; diff --git a/crates/sui-graphql-rpc/src/types/object.rs b/crates/sui-graphql-rpc/src/types/object.rs index 7243666ef520c5..e9177a122057d7 100644 --- a/crates/sui-graphql-rpc/src/types/object.rs +++ b/crates/sui-graphql-rpc/src/types/object.rs @@ -34,12 +34,12 @@ use crate::{filter, or_filter}; use async_graphql::connection::{CursorType, Edge}; use async_graphql::dataloader::Loader; use async_graphql::{connection::Connection, *}; -use diesel::{BoolExpressionMethods, ExpressionMethods, QueryDsl}; +use diesel::{BoolExpressionMethods, ExpressionMethods, JoinOnDsl, QueryDsl, SelectableHelper}; use move_core_types::annotated_value::{MoveStruct, MoveTypeLayout}; use move_core_types::language_storage::StructTag; use serde::{Deserialize, Serialize}; use sui_indexer::models::objects::{StoredDeletedHistoryObject, StoredHistoryObject}; -use sui_indexer::schema::objects_history; +use sui_indexer::schema::{objects_history, objects_version}; use sui_indexer::types::ObjectStatus as NativeObjectStatus; use sui_indexer::types::OwnerType; use sui_types::object::bounded_visitor::BoundedVisitor; @@ -183,9 +183,14 @@ pub(crate) struct AddressOwner { pub(crate) enum ObjectLookup { LatestAt { - /// The parent version to be used as an optional upper bound for the query. Look for the - /// latest version of a child object that is less than or equal to this upper bound. - parent_version: Option, + /// The checkpoint sequence number at which this was viewed at + checkpoint_viewed_at: u64, + }, + + UnderParent { + /// The parent version to be used as an upper bound for the query. Look for the latest + /// version of a child object whose version is less than or equal to this upper bound. + parent_version: u64, /// The checkpoint sequence number at which this was viewed at checkpoint_viewed_at: u64, }, @@ -283,13 +288,21 @@ struct HistoricalKey { checkpoint_viewed_at: u64, } -/// DataLoader key for fetching the latest version of an `Object` as of a consistency cursor. The -/// query can optionally be bounded by a `parent_version` which imposes an additional requirement -/// that the object's version is bounded above by the parent version. +/// DataLoader key for fetching the latest version of an object whose parent object has version +/// `parent_version`, as of `checkpoint_viewed_at`. This look-up can fail to find a valid object if +/// the key is not self-consistent, for example if the `parent_version` is set to a higher version +/// than the object's actual parent as of `checkpoint_viewed_at`. +#[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)] +struct ParentVersionKey { + id: SuiAddress, + parent_version: u64, + checkpoint_viewed_at: u64, +} + +/// DataLoader key for fetching the latest version of an `Object` as of a consistency cursor. #[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)] struct LatestAtKey { id: SuiAddress, - parent_version: Option, checkpoint_viewed_at: u64, } @@ -807,7 +820,6 @@ impl Object { /// Look-up the latest version of the object as of a given checkpoint. pub(crate) fn latest_at(checkpoint_viewed_at: u64) -> ObjectLookup { ObjectLookup::LatestAt { - parent_version: None, checkpoint_viewed_at, } } @@ -815,8 +827,8 @@ impl Object { /// Look-up the latest version of an object whose version is less than or equal to its parent's /// version, as of a given checkpoint. pub(crate) fn under_parent(parent_version: u64, checkpoint_viewed_at: u64) -> ObjectLookup { - ObjectLookup::LatestAt { - parent_version: Some(parent_version), + ObjectLookup::UnderParent { + parent_version, checkpoint_viewed_at, } } @@ -849,18 +861,30 @@ impl Object { }) .await } - ObjectLookup::LatestAt { + + ObjectLookup::UnderParent { parent_version, checkpoint_viewed_at, } => { loader - .load_one(LatestAtKey { + .load_one(ParentVersionKey { id, parent_version, checkpoint_viewed_at, }) .await } + + ObjectLookup::LatestAt { + checkpoint_viewed_at, + } => { + loader + .load_one(LatestAtKey { + id, + checkpoint_viewed_at, + }) + .await + } } } @@ -1177,7 +1201,8 @@ impl Loader for Db { type Error = Error; async fn load(&self, keys: &[HistoricalKey]) -> Result, Error> { - use objects_history::dsl; + use objects_history::dsl as h; + use objects_version::dsl as v; let id_versions: BTreeSet<_> = keys .iter() @@ -1187,12 +1212,19 @@ impl Loader for Db { let objects: Vec = self .execute(move |conn| { conn.results(move || { - let mut query = dsl::objects_history.into_boxed(); + let mut query = h::objects_history + .inner_join( + v::objects_version.on(v::cp_sequence_number + .eq(h::checkpoint_sequence_number) + .and(v::object_id.eq(h::object_id)) + .and(v::object_version.eq(h::object_version))), + ) + .select(StoredHistoryObject::as_select()) + .into_boxed(); - // TODO: Speed up using an `obj_version` table. for (id, version) in id_versions.iter().cloned() { - query = query - .or_filter(dsl::object_id.eq(id).and(dsl::object_version.eq(version))); + query = + query.or_filter(v::object_id.eq(id).and(v::object_version.eq(version))); } query @@ -1234,17 +1266,20 @@ impl Loader for Db { } #[async_trait::async_trait] -impl Loader for Db { +impl Loader for Db { type Value = Object; type Error = Error; - async fn load(&self, keys: &[LatestAtKey]) -> Result, Error> { + async fn load( + &self, + keys: &[ParentVersionKey], + ) -> Result, Error> { // Group keys by checkpoint viewed at and parent version -- we'll issue a separate query for // each group. #[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Copy)] struct GroupKey { checkpoint_viewed_at: u64, - parent_version: Option, + parent_version: u64, } let mut keys_by_cursor_and_parent_version: BTreeMap<_, BTreeSet<_>> = BTreeMap::new(); @@ -1257,50 +1292,40 @@ impl Loader for Db { keys_by_cursor_and_parent_version .entry(group_key) .or_default() - .insert(key.id); + .insert(key.id.into_vec()); } // Issue concurrent reads for each group of keys. let futures = keys_by_cursor_and_parent_version .into_iter() .map(|(group_key, ids)| { - self.execute_repeatable(move |conn| { - let Some(range) = AvailableRange::result(conn, group_key.checkpoint_viewed_at)? - else { - return Ok::, diesel::result::Error>( - vec![], - ); - }; - - let filter = ObjectFilter { - object_ids: Some(ids.iter().cloned().collect()), - ..Default::default() - }; - - // TODO: Implement queries that use a parent version bound using an - // `obj_version` table. - let apply_parent_bound = |q: RawQuery| { - if let Some(parent_version) = group_key.parent_version { - filter!(q, format!("object_version <= {parent_version}")) - } else { - q - } - }; - - Ok(conn - .results(move || { - build_objects_query( - View::Consistent, - range, - &Page::bounded(ids.len() as u64), - |q| apply_parent_bound(filter.apply(q)), - apply_parent_bound, + self.execute(move |conn| { + let stored: Vec = conn.results(move || { + use objects_history::dsl as h; + use objects_version::dsl as v; + + h::objects_history + .inner_join( + v::objects_version.on(v::cp_sequence_number + .eq(h::checkpoint_sequence_number) + .and(v::object_id.eq(h::object_id)) + .and(v::object_version.eq(h::object_version))), ) + .select(StoredHistoryObject::as_select()) + .filter(v::object_id.eq_any(ids.iter().cloned())) + .filter(v::object_version.le(group_key.parent_version as i64)) + .distinct_on(v::object_id) + .order_by(v::object_id) + .then_order_by(v::object_version.desc()) .into_boxed() - })? - .into_iter() - .map(|r| (group_key, r)) - .collect()) + })?; + + Ok::<_, diesel::result::Error>( + stored + .into_iter() + .map(|stored| (group_key, stored)) + .collect::>(), + ) }) }); @@ -1312,15 +1337,21 @@ impl Loader for Db { for (group_key, stored) in group.map_err(|e| Error::Internal(format!("Failed to fetch objects: {e}")))? { + // This particular object is invalid -- it didn't exist at the checkpoint we are + // viewing at. + if group_key.checkpoint_viewed_at < stored.checkpoint_sequence_number as u64 { + continue; + } + let object = Object::try_from_stored_history_object( stored, group_key.checkpoint_viewed_at, // If `LatestAtKey::parent_version` is set, it must have been correctly // propagated from the `Object::root_version` of some object. - group_key.parent_version, + Some(group_key.parent_version), )?; - let key = LatestAtKey { + let key = ParentVersionKey { id: object.address, checkpoint_viewed_at: group_key.checkpoint_viewed_at, parent_version: group_key.parent_version, @@ -1334,6 +1365,81 @@ impl Loader for Db { } } +#[async_trait::async_trait] +impl Loader for Db { + type Value = Object; + type Error = Error; + + async fn load(&self, keys: &[LatestAtKey]) -> Result, Error> { + // Group keys by checkpoint viewed at -- we'll issue a separate query for each group. + let mut keys_by_cursor_and_parent_version: BTreeMap<_, BTreeSet<_>> = BTreeMap::new(); + + for key in keys { + keys_by_cursor_and_parent_version + .entry(key.checkpoint_viewed_at) + .or_default() + .insert(key.id); + } + + // Issue concurrent reads for each group of keys. + let futures = + keys_by_cursor_and_parent_version + .into_iter() + .map(|(checkpoint_viewed_at, ids)| { + self.execute_repeatable(move |conn| { + let Some(range) = AvailableRange::result(conn, checkpoint_viewed_at)? + else { + return Ok::, diesel::result::Error>( + vec![], + ); + }; + + let filter = ObjectFilter { + object_ids: Some(ids.iter().cloned().collect()), + ..Default::default() + }; + + Ok(conn + .results(move || { + build_objects_query( + View::Consistent, + range, + &Page::bounded(ids.len() as u64), + |q| filter.apply(q), + |q| q, + ) + .into_boxed() + })? + .into_iter() + .map(|r| (checkpoint_viewed_at, r)) + .collect()) + }) + }); + + // Wait for the reads to all finish, and gather them into the result map. + let groups = futures::future::join_all(futures).await; + + let mut results = HashMap::new(); + for group in groups { + for (checkpoint_viewed_at, stored) in + group.map_err(|e| Error::Internal(format!("Failed to fetch objects: {e}")))? + { + let object = + Object::try_from_stored_history_object(stored, checkpoint_viewed_at, None)?; + + let key = LatestAtKey { + id: object.address, + checkpoint_viewed_at, + }; + + results.insert(key, object); + } + } + + Ok(results) + } +} + impl From<&ObjectKind> for ObjectStatus { fn from(kind: &ObjectKind) -> Self { match kind { diff --git a/crates/sui-graphql-rpc/src/types/owner.rs b/crates/sui-graphql-rpc/src/types/owner.rs index 79525ca9e99215..3986a39f6ad231 100644 --- a/crates/sui-graphql-rpc/src/types/owner.rs +++ b/crates/sui-graphql-rpc/src/types/owner.rs @@ -251,9 +251,10 @@ impl Owner { Object::query( ctx, self.address, - object::ObjectLookup::LatestAt { - parent_version: self.root_version, - checkpoint_viewed_at: self.checkpoint_viewed_at, + if let Some(parent_version) = self.root_version { + Object::under_parent(parent_version, self.checkpoint_viewed_at) + } else { + Object::latest_at(self.checkpoint_viewed_at) }, ) .await