From ed0f93f35c5a8f8105e9f2e3461f4f8b7dfc5d7d Mon Sep 17 00:00:00 2001 From: Aleteoryx Date: Tue, 23 Dec 2025 18:54:35 -0500 Subject: [PATCH] runner --- employee.py | 167 ++++++++++++++++++++++++++++++++++++++++++++++++++++ employer.py | 37 ++++++++++-- 2 files changed, 199 insertions(+), 5 deletions(-) create mode 100755 employee.py mode change 100644 => 100755 employer.py diff --git a/employee.py b/employee.py new file mode 100755 index 0000000000000000000000000000000000000000..f5e6bb7951646406941b561f42f222c8bb48f4d4 --- /dev/null +++ b/employee.py @@ -0,0 +1,167 @@ +#!/bin/env python3 + +""" +this software distributed under a "fuck around, find out" license. the +author gives nobody in the world any explicit legal right to use this +code for any purpose whatsoever. the author reserves the right to send +holy legal fury down from the heavens should you invoke her ire. + +the author does not have the money to do so, in any juris diction +worldwide. the author is also prety chill, she thinks, so, you know, do +what you want or whatever. the text of this license is intentionally +god damn stupid specifically to scare away corporate lawyers and +attract internet queers + +if you do choose to redistribute this software (to "fuck around", as it +were), you should change this disclaimer some and make it funnier + +the mitochondrea is the powerhouse of the cell +""" + +import os +from sys import argv, exit, stderr +import logging +import time +import datetime +from subprocess import PIPE +import asyncio +from tempfile import mktemp +from pathlib import Path + +argv0 = 'employee.py' + + +def usage(): + global argv0 + print(f'usage: python3 {argv0} ', ) + exit(-1) + + +async def take_job(reader, writer): + l = logging.getLogger('employee.job') + + while True: + l.debug('polling for job') + writer.write(b'TAKE JOB\n') + await writer.drain() + + line = (await reader.readline()).strip() + if line != b'?': + jid = int(line.decode()) + await reader.readline() + break + + await asyncio.sleep(5) + + l.info(f'took job {jid}') + + writer.write(b'SCRIPT IS\n') + await writer.drain() + + line = (await reader.readline()).strip() + length = int(line.decode()) + script = await reader.readexactly(length) + + await reader.readline() + + return script + + +async def log_stream(stream, name, log): + async for line in stream: + try: + line = line.decode().replace('\n', '\\n') + except: + line = repr(line) + + log(f'{name}: {line}') +async def wait_proc(proc, l): + async with asyncio.TaskGroup() as tg: + tg.create_task(log_stream(proc.stdout, 'STDOUT', l.info)) + tg.create_task(log_stream(proc.stderr, 'STDERR', l.warn)) + await proc.wait() + + return proc.returncode + +async def poll_cancel(reader, writer): + while True: + writer.write(b'EXISTS\n') + await writer.drain() + + if await reader.readline() == b'?\n': + return + + await asyncio.sleep(5) + +async def exec_script(script, l): + file = Path(mktemp(prefix='employ')) + l.debug(f'saving script as {str(file)}') + with open(file, 'wb') as fp: + fp.write(script) + + file.chmod(file.stat().st_mode | 0o700) + + return file, await asyncio.create_subprocess_exec(file, file, stdout=PIPE, stderr=PIPE) + +async def run_script(reader, writer, script): + l = logging.getLogger('employee.exec') + + l.info('executing script...') + file, proc = await exec_script(script, l) + + l.info('attaching stdio...') + waiter = asyncio.create_task(wait_proc(proc, l)) + killer = asyncio.create_task(poll_cancel(reader, writer)) + + which, _ = await asyncio.wait([waiter, killer], return_when=asyncio.FIRST_COMPLETED) + + if which.pop() is waiter: + killer.cancel() + state = 'done' if proc.returncode == 0 else 'failed' + await complete_job(reader, writer, state, l) + l.info(f'job returned {proc.returncode}, marked {state}') + else: + waiter.cancel() + proc.terminate() + l.info(f'job cancelled') + + file.unlink() + +async def complete_job(reader, writer, state, l): + writer.write(f'MARK AS {state}\n'.encode()) + await writer.drain() + + line = await reader.readline() + if line != b'OKAY\n': + l.warn(f'got non-OKAY response completing job: {line}') + +async def main(): + global argv, argv0 + + l = logging.getLogger('employee') + + if len(argv) > 0: + argv0 = argv[0] + argv = argv[1:] + if len(argv) != 2: + usage() + + host, port = argv + + l.info(f'connecting to {host}:{port}...') + reader, writer = await asyncio.open_connection(host, port) + l.info(f'connected!') + + while True: + script = await take_job(reader, writer) + await run_script(reader, writer, script) + + +if __name__ == '__main__': + logging.basicConfig(stream=stderr, level=logging.INFO, format='[%(asctime)s %(levelname)s %(name)s] %(msg)s') + asyncio.run(main()) + + + + + diff --git a/employer.py b/employer.py old mode 100644 new mode 100755 index 9eb689ee0ae534edda6d06448aa83b2fab0c62ad..2c017f3968731d002ec2f8ebfee3536c9f3f7c5f --- a/employer.py +++ b/employer.py @@ -1,10 +1,29 @@ +#!/bin/env python3 + +""" +this software distributed under a "fuck around, find out" license. the +author gives nobody in the world any explicit legal right to use this +code for any purpose whatsoever. the author reserves the right to send +holy legal fury down from the heavens should you invoke her ire. + +the author does not have the money to do so, in any juris diction +worldwide. the author is also prety chill, she thinks, so, you know, do +what you want or whatever. the text of this license is intentionally +god damn stupid specifically to scare away corporate lawyers and +attract internet queers + +if you do choose to redistribute this software (to "fuck around", as it +were), you should change this disclaimer some and make it funnier + +the mitochondrea is the powerhouse of the cell + +""" + from sys import argv, stdin, stdout, stderr, exit, exc_info import asyncio import pickle import gzip -import time import logging -import base64 as b64 from pathlib import Path from dataclasses import dataclass, field, asdict from datetime import datetime @@ -17,7 +36,7 @@ argv0 = 'employer.py' def usage(): global argv0 - print(f'usage: python {argv0} [host] ', file=stderr) + print(f'usage: python3 {argv0} [host] ', file=stderr) exit(-1) @@ -224,6 +243,14 @@ async def handle_line(topic, tokens, line, reader, writer) -> int: del jobs[topic] case ['cancel']: del jobs[topic] + + case ['job', int(topic), 'exists']: + jobs[topic] + case ['exists']: + jobs[topic] + + case ['quit']: + writer.close() case _: raise ValueError(f'Unknown command {line!r}') @@ -267,7 +294,7 @@ async def accept(reader, writer): async with jobs: topic = await handle_line(topic, tokens, line, reader, writer) writer.write(b'OKAY\n') - except ValueError: + except (ValueError, KeyError): writer.write(b'?\n') except Exception: l.debug('', exc_info=exc_info()) @@ -280,7 +307,7 @@ async def accept(reader, writer): l.info(f'disconnect from {peer}') async def main(): - global jobs, argv + global jobs, argv, argv0 if len(argv) > 0: argv0 = argv[0]