From a9f243dd6e62eb9597824a6fae5f75d0ad5da5a2 Mon Sep 17 00:00:00 2001 From: Aleteoryx Date: Thu, 25 Dec 2025 00:30:41 -0500 Subject: [PATCH] do networking properly jesus --- employee.py | 62 ++++++++++++++++++++++++++++++++++++++--------------- taken.py | 4 +++- 2 files changed, 48 insertions(+), 18 deletions(-) diff --git a/employee.py b/employee.py index f5e6bb7951646406941b561f42f222c8bb48f4d4..05f03f53c601f5e83ef44685f71393e27efefd4c 100755 --- a/employee.py +++ b/employee.py @@ -37,19 +37,46 @@ def usage(): exit(-1) +co_lock = asyncio.Lock() + +class JobError(Exception): + pass +async def send_cmd(reader, writer, cmd): + + global co_lock + + async def impl(): + lines = [] + + async with co_lock: + writer.write(cmd.encode()+b'\n') + print(cmd) + await writer.drain() + while True: + line = (await reader.readline()).rstrip() + print(line) + if line == b'?': + raise JobError(cmd) + elif line == b'OKAY': + return lines + else: + lines.append(line.decode()) + + return await asyncio.shield(asyncio.create_task(impl())) + + async def take_job(reader, writer): l = logging.getLogger('employee.job') while True: l.debug('polling for job') - writer.write(b'TAKE JOB\n') - await writer.drain() - - line = (await reader.readline()).strip() - if line != b'?': - jid = int(line.decode()) - await reader.readline() + + try: + lines = await send_cmd(reader, writer, 'TAKE JOB') + jid = lines[0] break + except JobError: + pass await asyncio.sleep(5) @@ -70,7 +97,7 @@ async def take_job(reader, writer): async def log_stream(stream, name, log): async for line in stream: try: - line = line.decode().replace('\n', '\\n') + line = line.decode().replace('\n', '') except: line = repr(line) @@ -85,10 +112,9 @@ async def wait_proc(proc, l): async def poll_cancel(reader, writer): while True: - writer.write(b'EXISTS\n') - await writer.drain() - - if await reader.readline() == b'?\n': + try: + await send_cmd(reader, writer, 'EXISTS') + except JobError: return await asyncio.sleep(5) @@ -117,22 +143,24 @@ async def run_script(reader, writer, script): if which.pop() is waiter: killer.cancel() + while not killer.cancelled(): + await asyncio.sleep(1) state = 'done' if proc.returncode == 0 else 'failed' await complete_job(reader, writer, state, l) l.info(f'job returned {proc.returncode}, marked {state}') else: waiter.cancel() + while not waiter.cancelled(): + await asyncio.sleep(1) proc.terminate() l.info(f'job cancelled') file.unlink() async def complete_job(reader, writer, state, l): - writer.write(f'MARK AS {state}\n'.encode()) - await writer.drain() - - line = await reader.readline() - if line != b'OKAY\n': + try: + await send_cmd(reader, writer, f'MARK AS {state}') + except: l.warn(f'got non-OKAY response completing job: {line}') async def main(): diff --git a/taken.py b/taken.py index 1218088d8310055bb9432a4084eef47a8322cf9f..aa97ff9310c92c5c6d17a1e2eee5daab5a03d00a 100755 --- a/taken.py +++ b/taken.py @@ -10,6 +10,8 @@ def usage(): print(f'usage: {argv0} ', file=stderr) exit(-1) +class JobError(Exception): + pass def send_cmd(reader, writer, cmd): lines = [] @@ -18,7 +20,7 @@ def send_cmd(reader, writer, cmd): while True: line = reader.readline() if line == b'?\n': - raise Exception('bad cmd') + raise JobError(cmd) elif line == b'OKAY\n': return lines else: