From 217373af5ba6db98c94baef34bb1939ce6f174c7 Mon Sep 17 00:00:00 2001 From: Lehman Garrison Date: Sat, 9 Nov 2024 11:40:51 -0500 Subject: [PATCH] lint: apply pyupgrade for 3.9 --- disbatch/dbMon.py | 13 ++++--- disbatch/disBatch.py | 67 +++++++++++++++-------------------- disbatch/kvsstcp/kvsclient.py | 24 ++++++------- disbatch/kvsstcp/kvscommon.py | 8 ++--- disbatch/kvsstcp/kvsstcp.py | 32 ++++++++--------- pyproject.toml | 3 ++ 6 files changed, 69 insertions(+), 78 deletions(-) diff --git a/disbatch/dbMon.py b/disbatch/dbMon.py index 74382cb..596adb8 100644 --- a/disbatch/dbMon.py +++ b/disbatch/dbMon.py @@ -3,7 +3,6 @@ import curses import json import os -import socket import sys import time from queue import Queue @@ -36,7 +35,7 @@ curses.init_pair(7, curses.COLOR_WHITE, curses.COLOR_WHITE) curses.curs_set(False) -CPCB, CPGB, CPBR, CPYB, CPRB, CPBB, CPWW = [curses.color_pair(x) for x in range(1, 8)] +CPCB, CPGB, CPBR, CPYB, CPRB, CPBB, CPWW = (curses.color_pair(x) for x in range(1, 8)) Diamond = curses.ACS_DIAMOND Horizontal, Vertical = curses.ACS_HLINE, curses.ACS_VLINE @@ -316,14 +315,14 @@ def display(S, kvsc, inq): ) if r == 'Y': try: - msg = 'Asking controller to stop context %r' % cRank + msg = f'Asking controller to stop context {cRank!r}' kvsc.put('.controller', ('stop context', cRank)) for rank, e in engines.items(): if e['cRank'] == cRank: localEngineStatus[rank] = ( 'requesting shutdown' ) - except socket.error: + except OSError: pass elif k == ord('E'): r = popYNC( @@ -336,11 +335,11 @@ def display(S, kvsc, inq): if r == 'Y': try: msg = ( - 'Asking controller to stop engine %r' % target + f'Asking controller to stop engine {target!r}' ) kvsc.put('.controller', ('stop engine', target)) localEngineStatus[target] = 'requesting shutdown' - except socket.error: + except OSError: pass else: msg = 'Got unrecognized key: %d' % k @@ -354,7 +353,7 @@ def display(S, kvsc, inq): elif tag == 'stop': done = True else: - msg = 'Unrecognized tag: "%s",' % tag + msg = f'Unrecognized tag: "{tag}",' # (Wrapped) main. diff --git a/disbatch/disBatch.py b/disbatch/disBatch.py index cfaffb8..041db4f 100644 --- a/disbatch/disBatch.py +++ b/disbatch/disBatch.py @@ -110,7 +110,7 @@ def register(kvs, which): return kvs.get(key) -class DisBatcher(object): +class DisBatcher: """Encapsulates a disBatch instance.""" def __init__(self, tasksname='DisBatcher', args=[], kvsserver=None): @@ -225,16 +225,7 @@ def __init__( self.retireCmd = None def __str__(self): - return ( - 'Context type: %s\nLabel: %s\nNodes: %r\nCylinders: %r\nCores per cylinder: %r\n' - % ( - self.sysid, - self.label, - self.nodes, - self.cylinders, - self.cores_per_cylinder, - ) - ) + return f'Context type: {self.sysid}\nLabel: {self.label}\nNodes: {self.nodes!r}\nCylinders: {self.cylinders!r}\nCores per cylinder: {self.cores_per_cylinder!r}\n' def finish(self): """Check that all engines completed successfully and return True on success.""" @@ -257,7 +248,7 @@ def launch(self, kvs): def launchNode(self, node): """Launch an engine for a single node. Should return a subprocess handle (unless launch itself is overridden).""" - raise NotImplementedError('%s.launchNode is not implemented' % type(self)) + raise NotImplementedError(f'{type(self)}.launchNode is not implemented') def poll(self): """Check if any engines have stopped.""" @@ -302,8 +293,7 @@ def retireNodeList(self, nodeList, retList): shell=True, env=env, check=True, - stdout=SUB.PIPE, - stderr=SUB.PIPE, + capture_output=True, ) except Exception as e: logger.warning('Retirement planning needs improvement: %s', repr(e)) @@ -354,8 +344,7 @@ def setNode(self, node=None): except ValueError: # Should we instead assume 0 or carry on with none? raise LookupError( - 'Couldn\'t find nodeId for %s in "%s".' - % (node or myHostname, self.nodes) + f'Couldn\'t find nodeId for {node or myHostname} in "{self.nodes}".' ) @@ -509,7 +498,7 @@ def decodeSlurmVal(val): opts = [] if opt_file: self.for_log.append( - ('Taking srun options from "%s".' % opt_file, logging.INFO) + (f'Taking srun options from "{opt_file}".', logging.INFO) ) opts = open(opt_file).read().split('\n') else: @@ -525,8 +514,8 @@ def decodeSlurmVal(val): name, value = L.split('=', 1) os.environ[name] = value - contextLabel = args.label if args.label else 'J%s' % jobid - super(SlurmContext, self).__init__( + contextLabel = args.label if args.label else f'J{jobid}' + super().__init__( 'Slurm', dbInfo, rank, @@ -639,7 +628,7 @@ def engine_stop(self): p.flush() def launchNode(self, n): - lfp = '%s_%s_%s_engine_wrap.log' % (self.dbInfo.uniqueId, self.label, n) + lfp = f'{self.dbInfo.uniqueId}_{self.label}_{n}_engine_wrap.log' # To convince Slurm to give us the right gres, request the right number of tasks. nx = self.nodes.index(n) tasks = self.cylinders[nx] @@ -681,11 +670,11 @@ def __retirementThrottle__(self): # been at least ThrottleTime since the # last node was added. logging.info(f'Throttle releasing: {nodeList}, {retList}.') - super(SlurmContext, self).retireNodeList(nodeList, retList) + super().retireNodeList(nodeList, retList) nodeList, retList = [], [] def retireEnv(self, nodeList, retList): - env = super(SlurmContext, self).retireEnv(nodeList, retList) + env = super().retireEnv(nodeList, retList) if self.driverNode: env['DRIVER_NODE'] = self.driverNode return env @@ -735,7 +724,7 @@ def poll_task(self, p): return None def setNode(self, node=None): - super(SlurmContext, self).setNode(node or os.getenv('SLURMD_NODENAME')) + super().setNode(node or os.getenv('SLURMD_NODENAME')) # TODO: @@ -817,7 +806,7 @@ def __init__(self, dbInfo, rank, args): cores_per_cylinder = [ cc // c if c else cc for cc, c in zip(core_count, cylinders) ] - super(SSHContext, self).__init__( + super().__init__( 'SSH', dbInfo, rank, @@ -830,12 +819,12 @@ def __init__(self, dbInfo, rank, args): def launchNode(self, n): prefix = [] if compHostnames(n, myHostname) else ['ssh', n] - lfp = '%s_%s_%s_engine_wrap.log' % (self.dbInfo.uniqueId, self.label, n) + lfp = f'{self.dbInfo.uniqueId}_{self.label}_{n}_engine_wrap.log' cmd = prefix + [DbUtilPath, '--engine', '-n', n, self.kvsKey] logger.info('ssh launch comand: %r', cmd) return SUB.Popen( cmd, - stdin=open(os.devnull, 'r'), + stdin=open(os.devnull), stdout=open(lfp, 'w'), stderr=SUB.STDOUT, close_fds=True, @@ -1083,7 +1072,7 @@ def parseStatusFiles(*files): status = dict() for f in files: try: - with open(f, 'r', encoding='utf-8') as s: + with open(f, encoding='utf-8') as s: for L in s: tr = TaskReport(L[:-1]) ti = tr.taskInfo @@ -1313,7 +1302,7 @@ def statusTaskFilter(tasks, status, retry=False, force=False): # Main control loop that sends new tasks to the execution engines. class Feeder(Thread): def __init__(self, kvs, ageQ, tasks, slots): - super(Feeder, self).__init__(name='Feeder') + super().__init__(name='Feeder') self.kvs = kvs.clone() self.ageQ = ageQ self.taskGenerator = tasks @@ -1366,7 +1355,7 @@ def main(self): # Main control loop that processes completed tasks. class Driver(Thread): def __init__(self, kvs, db_info, tasks, trackResults=None): - super(Driver, self).__init__(name='Driver') + super().__init__(name='Driver') self.kvs = kvs.clone() self.db_info = db_info # uniqueId can have a path component. Remove that here. @@ -1774,7 +1763,7 @@ def run(self): rd = tReport.reportDict() rd['TaskCmd'] = rd['TaskCmd'].decode('utf-8', 'replace') self.kvs.put( - self.trackResults + f' {tinfo.taskId}'.encode('utf-8'), + self.trackResults + f' {tinfo.taskId}'.encode(), json.dumps(rd), b'JSON', ) @@ -1877,7 +1866,7 @@ def run(self): # and possibly collect the first and/or last few bytes of it class OutputCollector(Thread): def __init__(self, pipe, takeStart=0, takeEnd=0): - super(OutputCollector, self).__init__(name='OutputCollector') + super().__init__(name='OutputCollector') # We don't really care for python's file abstraction -- get back a real fd self.pipefd = os.dup(pipe.fileno()) pipe.close() @@ -1996,7 +1985,7 @@ def run(self): # signal.signal(signal.SIGTERM, lambda s, f: sys.exit(1)) try: self.main() - except socket.error as e: + except OSError as e: if not self.shuttingDown: logger.info('Cylinder %d got socket error %r', self.cylinderRank, e) except Exception: @@ -2141,7 +2130,7 @@ def main(self): logger.info('Cylinder %s completed: %s', self.cylinderRank, tr) def __init__(self, kvs, context, rank): - super(EngineBlock, self).__init__(name='EngineBlock') + super().__init__(name='EngineBlock') self.daemon = True self.context = context self.hbQueue = Queue() @@ -2409,7 +2398,7 @@ def main(kvsq=None): os.chdir(dbInfo.wd) except Exception: print( - 'Failed to change working directory to "%s".' % dbInfo.wd, + f'Failed to change working directory to "{dbInfo.wd}".', file=sys.stderr, ) # TODO: Fail here? @@ -2459,7 +2448,7 @@ def shutdown(s=None, f=None): try: e.join() - except socket.error as r: + except OSError as r: logger.info('got socket error waiting on shutdown: %r', r) except Exception as e: logger.exception('EngineBlock during join.') @@ -2506,7 +2495,7 @@ def shutdown(s=None, f=None): os.chdir(dbInfo.wd) except Exception: print( - 'Failed to change working directory to "%s".' % dbInfo.wd, + f'Failed to change working directory to "{dbInfo.wd}".', file=sys.stderr, ) # TODO: Fail here? @@ -2525,7 +2514,7 @@ def shutdown(s=None, f=None): 'format': '%(asctime)s %(levelname)-8s %(name)-15s: %(message)s', 'level': dbInfo.args.loglevel, } - lconf['filename'] = '%s_%s.context.log' % (dbInfo.uniqueId, context.label) + lconf['filename'] = f'{dbInfo.uniqueId}_{context.label}.context.log' logging.basicConfig(**lconf) logging.info('%s context started on %s (%d).', context.sysid, myHostname, myPid) logger.info('argv: %r', sys.argv) @@ -2828,7 +2817,7 @@ def shutdown(s=None, f=None): urlfile = uniqueId + '_url' wskvsmu.main(kvsserver, urlfile=open(urlfile, 'w'), monitorspec=':gpvw') - DbUtilPath = '%s_dbUtil.sh' % uniqueId + DbUtilPath = f'{uniqueId}_dbUtil.sh' dbutil_template = ( importlib.resources.files('disbatch') .joinpath('dbUtil.template.sh') @@ -2868,7 +2857,7 @@ def shutdown(s=None, f=None): subContext = SUB.Popen( [DbUtilPath] + extraArgs, - stdin=open(os.devnull, 'r'), + stdin=open(os.devnull), stdout=open(uniqueId + '_context_wrap.out', 'w'), close_fds=True, ) diff --git a/disbatch/kvsstcp/kvsclient.py b/disbatch/kvsstcp/kvsclient.py index c64299e..ceeb27a 100755 --- a/disbatch/kvsstcp/kvsclient.py +++ b/disbatch/kvsstcp/kvsclient.py @@ -1,5 +1,4 @@ #!/usr/bin/env python -from __future__ import print_function import errno import os @@ -12,7 +11,7 @@ from .kvscommon import AsciiLenChars, AsciiLenFormat, recvall -class KVSClient(object): +class KVSClient: """KVS convenience wrapper that includes pickling by default.""" def __init__(self, host=None, port=None, retry=0): @@ -51,7 +50,7 @@ def _close(self): return try: self._real_socket().close() - except socket.error: + except OSError: pass self.socket = None @@ -85,8 +84,9 @@ def __bool__(self): def __getattr__(self, attr): """Disallow any other operations on a waiting socket.""" raise Exception( - "Previous %s timed out: you must retreive the previously requested '%s' value first." - % self.op + "Previous {} timed out: you must retreive the previously requested '{}' value first.".format( + *self.op + ) ) def _real_socket(self): @@ -119,7 +119,7 @@ def _get_view(self, op, k, encoding, timeout=None): except socket.timeout: self.socket = self.SocketWaiting(self.socket, (op, k)) return - except socket.error as e: + except OSError as e: if e.errno in (errno.EWOULDBLOCK, errno.EAGAIN): self.socket = self.SocketWaiting(self.socket, (op, k)) return @@ -128,7 +128,7 @@ def _get_view(self, op, k, encoding, timeout=None): finally: self._real_socket().settimeout(None) if not c: - raise socket.error('Connection closed') + raise OSError('Connection closed') coding = c + recvall(self.socket, 3) v = self._recvValue(encoding is True and coding == b'PYPK') return v if isinstance(encoding, bool) else (coding, v) @@ -144,11 +144,11 @@ def connect(self, retry=0): self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) self.socket.connect(self.addr) return - except socket.error as msg: + except OSError as msg: self._close() if rep >= retry: raise - print('kvs socket error: %s, retrying' % msg, file=sys.stderr) + print(f'kvs socket error: {msg}, retrying', file=sys.stderr) # exponential backoff time.sleep(2**rep) rep += 1 @@ -160,9 +160,9 @@ def close(self): try: self.socket.sendall(b'clos') self.socket.shutdown(socket.SHUT_RDWR) - except socket.error as e: + except OSError as e: # this is the client --- cannot assume logging is available. - print('Ignoring exception during client close: "%s"' % e, file=sys.stderr) + print(f'Ignoring exception during client close: "{e}"', file=sys.stderr) self._close() def dump(self): @@ -218,7 +218,7 @@ def put(self, key, value, encoding=True): encoding = repr(encoding) encoding = bytes(encoding, 'utf-8') if len(encoding) != 4: - raise TypeError('Invalid encoding: %s' % encoding) + raise TypeError(f'Invalid encoding: {encoding}') self.socket.sendall(b'put_') self._sendLenAndBytes(key) diff --git a/disbatch/kvsstcp/kvscommon.py b/disbatch/kvsstcp/kvscommon.py index 5e58a7b..0fbfe09 100644 --- a/disbatch/kvsstcp/kvscommon.py +++ b/disbatch/kvsstcp/kvscommon.py @@ -46,24 +46,24 @@ def AsciiLenFormat(n): # MSG_WAITALL on OSX ends up blocking if the tcp buffer is not big enough for the entire message: don't use it def recvall(s, n): if s is None: - raise socket.error('socket is None, cannot receive') + raise OSError('socket is None, cannot receive') if not n: return b'' r = s.recv(n, socket.MSG_WAITALL) if len(r) < n: - raise socket.error('Connection dropped') + raise OSError('Connection dropped') return r else: def recvall(s, n): """Wrapper to deal with partial recvs when we know there are N bytes to be had.""" if s is None: - raise socket.error('socket is None, cannot receive') + raise OSError('socket is None, cannot receive') d = b'' while n: b = s.recv(n) if not b: - raise socket.error('Connection dropped') + raise OSError('Connection dropped') d += b n -= len(b) return d diff --git a/disbatch/kvsstcp/kvsstcp.py b/disbatch/kvsstcp/kvsstcp.py index b52f85e..faee923 100755 --- a/disbatch/kvsstcp/kvsstcp.py +++ b/disbatch/kvsstcp/kvsstcp.py @@ -51,7 +51,7 @@ # This approach has the very important benefit that it is single threaded. -class Handler(object): +class Handler: """Based on asyncore, but with a simpler, stricter per-thread interface that allows better performance.""" def __init__(self): @@ -69,7 +69,7 @@ def run(self): while self.running: try: self.poll() - except IOError as e: + except OSError as e: if e.errno == errno.EINTR: continue raise @@ -205,7 +205,7 @@ def stop(self, disp): self.close() -class Dispatcher(object): +class Dispatcher: def __init__(self, sock, handler, mask=0): self.sock = sock self.fd = sock.fileno() @@ -221,13 +221,13 @@ def close(self): self.handler.unregister(self) try: self.sock.close() - except socket.error: + except OSError: pass def accept(self): try: return self.sock.accept() - except socket.error as e: + except OSError as e: if e.errno in (errno.EWOULDBLOCK, errno.EAGAIN): return if e.errno in _DISCONNECTED or e.errno == errno.EINVAL: @@ -238,7 +238,7 @@ def accept(self): def send(self, data): try: return self.sock.send(data) - except socket.error as e: + except OSError as e: if e.errno in (errno.EWOULDBLOCK, errno.EAGAIN): return 0 if e.errno in _DISCONNECTED: @@ -252,7 +252,7 @@ def recv(self, siz): if not data: self.handle_close() return data - except socket.error as e: + except OSError as e: if e.errno in (errno.EWOULDBLOCK, errno.EAGAIN): return b'' if e.errno in _DISCONNECTED: @@ -266,7 +266,7 @@ def recv_into(self, buf): if n == 0: self.handle_close() return n - except socket.error as e: + except OSError as e: if e.errno in (errno.EWOULDBLOCK, errno.EAGAIN): return b'' if e.errno in _DISCONNECTED: @@ -278,7 +278,7 @@ def shutdown(self): try: self.mask |= self.handler.IN self.sock.shutdown(socket.SHUT_RDWR) - except socket.error as e: + except OSError as e: if e.errno not in _DISCONNECTED: raise @@ -291,7 +291,7 @@ class StreamDispatcher(Dispatcher): Also allows input of known-size blocks.""" def __init__(self, sock, handler): - super(StreamDispatcher, self).__init__(sock, handler) + super().__init__(sock, handler) self.out_buf = [] self.in_buf = memoryview(bytearray(_BUFSIZ)) self.in_off = 0 @@ -346,7 +346,7 @@ def __init__(self, pair, server, handler): # Keep track of any currently waiting get: self.waiter = None sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) - super(KVSRequestDispatcher, self).__init__(sock, handler) + super().__init__(sock, handler) logger.info('Accepted connect from %r', self.addr) self.next_op() self.open() @@ -357,7 +357,7 @@ def handle_close(self): self.close() def error(self, msg): - logger.error('Error from %r: %s' % (self.addr, msg)) + logger.error(f'Error from {self.addr!r}: {msg}') self.close() def cancel_waiter(self): @@ -377,7 +377,7 @@ def handle_len(L): except ValueError: n = -1 if n < 0: - self.error("invalid data len: '%s'" % L) + self.error(f"invalid data len: '{L}'") return self.next_read(n, handler) @@ -397,7 +397,7 @@ def handle_op(self, op): elif op in [b'get_', b'mkey', b'put_', b'view']: self.next_lendata(partial(self.handle_opkey, op)) else: - self.error("Unknown op: '%r'" % op) + self.error(f"Unknown op: '{op!r}'") def handle_opkey(self, op, key): key = key.tobytes() @@ -446,7 +446,7 @@ def __init__(self, op, key, handler): self.handler = handler -class KVS(object): +class KVS: """Get/Put/View implements a client-server key value store. If no value is associated with a given key, clients will block on get or view until a value is available. Multiple values may be associated @@ -654,7 +654,7 @@ def handle_close(self): def shutdown(self): if self.handler.running: - super(KVSServer, self).shutdown() + super().shutdown() self.handler.stop(self) def env(self, env=os.environ.copy()): diff --git a/pyproject.toml b/pyproject.toml index e8551a7..3b46acb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -22,6 +22,9 @@ build-backend = "hatchling.build" [tool.ruff.format] quote-style = "single" +[tool.ruff.lint] +select = ["E4", "E7", "E9", "F", "UP"] + [tool.hatch.version] source = "vcs"