#!/bin/env python3 """ this software distributed under a "fuck around, find out" license. the author gives nobody in the world any explicit legal right to use this code for any purpose whatsoever. the author reserves the right to send holy legal fury down from the heavens should you invoke her ire. the author does not have the money to do so, in any juris diction worldwide. the author is also prety chill, she thinks, so, you know, do what you want or whatever. the text of this license is intentionally god damn stupid specifically to scare away corporate lawyers and attract internet queers if you do choose to redistribute this software (to "fuck around", as it were), you should change this disclaimer some and make it funnier the mitochondrea is the powerhouse of the cell """ import os from sys import argv, exit, stderr import logging import time import datetime from subprocess import PIPE import asyncio from tempfile import mktemp from pathlib import Path argv0 = 'employee.py' def usage(): global argv0 print(f'usage: python3 {argv0} ', ) exit(-1) co_lock = asyncio.Lock() class JobError(Exception): pass async def send_cmd(reader, writer, cmd): global co_lock async def impl(): lines = [] async with co_lock: writer.write(cmd.encode()+b'\n') await writer.drain() while True: line = (await reader.readline()).rstrip() if line == b'?': raise JobError(f'{cmd} {lines}') elif line == b'OKAY': return lines else: lines.append(line.decode()) return await asyncio.shield(asyncio.create_task(impl())) async def take_job(reader, writer): global co_lock l = logging.getLogger('employee.job') while True: l.debug('polling for job') try: lines = await send_cmd(reader, writer, 'TAKE JOB') jid = lines[0] break except JobError: pass await asyncio.sleep(5) l.info(f'took job {jid}') async with co_lock: writer.write(b'SCRIPT IS\n') await writer.drain() line = (await reader.readline()).strip() if line == b'?': raise JobError('SCRIPT IS') length = int(line.decode()) script = await reader.readexactly(length) await reader.readline() return script async def log_stream(stream, name, log): async for line in stream: try: line = line.decode().replace('\n', '') except: line = repr(line) log(f'{name}: {line}') async def wait_proc(proc, l): async with asyncio.TaskGroup() as tg: tg.create_task(log_stream(proc.stdout, 'STDOUT', l.info)) tg.create_task(log_stream(proc.stderr, 'STDERR', l.warn)) await proc.wait() return proc.returncode async def poll_cancel(reader, writer): while True: try: await send_cmd(reader, writer, 'EXISTS') except JobError: return await asyncio.sleep(5) async def exec_script(script, l): file = Path(mktemp(prefix='employ')) l.debug(f'saving script as {str(file)}') with open(file, 'wb') as fp: fp.write(script) file.chmod(file.stat().st_mode | 0o700) return file, await asyncio.create_subprocess_exec(file, file, stdout=PIPE, stderr=PIPE) async def run_script(reader, writer, script): l = logging.getLogger('employee.exec') l.info('executing script...') file, proc = await exec_script(script, l) l.info('attaching stdio...') waiter = asyncio.create_task(wait_proc(proc, l)) killer = asyncio.create_task(poll_cancel(reader, writer)) which, _ = await asyncio.wait([waiter, killer], return_when=asyncio.FIRST_COMPLETED) if which.pop() is waiter: killer.cancel() while not killer.cancelled(): await asyncio.sleep(1) state = 'done' if proc.returncode == 0 else 'failed' await complete_job(reader, writer, state, l) l.info(f'job returned {proc.returncode}, marked {state}') else: waiter.cancel() while not waiter.cancelled(): await asyncio.sleep(1) proc.terminate() l.info(f'job cancelled') file.unlink() async def complete_job(reader, writer, state, l): try: await send_cmd(reader, writer, f'MARK AS {state}') except: l.warn(f'got non-OKAY response completing job: {line}') async def main(): global argv, argv0 l = logging.getLogger('employee') if len(argv) > 0: argv0 = argv[0] argv = argv[1:] if len(argv) != 2: usage() host, port = argv l.info(f'connecting to {host}:{port}...') reader, writer = await asyncio.open_connection(host, port) l.info(f'connected!') while True: try: script = await take_job(reader, writer) await run_script(reader, writer, script) except JobError as e: l.error(f'got {e}, skipping') if __name__ == '__main__': logging.basicConfig(stream=stderr, level=logging.INFO, format='[%(asctime)s %(levelname)s %(name)s] %(msg)s') asyncio.run(main())