test(congestion_control) - Implemented nayduck tests for congestion control #11429

Merged 3 commits on May 30, 2024
5 changes: 5 additions & 0 deletions nightly/pytest-sanity.txt
@@ -181,3 +181,8 @@ pytest --timeout=120 sanity/resharding_error_handling.py
# Tests for slow chunks and extreme undercharging
pytest sanity/slow_chunk.py
pytest sanity/slow_chunk.py --features nightly

# Tests for congestion control
# TODO(congestion_control) - enable pytest on stabilization
# pytest sanity/congestion_control.py
pytest sanity/congestion_control.py --features nightly
367 changes: 367 additions & 0 deletions pytest/tests/sanity/congestion_control.py
@@ -0,0 +1,367 @@
#!/usr/bin/env python3
# This is a test for congestion control. It spins up a single node and loads it
# with heavy transactions to congest one of the shards. It then checks that the
# congestion info is correct and that the chain rejected some transactions.
# Usage:
# python3 pytest/tests/sanity/congestion_control.py
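# In nayduck the test currently runs only against a build with nightly
# features enabled (see nightly/pytest-sanity.txt); the stable-features run
# is to be enabled once congestion control is stabilized.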

import unittest
import pathlib
import sys
import json
import time
import threading

sys.path.append(str(pathlib.Path(__file__).resolve().parents[2] / 'lib'))

from configured_logger import logger
from cluster import init_cluster, load_config, spin_up_node, Key
from utils import load_test_contract, poll_blocks, wait_for_blocks
from transaction import sign_create_account_with_full_access_key_and_balance_tx, sign_deploy_contract_tx, sign_function_call_tx

ONE_NEAR = 10**24
TGAS = 10**12

ACCESS_KEY_NONCE_RANGE_MULTIPLIER = 1_000_000
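# The runtime derives the initial nonce of a newly added access key from the
# block height using this multiplier; see __wait_for_txs for how the test
# re-bases its own nonce accordingly.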

# Shard layout with 5 roughly equal size shards for convenience. (See the
# sketch after ALL_ACCOUNTS below for how account ids map to these shards.)
SHARD_LAYOUT = {
"V1": {
"boundary_accounts": [
"fff",
"kkk",
"ppp",
"uuu",
],
"version": 2,
"shards_split_map": [],
"to_parent_shard_map": [],
}
}

NUM_SHARDS = len(SHARD_LAYOUT["V1"]["boundary_accounts"]) + 1

ACCOUNT_SHARD_0 = "aaa.test0"
ACCOUNT_SHARD_1 = "ggg.test0"
ACCOUNT_SHARD_2 = "lll.test0"
ACCOUNT_SHARD_3 = "rrr.test0"
ACCOUNT_SHARD_4 = "vvv.test0"

ALL_ACCOUNTS = [
ACCOUNT_SHARD_0,
ACCOUNT_SHARD_1,
ACCOUNT_SHARD_2,
ACCOUNT_SHARD_3,
ACCOUNT_SHARD_4,
]
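
# For reference, this shard layout assigns an account id to the shard whose
# boundary interval contains it lexicographically. The helper below is only
# an illustrative sketch of that mapping and is not used by the test:
import bisect


def account_to_shard_id(account_id: str) -> int:
    # bisect_right counts how many boundary accounts sort at or before the
    # account id, which is exactly the shard index.
    return bisect.bisect_right(SHARD_LAYOUT["V1"]["boundary_accounts"],
                               account_id)


# e.g. account_to_shard_id(ACCOUNT_SHARD_0) == 0
# and account_to_shard_id(ACCOUNT_SHARD_4) == 4.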

TxHash = str
AccountId = str


class CongestionControlTest(unittest.TestCase):

    def tearDown(self):
        # tearDown also runs when the test fails before __start_load created
        # the load threads, so guard against that.
        if getattr(self, 'threads', None) is not None:
            self.__stop_load()

def test(self):

self.nonce = 1

accounts = self.__prepare_accounts()

node = self.__setup_node()

self.__create_accounts(node, accounts)

self.__deploy_contracts(node, accounts)

self.__run(node, accounts)

node.kill()

logger.info("The test is finished.")

def __run(self, node, accounts):

self.__start_load(node, accounts)

self.__run_under_congestion(node)

self.__stop_load()

self.__run_after_congestion(node)

self.__check_txs(node)

    def __run_under_congestion(self, node):
        logger.info("Checking the chain under congestion")
        (start_height, _) = node.get_latest_block()
        # poll_blocks itself only stops on timeout (and a `__target=...`
        # keyword would be name-mangled inside this class), so stop
        # explicitly once the chain reaches height 30.
        for height, hash in poll_blocks(node):
            if height >= 30:
                break
            # Wait for a few blocks to congest the chain.
            if height < start_height + 5:
                continue

# Check the target shard.

chunk = self.__get_chunk(node, hash, 0)

            # Check that the target shard is busy - using at least 1000 TGas.
gas_used = chunk['header']['gas_used']
self.assertGreaterEqual(gas_used, 1000 * TGAS)

# Check that the congestion info has no buffered receipts and some delayed receipts.
congestion_info = chunk['header']['congestion_info']
self.assertEqual(int(congestion_info['buffered_receipts_gas']), 0)
self.assertGreater(int(congestion_info['delayed_receipts_gas']), 0)
self.assertGreater(congestion_info['receipt_bytes'], 0)

logger.info(
f"#{height} target gas used: {gas_used} congestion info {congestion_info}"
)

# Check one other shard.

chunk = self.__get_chunk(node, hash, 1)
gas_used = chunk['header']['gas_used']
congestion_info = chunk['header']['congestion_info']

self.assertEqual(int(congestion_info['buffered_receipts_gas']), 0)
self.assertEqual(int(congestion_info['delayed_receipts_gas']), 0)
self.assertEqual(congestion_info['receipt_bytes'], 0)

logger.info(
f"#{height} other gas used: {gas_used} congestion info {congestion_info}"
)

    def __run_after_congestion(self, node):
        logger.info("Checking the chain after congestion")
        # As in __run_under_congestion, stop explicitly once the chain
        # reaches height 50.
        for height, hash in poll_blocks(node):
            if height >= 50:
                break

            chunk = self.__get_chunk(node, hash, 0)
            gas_used = chunk['header']['gas_used']
            congestion_info = chunk['header']['congestion_info']

            logger.info(
                f"#{height} gas used: {gas_used} congestion info {congestion_info}"
            )

            self.assertEqual(gas_used, 0)
            self.assertEqual(int(congestion_info['buffered_receipts_gas']), 0)
            self.assertEqual(int(congestion_info['delayed_receipts_gas']), 0)
            self.assertEqual(congestion_info['receipt_bytes'], 0)

def __check_txs(self, node):
logger.info("Checking transactions. This is slow.")

accepted_count = 0
rejected_count = 0
for (tx_sender, tx_hash) in self.txs:
try:
result = node.get_tx(tx_hash, tx_sender)

status = result['result']['final_execution_status']
self.assertIn(status,
['FINAL', 'EXECUTED', 'EXECUTED_OPTIMISTIC'])

status = result['result']['status']
self.assertIn('SuccessValue', status)

accepted_count += 1
            except Exception:
                # Transactions dropped under congestion fail the lookup or
                # the status checks above and are counted as rejected.
                rejected_count += 1

logger.info(
f"Checking transactions done, total {len(self.txs)}, accepted {accepted_count}, rejected {rejected_count}"
)
self.assertGreater(accepted_count, 0)
self.assertGreater(rejected_count, 0)

    def __start_load(self, node, accounts):
        logger.info("Starting load threads")
        self.threads = []
        # The tx list is shared by all load threads and must be initialized
        # before any of them starts.
        self.txs = []
        self.finished = False
        self.lock = threading.Lock()

target_account = accounts[0]
for account in accounts:
thread = threading.Thread(
target=self.__load,
args=[node, account, target_account],
)
self.threads.append(thread)

for thread in self.threads:
thread.start()

def __stop_load(self):
logger.info("Stopping load threads")
self.finished = True
for thread in self.threads:
thread.join()

def __load(self, node, sender_account, target_account):
logger.debug(
f"Starting load thread {sender_account.account_id} -> {target_account.account_id}"
)
while not self.finished:
tx_hash = self.__call_contract(node, sender_account, target_account)
with self.lock:
self.txs.append((sender_account.account_id, tx_hash))

            # This sleep is mostly a formality - the __call_contract call
            # above is slower. That slowness is also why the load is sent
            # from multiple threads.
            time.sleep(0.1)

logger.debug(
f"Stopped load thread {sender_account.account_id} -> {target_account.account_id}"
)

def __setup_node(self):
logger.info("Setting up the node")
epoch_length = 100
config = load_config()
genesis_config_changes = [
("epoch_length", epoch_length),
("shard_layout", SHARD_LAYOUT),
]
client_config_changes = {
0: {
"tracked_shards": [0]
},
}
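        # Note: a non-empty tracked_shards list currently makes the node
        # track all shards, which this test relies on when querying chunks
        # from every shard.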

near_root, [node_dir] = init_cluster(
num_nodes=1,
num_observers=0,
num_shards=NUM_SHARDS,
config=config,
genesis_config_changes=genesis_config_changes,
client_config_changes=client_config_changes,
)

node = spin_up_node(config, near_root, node_dir, 0)
return node

def __prepare_accounts(self):
logger.info("Preparing accounts")

accounts = []
for account_id in ALL_ACCOUNTS:
account_key = Key.from_random(account_id)
accounts.append(account_key)
return accounts

def __create_accounts(self, node, accounts: list[Key]):
logger.info("Creating accounts")

create_account_tx_list = []
for account in accounts:
tx_hash = self.__create_account(node, account, 1000 * ONE_NEAR)

create_account_tx_list.append((node.signer_key.account_id, tx_hash))

self.__wait_for_txs(node, create_account_tx_list)

def __deploy_contracts(self, node, accounts: list[Key]):
logger.info("Deploying contracts")

deploy_contract_tx_list = list()
for account_key in accounts:
tx_hash = self.__deploy_contract(node, account_key)
deploy_contract_tx_list.append((account_key.account_id, tx_hash))

self.__wait_for_txs(node, deploy_contract_tx_list)

    def __create_account(self, node, account_key, balance):
        block_hash = node.get_latest_block().hash_bytes
        # The freshly generated key doubles as the new account's full access
        # key, so it can sign the account's transactions later on.
        create_account_tx = sign_create_account_with_full_access_key_and_balance_tx(
            node.signer_key,
            account_key.account_id,
            account_key,
            balance,
            self.nonce,
            block_hash,
        )
self.nonce += 1
result = node.send_tx(create_account_tx)
self.assertIn('result', result, result)
logger.debug(f"Create account {account_key.account_id}: {result}")
return result['result']

def __deploy_contract(self, node, account_key):
logger.debug("Deploying contract.")

block_hash = node.get_latest_block().hash_bytes
contract = load_test_contract('test_contract_rs.wasm')

tx = sign_deploy_contract_tx(
account_key,
contract,
self.nonce,
block_hash,
)
self.nonce += 1
result = node.send_tx(tx)
self.assertIn('result', result, result)
return result['result']

def __call_contract(self, node, sender: Key, receiver: Key):
logger.debug(
f"Calling contract. {sender.account_id} -> {receiver.account_id}")

block_hash = node.get_latest_block().hash_bytes

gas_amount = 250 * TGAS
gas_bytes = gas_amount.to_bytes(8, byteorder="little")
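        # The test contract's burn_gas_raw method reads its argument as a
        # little-endian u64 - the amount of gas to burn on the receiver's
        # shard.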

tx = sign_function_call_tx(
sender,
receiver.account_id,
'burn_gas_raw',
gas_bytes,
300 * TGAS,
0,
self.nonce,
block_hash,
)
self.nonce += 1
result = node.send_tx(tx)
self.assertIn('result', result, result)
return result['result']

    def __wait_for_txs(self, node, tx_list: list[tuple[AccountId, TxHash]]):
(height, _) = wait_for_blocks(node, count=3)
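        # Access keys added at height h start with a nonce of roughly
        # h * ACCESS_KEY_NONCE_RANGE_MULTIPLIER, so re-base the shared nonce
        # just above the range for the current height.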
self.nonce = ACCESS_KEY_NONCE_RANGE_MULTIPLIER * height + 1

for (tx_sender, tx_hash) in tx_list:
result = node.get_tx(tx_hash, tx_sender)

status = result['result']['final_execution_status']
self.assertIn(status, ['FINAL', 'EXECUTED', 'EXECUTED_OPTIMISTIC'])

status = result['result']['status']
self.assertIn('SuccessValue', status)

def __get_chunk(self, node, block_hash, shard_id):
result = node.json_rpc("chunk", {
"block_id": block_hash,
"shard_id": shard_id
})
self.assertIn('result', result, result)
return result['result']


if __name__ == '__main__':
unittest.main()