From 9075ba6ffbaca996767e942618d94f129050f5c1 Mon Sep 17 00:00:00 2001 From: YeahNotSewerSide <47860375+DoctorEenot@users.noreply.github.com> Date: Tue, 13 Apr 2021 18:28:10 +0300 Subject: [PATCH] MORE OPTIMIZATION --- cluster_server.py | 168 +++++++++++++++++++++++++++++++++------------- cluster_worker.py | 79 ++++++++++++++++------ 2 files changed, 177 insertions(+), 70 deletions(-) diff --git a/cluster_server.py b/cluster_server.py index 78a7d0a..7361b3a 100644 --- a/cluster_server.py +++ b/cluster_server.py @@ -148,12 +148,28 @@ def __repr__(self): master_server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) master_server_socket.settimeout(15) +master_server_timeout = 15 +master_server_is_connected = False -def connect_to_master(): +def connect_to_master(dispatcher,event): + ''' + event - {'t':'e', + 'event':'connect_to_master'} + ''' logger.info('CONNECTING TO MASTER') global master_server_socket global masterServer_address global masterServer_port + global master_server_timeout + global master_server_is_connected + + try: + event.dict_representation['address'] + return + except: + pass + + master_server_is_connected = False get_master_server_info() while True: @@ -163,11 +179,22 @@ def connect_to_master(): try: master_server_socket.connect((str(masterServer_address), int(masterServer_port))) - serverVersion = master_server_socket.recv(3).decode().rstrip("\n") # Get server version except Exception as e: - #time.sleep(3) + yield continue - break + master_server_socket.settimeout(0) + serverVersion = None + timeout_start = time.time() + while time.time()-timeout_start0: + if len(devices)>0 and master_server_is_connected: if JOB == None: #MIN_PARTS = len(devices)+INC_COEF #logger.debug('MIN_PARTS is setted to '+str(MIN_PARTS)) @@ -738,11 +810,11 @@ def server(): 'parts':20}) event_dispatcher.add_to_queue(event) #request_job(event_dispatcher,event) - event = Event({'t':'e', - 'event':'job_start', - 'secret':JOB_START_SECRET, - 'callback':server_socket}) - event_dispatcher.add_to_queue(event) + #event = Event({'t':'e', + # 'event':'job_start', + # 'secret':JOB_START_SECRET, + # 'callback':server_socket}) + #event_dispatcher.add_to_queue(event) #job_start(event_dispatcher,event) @@ -763,7 +835,7 @@ def server(): logger.info('STARTING SERVER') loadConfig() - connect_to_master() + #connect_to_master() try: server() except Exception as e: diff --git a/cluster_worker.py b/cluster_worker.py index c535d94..abab908 100644 --- a/cluster_worker.py +++ b/cluster_worker.py @@ -7,6 +7,7 @@ import traceback import logging import json +import types logger = logging.getLogger('Cluster_Client') logger.setLevel(logging.DEBUG) @@ -26,6 +27,7 @@ WORKER_NAME = 'TEST' CLUSTER_SERVER_ADDRESS = ('192.168.1.2',9090) client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) +client_socket.setsockopt( socket.SOL_SOCKET, socket.SO_REUSEADDR, 1 ) client_socket.setblocking(False) END_JOB = False @@ -104,11 +106,17 @@ def ducos1xxh( calculation_result = [None,hashcount,start,end,expectedHash] -def ping(): - global client_socket,CLUSTER_SERVER_ADDRESS +def ping(dispatcher,event): + ''' + event - {'t':'e', + 'event':'ping', + 'address':(1,1)} + ''' + global client_socket + update_last_ping() logger.info('Pinging master server') data = b'{"t":"e","event":"ping"}' - client_socket.sendto(data,CLUSTER_SERVER_ADDRESS) + client_socket.sendto(data,event.address) def register(dispatcher,event): ''' @@ -167,6 +175,8 @@ def start_job(dispatcher,event): END_JOB = True + #yield + try: calculation_thread.join() except: @@ -217,10 +227,11 @@ def stop_job(dispatcher,event): END_JOB = True - #try: - # calculation_thread.join() - #except: - # pass + try: + calculation_thread.join() + except: + pass + END_JOB = False #calculation_result = [None,0,0,0] #calculation_thread = None @@ -252,7 +263,6 @@ def send_result(): client_socket.sendto(data.encode('ascii'),CLUSTER_SERVER_ADDRESS) - #calculation_result = [None,0,0,0] calculation_thread = None END_JOB = False @@ -281,6 +291,7 @@ class Dispatcher: def __init__(self): self.actions = {} self.queue = [] + self.active_loop = [] def register(self,event_name,action): self.actions[event_name] = action @@ -292,17 +303,32 @@ def add_to_queue(self,event:Event): def clear_queue(self): self.queue = [] - def dispatch_event(self): - try: - event = self.queue.pop(0) - except: - return None - logger.debug('dispatching event') - func = self.actions.get(event.event,None) - if func == None: - logger.warning('NO SUCH ACTION '+event.event) - return None - return self.actions[event.event](self,event) + def iter_through_active_list(self): + counter = 0 + while counter