port "Ship delay error fix" test changes #143

Merged
merged 12 commits on Apr 28, 2022
59 changes: 27 additions & 32 deletions tests/Cluster.py
@@ -62,7 +62,7 @@ def __init__(self, walletd=False, localCluster=True, host="localhost", port=8888
defproducerbPrvtKey: Defproducerb account private key
"""
self.accounts={}
self.nodes={}
self.nodes=[]
self.unstartedNodes=[]
self.localCluster=localCluster
self.wallet=None
@@ -87,6 +87,7 @@ def __init__(self, walletd=False, localCluster=True, host="localhost", port=8888
self.useBiosBootFile=False
self.filesToCleanup=[]
self.alternateVersionLabels=Cluster.__defaultAlternateVersionLabels()
self.biosNode = None


def setChainStrategy(self, chainSyncStrategy=Utils.SyncReplayTag):
@@ -464,7 +465,8 @@ def initAccountKeys(account, keys):
def initializeNodes(self, defproduceraPrvtKey=None, defproducerbPrvtKey=None, onlyBios=False):
port=Cluster.__BiosPort if onlyBios else self.port
host=Cluster.__BiosHost if onlyBios else self.host
node=Node(host, port, walletMgr=self.walletMgr)
nodeNum="bios" if onlyBios else 0
node=Node(host, port, nodeNum, walletMgr=self.walletMgr)
if Utils.Debug: Utils.Print("Node: %s", str(node))

node.checkPulse(exitOnError=True)
@@ -503,12 +505,13 @@ def initializeNodesFromJson(self, nodesJsonStr):
for n in nArr:
port=n["port"]
host=n["host"]
node=Node(host, port, walletMgr=self.walletMgr)
node=Node(host, port, nodeId=len(nodes), walletMgr=self.walletMgr)
if Utils.Debug: Utils.Print("Node:", node)

node.checkPulse(exitOnError=True)
nodes.append(node)


self.nodes=nodes
return True

@@ -669,7 +672,14 @@ def getNode(self, nodeId=0, exitOnError=True):
return self.nodes[nodeId]

def getNodes(self):
return self.nodes
return self.nodes[:]

def getAllNodes(self):
nodes = []
if self.biosNode is not None:
nodes.append(self.biosNode)
nodes += self.getNodes()
return nodes

def launchUnstarted(self, numToLaunch=1, cachePopen=False):
assert(isinstance(numToLaunch, int))
@@ -678,7 +688,7 @@ def launchUnstarted(self, numToLaunch=1, cachePopen=False):
del self.unstartedNodes[:numToLaunch]
for node in launchList:
# the node number is indexed off of the started nodes list
node.launchUnstarted(len(self.nodes), cachePopen=cachePopen)
node.launchUnstarted(cachePopen=cachePopen)
self.nodes.append(node)

# Spread funds across accounts with transactions spread through cluster nodes.
@@ -1342,7 +1352,7 @@ def discoverLocalNode(self, nodeNum, psOut=None, timeout=None):
if m is None:
Utils.Print("ERROR: Failed to find %s pid. Pattern %s" % (Utils.EosServerName, pattern))
return None
instance=Node(self.host, self.port + nodeNum, pid=int(m.group(1)), cmd=m.group(2), walletMgr=self.walletMgr)
instance=Node(self.host, self.port + nodeNum, nodeNum, pid=int(m.group(1)), cmd=m.group(2), walletMgr=self.walletMgr)
if Utils.Debug: Utils.Print("Node>", instance)
return instance

@@ -1355,7 +1365,7 @@ def discoverBiosNode(self, timeout=None):
Utils.Print("ERROR: Failed to find %s pid. Pattern %s" % (Utils.EosServerName, pattern))
return None
else:
return Node(Cluster.__BiosHost, Cluster.__BiosPort, pid=int(m.group(1)), cmd=m.group(2), walletMgr=self.walletMgr)
return Node(Cluster.__BiosHost, Cluster.__BiosPort, "bios", pid=int(m.group(1)), cmd=m.group(2), walletMgr=self.walletMgr)

# Kills a percentage of Eos instances starting from the tail and updates eosInstanceInfos state
def killSomeEosInstances(self, killCount, killSignalStr=Utils.SigKillTag):
@@ -1383,7 +1393,7 @@ def relaunchEosInstances(self, cachePopen=False):
newChain= False if self.__chainSyncStrategy.name in [Utils.SyncHardReplayTag, Utils.SyncNoneTag] else True
for i in range(0, len(self.nodes)):
node=self.nodes[i]
if node.killed and not node.relaunch(i, chainArg, newChain=newChain, cachePopen=cachePopen):
if node.killed and not node.relaunch(chainArg, newChain=newChain, cachePopen=cachePopen):
return False

return True
@@ -1398,23 +1408,11 @@ def dumpErrorDetailImpl(fileName):
else:
Utils.Print("File %s not found." % (fileName))

@staticmethod
def __findFiles(path):
files=[]
it=os.scandir(path)
for entry in it:
if entry.is_file(follow_symlinks=False):
match=re.match("stderr\..+\.txt", entry.name)
if match:
files.append(os.path.join(path, entry.name))
files.sort()
return files

def dumpErrorDetails(self):
fileName=Utils.getNodeConfigDir("bios", "config.ini")
Cluster.dumpErrorDetailImpl(fileName)
path=Utils.getNodeDataDir("bios")
fileNames=Cluster.__findFiles(path)
fileNames=Node.findStderrFiles(path)
for fileName in fileNames:
Cluster.dumpErrorDetailImpl(fileName)

@@ -1425,7 +1423,7 @@ def dumpErrorDetails(self):
fileName=os.path.join(configLocation, "genesis.json")
Cluster.dumpErrorDetailImpl(fileName)
path=Utils.getNodeDataDir(i)
fileNames=Cluster.__findFiles(path)
fileNames=Node.findStderrFiles(path)
for fileName in fileNames:
Cluster.dumpErrorDetailImpl(fileName)

@@ -1533,8 +1531,7 @@ def discoverUnstartedLocalNode(self, nodeId):
with open(startFile, 'r') as file:
cmd=file.read()
Utils.Print("unstarted local node cmd: %s" % (cmd))
p=re.compile(r'^\s*(\w+)\s*=\s*([^\s](?:.*[^\s])?)\s*$')
instance=Node(self.host, port=self.port+nodeId, pid=None, cmd=cmd, walletMgr=self.walletMgr)
instance=Node(self.host, port=self.port+nodeId, nodeId=nodeId, pid=None, cmd=cmd, walletMgr=self.walletMgr)
if Utils.Debug: Utils.Print("Unstarted Node>", instance)
return instance

@@ -1546,14 +1543,12 @@ def getInfos(self, silentErrors=False, exitOnError=False):
return infos

def reportStatus(self):
if hasattr(self, "biosNode") and self.biosNode is not None:
self.biosNode.reportStatus()
if hasattr(self, "nodes"):
for node in self.nodes:
try:
node.reportStatus()
except:
Utils.Print("No reportStatus")
nodes = self.getAllNodes()
for node in nodes:
try:
node.reportStatus()
except:
Utils.Print("No reportStatus for nodeId: %s" % (node.nodeId))

def getBlockLog(self, nodeExtension, blockLogAction=BlockLogAction.return_blocks, outputFile=None, first=None, last=None, throwException=False, silentErrors=False, exitOnError=False):
blockLogDir=Utils.getNodeDataDir(nodeExtension, "blocks")
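Taken together, the Cluster.py changes move node identity into the Node object itself: each Node is now constructed with its nodeId ("bios" or an integer index), the cluster remembers its biosNode, and getAllNodes() returns the bios node plus the started nodes. A rough sketch of how a test script reads after the change; the launch and kill calls are assumptions based on the surrounding test helpers, not part of this diff:

import signal

# assumed: cluster is a Cluster() already launched by the test harness
node = cluster.getNode(0)
node.kill(signal.SIGTERM)        # assumed Node kill helper
node.relaunch(cachePopen=True)   # no node index argument anymore; the node knows self.nodeId
for n in cluster.getAllNodes():  # bios node (if discovered) plus the regular nodes
    n.reportStatus()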
101 changes: 85 additions & 16 deletions tests/Node.py
@@ -4,10 +4,11 @@
import time
import os
import re
import datetime
import json
import signal

from datetime import datetime
from datetime import timedelta
from core_symbol import CORE_SYMBOL
from testUtils import Utils
from testUtils import Account
@@ -27,11 +28,14 @@ class Node(object):

# pylint: disable=too-many-instance-attributes
# pylint: disable=too-many-arguments
def __init__(self, host, port, pid=None, cmd=None, walletMgr=None):
def __init__(self, host, port, nodeId, pid=None, cmd=None, walletMgr=None):
self.host=host
self.port=port
self.pid=pid
self.cmd=cmd
if nodeId != "bios":
assert isinstance(nodeId, int)
self.nodeId=nodeId
if Utils.Debug: Utils.Print("new Node host=%s, port=%s, pid=%s, cmd=%s" % (self.host, self.port, self.pid, self.cmd))
self.killed=False # marks node as killed
self.endpointHttp="http://%s:%d" % (self.host, self.port)
@@ -50,8 +54,7 @@ def eosClientArgs(self):
return self.endpointArgs + walletArgs + " " + Utils.MiscEosClientArgs

def __str__(self):
#return "Host: %s, Port:%d, Pid:%s, Cmd:\"%s\"" % (self.host, self.port, self.pid, self.cmd)
return "Host: %s, Port:%d, Pid:%s" % (self.host, self.port, self.pid)
return "Host: %s, Port:%d, NodeNum:%s, Pid:%s" % (self.host, self.port, self.nodeId, self.pid)

@staticmethod
def validateTransaction(trans):
@@ -1087,16 +1090,14 @@ def getNextCleanProductionCycle(self, trans):
return blockNum


# TBD: make nodeId an internal property
# pylint: disable=too-many-locals
# If nodeosPath is equal to None, it will use the existing nodeos path
def relaunch(self, nodeId, chainArg=None, newChain=False, skipGenesis=True, timeout=Utils.systemWaitTimeout, addSwapFlags=None, cachePopen=False, nodeosPath=None):
def relaunch(self, chainArg=None, newChain=False, skipGenesis=True, timeout=Utils.systemWaitTimeout, addSwapFlags=None, cachePopen=False, nodeosPath=None):

assert(self.pid is None)
assert(self.killed)
assert isinstance(nodeId, int) or (isinstance(nodeId, str) and nodeId == "bios"), "Invalid Node ID is passed"

if Utils.Debug: Utils.Print("Launching node process, Id: {}".format(nodeId))
if Utils.Debug: Utils.Print("Launching node process, Id: {}".format(self.nodeId))

cmdArr=[]
splittedCmd=self.cmd.split()
@@ -1130,7 +1131,7 @@ def relaunch(self, nodeId, chainArg=None, newChain=False, skipGenesis=True, time
myCmd=" ".join(cmdArr)

cmd=myCmd + ("" if chainArg is None else (" " + chainArg))
self.launchCmd(cmd, nodeId, cachePopen)
self.launchCmd(cmd, cachePopen)

def isNodeAlive():
"""wait for node to be responsive."""
@@ -1164,13 +1165,13 @@ def unstartedFile(nodeId):
Utils.errorExit("Cannot find unstarted node since %s file does not exist" % startFile)
return startFile

def launchUnstarted(self, nodeId, cachePopen=False):
def launchUnstarted(self, cachePopen=False):
Utils.Print("launchUnstarted cmd: %s" % (self.cmd))
self.launchCmd(self.cmd, nodeId, cachePopen)
self.launchCmd(self.cmd, cachePopen)

def launchCmd(self, cmd, nodeId, cachePopen=False):
dataDir=Utils.getNodeDataDir(nodeId)
dt = datetime.datetime.now()
def launchCmd(self, cmd, cachePopen=False):
dataDir=Utils.getNodeDataDir(self.nodeId)
dt = datetime.now()
dateStr=Utils.getDateString(dt)
stdoutFile="%s/stdout.%s.txt" % (dataDir, dateStr)
stderrFile="%s/stderr.%s.txt" % (dataDir, dateStr)
@@ -1310,8 +1311,8 @@ def getActivatedProtocolFeatures(self):
latestBlockHeaderState = self.getLatestBlockHeaderState()
return latestBlockHeaderState["activated_protocol_features"]["protocol_features"]

def modifyBuiltinPFSubjRestrictions(self, nodeId, featureCodename, subjectiveRestriction={}):
jsonPath = os.path.join(Utils.getNodeConfigDir(nodeId),
def modifyBuiltinPFSubjRestrictions(self, featureCodename, subjectiveRestriction={}):
jsonPath = os.path.join(Utils.getNodeConfigDir(self.nodeId),
"protocol_features",
"BUILTIN-{}.json".format(featureCodename))
protocolFeatureJson = []
@@ -1325,3 +1326,71 @@ def modifyBuiltinPFSubjRestrictions(self, nodeId, featureCodename, subjectiveRes
def createSnapshot(self):
param = { }
return self.processCurlCmd("producer", "create_snapshot", json.dumps(param))

@staticmethod
def findStderrFiles(path):
files=[]
it=os.scandir(path)
for entry in it:
if entry.is_file(follow_symlinks=False):
match=re.match("stderr\..+\.txt", entry.name)
if match:
files.append(os.path.join(path, entry.name))
files.sort()
return files

def analyzeProduction(self, specificBlockNum=None, thresholdMs=500):
dataDir=Utils.getNodeDataDir(self.nodeId)
files=Node.findStderrFiles(dataDir)
blockAnalysis={}
anyBlockStr=r'[0-9]+'
initialTimestamp=r'\s+([0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}.[0-9]{3})\s'
producedBlockPreStr=r'.+Produced\sblock\s+.+\s#('
producedBlockPostStr=r')\s@\s([0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}.[0-9]{3})'
anyBlockPtrn=re.compile(initialTimestamp + producedBlockPreStr + anyBlockStr + producedBlockPostStr)
producedBlockPtrn=re.compile(initialTimestamp + producedBlockPreStr + str(specificBlockNum) + producedBlockPostStr) if specificBlockNum is not None else anyBlockPtrn
producedBlockDonePtrn=re.compile(initialTimestamp + r'.+Producing\sBlock\s+#' + anyBlockStr + '\sreturned:\strue')
for file in files:
with open(file, 'r') as f:
line = f.readline()
while line:
readLine=True # assume we need to read the next line before the next pass
match = producedBlockPtrn.search(line)
if match:
prodTimeStr = match.group(1)
slotTimeStr = match.group(3)
blockNum = int(match.group(2))

line = f.readline()
while line:
matchNextBlock = anyBlockPtrn.search(line)
if matchNextBlock:
readLine=False #already have the next line ready to check on next pass
break

matchBlockActuallyProduced = producedBlockDonePtrn.search(line)
if matchBlockActuallyProduced:
prodTimeStr = matchBlockActuallyProduced.group(1)
break

line = f.readline()

prodTime = datetime.strptime(prodTimeStr, Utils.TimeFmt)
slotTime = datetime.strptime(slotTimeStr, Utils.TimeFmt)
delta = prodTime - slotTime
limit = timedelta(milliseconds=thresholdMs)
if delta > limit:
if blockNum in blockAnalysis:
Utils.errorExit("Found repeat production of the same block num: %d in one of the stderr files in: %s" % (blockNum, dataDir))
blockAnalysis[blockNum] = { "slot": slotTimeStr, "prod": prodTimeStr }

if specificBlockNum is not None:
return blockAnalysis

if readLine:
line = f.readline()

if specificBlockNum is not None and specificBlockNum not in blockAnalysis:
blockAnalysis[specificBlockNum] = { "slot": None, "prod": None}

return blockAnalysis
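The new findStderrFiles and analyzeProduction helpers carry the ship-delay reporting: analyzeProduction scans the node's stderr.<date>.txt logs for "Produced block" lines, compares each block's production timestamp against its slot timestamp, and returns the blocks whose delay exceeded thresholdMs. A minimal sketch of calling them directly (outside TestHelper.shutdown), assuming a node obtained from the cluster:

node = cluster.getNode(0)
# all stderr.<date>.txt files in this node's data directory
logFiles = Node.findStderrFiles(Utils.getNodeDataDir(node.nodeId))
Utils.Print("stderr logs: %s" % (logFiles))
# blocks produced more than 200 ms after their slot time, keyed by block number
lateBlocks = node.analyzeProduction(thresholdMs=200)
for blockNum, times in lateBlocks.items():
    Utils.Print("block %d slot=%s produced=%s" % (blockNum, times["slot"], times["prod"]))
# with specificBlockNum the scan stops at that block; an entry with None timestamps
# means the block was never found in the logs
single = node.analyzeProduction(specificBlockNum=10, thresholdMs=0)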
14 changes: 13 additions & 1 deletion tests/TestHelper.py
@@ -146,12 +146,24 @@ def shutdown(cluster, walletMgr, testSuccessful=True, killEosInstances=True, kil
Utils.Print("Test succeeded.")
else:
Utils.Print("Test failed.")

def reportProductionAnalysis(thresholdMs):
Utils.Print(Utils.FileDivider)
for node in cluster.getAllNodes():
missedBlocks=node.analyzeProduction(thresholdMs=thresholdMs)
if len(missedBlocks) > 0:
Utils.Print("NodeId: %s produced the following blocks late: %s" % (node.nodeId, missedBlocks))

if not testSuccessful and dumpErrorDetails:
cluster.reportStatus()
Utils.Print(Utils.FileDivider)
psOut=Cluster.pgrepEosServers(timeout=60)
psOut = Cluster.pgrepEosServers(timeout=60)
Utils.Print("pgrep output:\n%s" % (psOut))
reportProductionAnalysis(thresholdMs=0)
Utils.Print("== Errors see above ==")
elif dumpErrorDetails:
# for now report these to know how many blocks we are missing production windows for
reportProductionAnalysis(thresholdMs=200)

if killEosInstances:
Utils.Print("Shut down the cluster.")
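In TestHelper.shutdown the new reportProductionAnalysis closure runs that analysis over cluster.getAllNodes(): with thresholdMs=0 when a failed test dumps its error details (every late block is listed next to the other diagnostics), and with thresholdMs=200 on a passing run so missed production windows are still counted. A test reaches this through its usual shutdown call in the finally block; the exact keyword list below is an assumption based on the existing test scripts, not something introduced by this diff:

testSuccessful = False
try:
    # ... test body ...
    testSuccessful = True
finally:
    TestHelper.shutdown(cluster, walletMgr, testSuccessful=testSuccessful,
                        dumpErrorDetails=dumpErrorDetails)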
4 changes: 2 additions & 2 deletions tests/block_log_util_test.py
@@ -156,7 +156,7 @@ def checkBlockLog(blockLog, blockNumsToFind, firstBlockNum=1):

# relaunch the node with the truncated block log and ensure it catches back up with the producers
current_head_block_num = node1.getInfo()["head_block_num"]
cluster.getNode(2).relaunch(2, cachePopen=True)
cluster.getNode(2).relaunch(cachePopen=True)
assert cluster.getNode(2).waitForBlock(current_head_block_num, timeout=60, reportInterval=15)

# ensure it continues to advance
@@ -186,7 +186,7 @@ def checkBlockLog(blockLog, blockNumsToFind, firstBlockNum=1):
# relaunch the node with the truncated block log and ensure it catches back up with the producers
current_head_block_num = node1.getInfo()["head_block_num"]
assert current_head_block_num >= info["head_block_num"]
cluster.getNode(2).relaunch(2, cachePopen=True)
cluster.getNode(2).relaunch(cachePopen=True)
assert cluster.getNode(2).waitForBlock(current_head_block_num, timeout=60, reportInterval=15)

# ensure it continues to advance
10 changes: 5 additions & 5 deletions tests/large-lib-test.py
@@ -48,8 +48,8 @@
walletMgr=WalletMgr(True)
cluster.setWalletMgr(walletMgr)

def relaunchNode(node: Node, nodeId, chainArg="", skipGenesis=True, relaunchAssertMessage="Fail to relaunch"):
isRelaunchSuccess=node.relaunch(nodeId, chainArg=chainArg, timeout=relaunchTimeout, skipGenesis=skipGenesis, cachePopen=True)
def relaunchNode(node: Node, chainArg="", skipGenesis=True, relaunchAssertMessage="Fail to relaunch"):
isRelaunchSuccess=node.relaunch(chainArg=chainArg, timeout=relaunchTimeout, skipGenesis=skipGenesis, cachePopen=True)
time.sleep(1) # Give a second to replay or resync if needed
assert isRelaunchSuccess, relaunchAssertMessage
return isRelaunchSuccess
@@ -103,9 +103,9 @@ def relaunchNode(node: Node, nodeId, chainArg="", skipGenesis=True, relaunchAsse

Print ("Relaunch all cluster nodes instances.")
# -e -p eosio for resuming production, skipGenesis=False for launch the same chain as before
relaunchNode(producingNode, 0, chainArg="-e -p eosio --sync-fetch-span 5 ", skipGenesis=False)
relaunchNode(speculativeNode1, 1, chainArg="--sync-fetch-span 5 ")
relaunchNode(speculativeNode2, 2, chainArg="--sync-fetch-span 5 ", skipGenesis=False)
relaunchNode(producingNode, chainArg="-e -p eosio --sync-fetch-span 5 ", skipGenesis=False)
relaunchNode(speculativeNode1, chainArg="--sync-fetch-span 5 ")
relaunchNode(speculativeNode2, chainArg="--sync-fetch-span 5 ", skipGenesis=False)

Print("Note LIBs")
prodLib = producingNode.getIrreversibleBlockNum()