#!/bin/env python3 """ this software distributed under a "fuck around, find out" license. the author gives nobody in the world any explicit legal right to use this code for any purpose whatsoever. the author reserves the right to send holy legal fury down from the heavens should you invoke her ire. the author does not have the money to do so, in any juris diction worldwide. the author is also prety chill, she thinks, so, you know, do what you want or whatever. the text of this license is intentionally god damn stupid specifically to scare away corporate lawyers and attract internet queers if you do choose to redistribute this software (to "fuck around", as it were), you should change this disclaimer some and make it funnier the mitochondrea is the powerhouse of the cell """ import os from sys import argv, exit, stderr import logging import time import datetime from subprocess import PIPE import asyncio from tempfile import mktemp from pathlib import Path argv0 = 'employee.py' def usage(): global argv0 print(f'usage: python3 {argv0} ', ) exit(-1) async def take_job(reader, writer): l = logging.getLogger('employee.job') while True: l.debug('polling for job') writer.write(b'TAKE JOB\n') await writer.drain() line = (await reader.readline()).strip() if line != b'?': jid = int(line.decode()) await reader.readline() break await asyncio.sleep(5) l.info(f'took job {jid}') writer.write(b'SCRIPT IS\n') await writer.drain() line = (await reader.readline()).strip() length = int(line.decode()) script = await reader.readexactly(length) await reader.readline() return script async def log_stream(stream, name, log): async for line in stream: try: line = line.decode().replace('\n', '\\n') except: line = repr(line) log(f'{name}: {line}') async def wait_proc(proc, l): async with asyncio.TaskGroup() as tg: tg.create_task(log_stream(proc.stdout, 'STDOUT', l.info)) tg.create_task(log_stream(proc.stderr, 'STDERR', l.warn)) await proc.wait() return proc.returncode async def poll_cancel(reader, writer): while True: writer.write(b'EXISTS\n') await writer.drain() if await reader.readline() == b'?\n': return await asyncio.sleep(5) async def exec_script(script, l): file = Path(mktemp(prefix='employ')) l.debug(f'saving script as {str(file)}') with open(file, 'wb') as fp: fp.write(script) file.chmod(file.stat().st_mode | 0o700) return file, await asyncio.create_subprocess_exec(file, file, stdout=PIPE, stderr=PIPE) async def run_script(reader, writer, script): l = logging.getLogger('employee.exec') l.info('executing script...') file, proc = await exec_script(script, l) l.info('attaching stdio...') waiter = asyncio.create_task(wait_proc(proc, l)) killer = asyncio.create_task(poll_cancel(reader, writer)) which, _ = await asyncio.wait([waiter, killer], return_when=asyncio.FIRST_COMPLETED) if which.pop() is waiter: killer.cancel() state = 'done' if proc.returncode == 0 else 'failed' await complete_job(reader, writer, state, l) l.info(f'job returned {proc.returncode}, marked {state}') else: waiter.cancel() proc.terminate() l.info(f'job cancelled') file.unlink() async def complete_job(reader, writer, state, l): writer.write(f'MARK AS {state}\n'.encode()) await writer.drain() line = await reader.readline() if line != b'OKAY\n': l.warn(f'got non-OKAY response completing job: {line}') async def main(): global argv, argv0 l = logging.getLogger('employee') if len(argv) > 0: argv0 = argv[0] argv = argv[1:] if len(argv) != 2: usage() host, port = argv l.info(f'connecting to {host}:{port}...') reader, writer = await asyncio.open_connection(host, port) l.info(f'connected!') while True: script = await take_job(reader, writer) await run_script(reader, writer, script) if __name__ == '__main__': logging.basicConfig(stream=stderr, level=logging.INFO, format='[%(asctime)s %(levelname)s %(name)s] %(msg)s') asyncio.run(main())