Skip to content

Commit

Permalink
Fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
shenkeyao committed Oct 3, 2024
1 parent 1255ef9 commit 1ecc3c6
Showing 1 changed file with 176 additions and 119 deletions.
295 changes: 176 additions & 119 deletions marketplace-builder/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,13 +286,14 @@ mod test {
use espresso_types::{
mock::MockStateCatchup,
v0_3::{RollupRegistration, RollupRegistrationBody},
FeeAccount, MarketplaceVersion, NamespaceId, PubKey, SeqTypes, SequencerVersions,
Event, FeeAccount, MarketplaceVersion, NamespaceId, PubKey, SeqTypes, SequencerVersions,
Transaction,
};
use ethers::{core::k256::elliptic_curve::rand_core::block, utils::Anvil};
use futures::Stream;
use hooks::connect_to_solver;
use hotshot::types::{
BLSPrivKey, Event,
BLSPrivKey,
EventType::{Decide, *},
};
use hotshot::{rand, types::EventType};
Expand Down Expand Up @@ -358,6 +359,11 @@ mod test {
builder: Url,
}

enum Mempool {
Public,
Private,
}

/// Pick unused ports for URLs, then set up and start the network.
///
/// Returns ports and URLs that will be used later.
Expand Down Expand Up @@ -467,76 +473,125 @@ mod test {
builder_client
}

/// Submit transactions via the private mempool.
///
/// Returns the subscribed events.
async fn submit_transactions_via_private_mempool(
transactions: Vec<Transaction>,
urls: Urls,
) -> Connection<
Event<SeqTypes>,
Unsupported,
hotshot_events_service::events::Error,
SequencerApiVersion,
> {
let txn_submission_client: Client<ServerError, SequencerApiVersion> =
Client::new(urls.builder.clone());
txn_submission_client.connect(None).await;
/// Get the view number and commitment if given a `QuorumProposal` event.
async fn proposal_view_number_and_commitment(event: Event) -> Option<(u64, VidCommitment)> {
if let EventType::QuorumProposal { proposal, .. } = event.event {
let view_number = *proposal.data.view_number;
let commitment = Leaf::from_quorum_proposal(&proposal.data).payload_commitment();
return Some((view_number, commitment));
}
None
}

// Test submitting transactions
txn_submission_client
.post::<Vec<Commitment<Transaction>>>("txn_submit/batch")
.body_json(&transactions)
.unwrap()
/// Wait for a quorum proposal event and get its view number and commitment.
async fn wait_for_proposal_view_number_and_commitment(
events: &mut (impl Stream<Item = Event> + Unpin),
) -> (u64, VidCommitment) {
let start = Instant::now();
loop {
if start.elapsed() > Duration::from_secs(5) {
panic!("Didn't get a quorum proposal in 5 seconds");
}
let event = events.next().await.unwrap();
if let Some((view_number, commitment)) =
proposal_view_number_and_commitment(event).await
{
return (view_number, commitment);
}
}
}

/// Wait for a transaction event.
async fn wait_for_transaction(
events: &mut (impl Stream<Item = Event> + Unpin),
transaction: Transaction,
) {
let start = Instant::now();
loop {
if start.elapsed() > Duration::from_secs(5) {
panic!("Didn't get the transaction in 5 seconds");
}
let event = events.next().await.unwrap();
if let EventType::Transactions { transactions: txns } = event.event {
if txns == vec![transaction.clone()] {
return;
}
}
}
}

/// Fetch the bundle associated with the provided parent information.
async fn get_bundle(
builder_client: Client<ServerError, MarketplaceVersion>,
parent_view_number: u64,
parent_commitment: VidCommitment,
) -> Bundle<SeqTypes> {
builder_client
.get::<Bundle<SeqTypes>>(
format!(
"bundle_info/bundle/{parent_view_number}/{parent_commitment}/{}",
parent_view_number + 1
)
.as_str(),
)
.send()
.await
.unwrap();
.unwrap()
}

/// Submit transactions via the private mempool and fetch the bundle.
async fn submit_and_get_bundle_with_private_mempool(
builder_client: Client<ServerError, MarketplaceVersion>,
transactions: Vec<Transaction>,
urls: Urls,
) -> Bundle<SeqTypes> {
// Subscribe to events.
let events_service_client = Client::<
hotshot_events_service::events::Error,
SequencerApiVersion,
>::new(urls.event.clone());
events_service_client.connect(None).await;

events_service_client
let mut events = events_service_client
.socket("hotshot-events/events")
.subscribe::<Event<SeqTypes>>()
.subscribe::<Event>()
.await
.unwrap();

// Submit transactions via the private mempool.
let txn_submission_client: Client<ServerError, SequencerApiVersion> =
Client::new(urls.builder.clone());
txn_submission_client.connect(None).await;
txn_submission_client
.post::<Vec<Commitment<Transaction>>>("txn_submit/batch")
.body_json(&transactions)
.unwrap()
}
.send()
.await
.unwrap();

/// Get the bundle associated with the received quorum proposal.
///
/// # Returns
/// * `Some` if the bundle is fetched.
/// * `None` if the quorum proposal isn't received or the bundle fetching fails.
async fn get_bundle(
event: Event<SeqTypes>,
builder_client: Client<ServerError, MarketplaceVersion>,
) -> Option<Bundle<SeqTypes>> {
if let EventType::QuorumProposal { proposal, .. } = event.event {
let parent_view_number = *proposal.data.view_number;
let parent_commitment = Leaf::from_quorum_proposal(&proposal.data).payload_commitment();
if let Ok(bundle) = builder_client
.get::<Bundle<SeqTypes>>(
format!(
"bundle_info/bundle/{parent_view_number}/{parent_commitment}/{}",
parent_view_number + 1
)
.as_str(),
)
.send()
.await
// Get the parent view number and commitment.
let parent_view_number;
let parent_commitment;
let start = Instant::now();
loop {
if start.elapsed() > Duration::from_secs(5) {
panic!("Didn't get a quorum proposal in 5 seconds");
}
let event = events.next().await.unwrap().unwrap();
if let Some((view_number, commitment)) =
proposal_view_number_and_commitment(event).await
{
return Some(bundle);
} else {
return None;
parent_view_number = view_number;
parent_commitment = commitment;
break;
}
}
None

// Fetch the bundle.
get_bundle(builder_client, parent_view_number, parent_commitment).await
}

async fn test_marketplace_reserve_builder(public_mempool: bool) {
async fn test_marketplace_reserve_builder(mempool: Mempool) {
setup_test();

let (ports, urls) = pick_urls_and_start_network().await;
Expand Down Expand Up @@ -593,42 +648,43 @@ mod test {
let _ = init.await.unwrap();
let builder_client = connect_to_builder(urls.clone()).await;

// Submit transactions.
// Construct transactions.
let registered_transaction =
Transaction::new(REGISTERED_NAMESPACE.into(), vec![1, 1, 1, 1]);
let unregistered_transaction =
Transaction::new(UNREGISTERED_NAMESPACE.into(), vec![1, 1, 1, 2]);
let transactions = vec![registered_transaction.clone(), unregistered_transaction];
let bundle = if public_mempool {
let server = &network.server;
for transaction in transactions {
server.submit_transaction(transaction).await.unwrap();
}
let mut events = server.event_stream().await;

let start = Instant::now();
loop {
if start.elapsed() > Duration::from_secs(10) {
panic!("Didn't get a quorum proposal in 10 seconds");
}
let event = events.next().await.unwrap();

if let Some(bundle) = get_bundle(event, builder_client.clone()).await {
break bundle;
}
let bundle = match mempool {
Mempool::Public => {
let server = &network.server;
let mut events = server.event_stream().await;

// Get the parent information before submitting transactions.
let (parent_view_number, parent_commitment) =
wait_for_proposal_view_number_and_commitment(&mut events).await;

// Submit transactions and wait until they are received.
server
.submit_transaction(registered_transaction.clone())
.await
.unwrap();
wait_for_transaction(&mut events, registered_transaction.clone()).await;
server
.submit_transaction(unregistered_transaction.clone())
.await
.unwrap();
wait_for_transaction(&mut events, unregistered_transaction).await;

// Get the bundle.
get_bundle(builder_client, parent_view_number, parent_commitment).await
}
} else {
let mut events = submit_transactions_via_private_mempool(transactions, urls).await;

let start = Instant::now();
loop {
if start.elapsed() > Duration::from_secs(10) {
panic!("Didn't get a quorum proposal in 10 seconds");
}
let event = events.next().await.unwrap().unwrap();
if let Some(bundle) = get_bundle(event, builder_client.clone()).await {
break bundle;
}
Mempool::Private => {
submit_and_get_bundle_with_private_mempool(
builder_client,
vec![registered_transaction.clone(), unregistered_transaction],
urls,
)
.await
}
};

Expand Down Expand Up @@ -666,7 +722,7 @@ mod test {
assert_eq!(bundle.sequencing_fee, sequencing_fee);
}

async fn test_marketplace_fallback_builder(public_mempool: bool) {
async fn test_marketplace_fallback_builder(mempool: Mempool) {
setup_test();

let (ports, urls) = pick_urls_and_start_network().await;
Expand Down Expand Up @@ -721,42 +777,43 @@ mod test {
let _ = init.await.unwrap();
let builder_client = connect_to_builder(urls.clone()).await;

// Submit transactions.
// Construct transactions.
let registered_transaction =
Transaction::new(REGISTERED_NAMESPACE.into(), vec![1, 1, 1, 1]);
let unregistered_transaction =
Transaction::new(UNREGISTERED_NAMESPACE.into(), vec![1, 1, 1, 2]);
let transactions = vec![registered_transaction, unregistered_transaction.clone()];
let bundle = if public_mempool {
let server = &network.server;
for transaction in transactions {
server.submit_transaction(transaction).await.unwrap();
}
let mut events = server.event_stream().await;

let start = Instant::now();
loop {
if start.elapsed() > Duration::from_secs(10) {
panic!("Didn't get a quorum proposal in 10 seconds");
}
let event = events.next().await.unwrap();

if let Some(bundle) = get_bundle(event, builder_client.clone()).await {
break bundle;
}
let bundle = match mempool {
Mempool::Public => {
let server = &network.server;
let mut events = server.event_stream().await;

// Get the parent information before submitting transactions.
let (parent_view_number, parent_commitment) =
wait_for_proposal_view_number_and_commitment(&mut events).await;

// Submit transactions and wait until they are received.
server
.submit_transaction(registered_transaction.clone())
.await
.unwrap();
wait_for_transaction(&mut events, registered_transaction).await;
server
.submit_transaction(unregistered_transaction.clone())
.await
.unwrap();
wait_for_transaction(&mut events, unregistered_transaction.clone()).await;

// Get the bundle.
get_bundle(builder_client, parent_view_number, parent_commitment).await
}
} else {
let mut events = submit_transactions_via_private_mempool(transactions, urls).await;

let start = Instant::now();
loop {
if start.elapsed() > Duration::from_secs(10) {
panic!("Didn't get a quorum proposal in 10 seconds");
}
let event = events.next().await.unwrap().unwrap();
if let Some(bundle) = get_bundle(event, builder_client.clone()).await {
break bundle;
}
Mempool::Private => {
submit_and_get_bundle_with_private_mempool(
builder_client,
vec![registered_transaction, unregistered_transaction.clone()],
urls,
)
.await
}
};

Expand Down Expand Up @@ -796,21 +853,21 @@ mod test {

#[async_std::test]
async fn test_marketplace_reserve_builder_with_public_mempool() {
test_marketplace_reserve_builder(true).await;
test_marketplace_reserve_builder(Mempool::Public).await;
}

#[async_std::test]
async fn test_marketplace_reserve_builder_with_private_mempool() {
test_marketplace_reserve_builder(false).await;
test_marketplace_reserve_builder(Mempool::Private).await;
}

#[async_std::test]
async fn test_marketplace_fallback_builder_with_public_mempool() {
test_marketplace_fallback_builder(true).await;
test_marketplace_fallback_builder(Mempool::Public).await;
}

#[async_std::test]
async fn test_marketplace_fallback_builder_with_private_mempool() {
test_marketplace_fallback_builder(false).await;
test_marketplace_fallback_builder(Mempool::Private).await;
}
}

0 comments on commit 1ecc3c6

Please sign in to comment.