#!/bin/env python3
"""
this software distributed under a "fuck around, find out" license. the
author gives nobody in the world any explicit legal right to use this
code for any purpose whatsoever. the author reserves the right to send
holy legal fury down from the heavens should you invoke her ire.
the author does not have the money to do so, in any jurisdiction
worldwide. the author is also pretty chill, she thinks, so, you know, do
what you want or whatever. the text of this license is intentionally
god damn stupid specifically to scare away corporate lawyers and
attract internet queers
if you do choose to redistribute this software (to "fuck around", as it
were), you should change this disclaimer some and make it funnier
the mitochondria is the powerhouse of the cell
"""
import os
from sys import argv, exit, stderr
import logging
import time
import datetime
from subprocess import PIPE
import asyncio
from tempfile import mktemp
from pathlib import Path
# Program name shown in the usage message; main() overwrites it with the
# real argv[0] when one is available.
argv0 = 'employee.py'


def usage():
    """Print the command-line usage summary to stderr and exit.

    Raises:
        SystemExit: always, with exit code -1.
    """
    # A usage error is a diagnostic, so it belongs on stderr; stdout stays
    # clean for anything that captures the program's regular output.
    print(f'usage: python3 {argv0} <host> <port>', file=stderr)
    exit(-1)
# Shared lock that serialises request/response exchanges on the server
# connection; send_cmd() and take_job() hold it while talking to the server.
co_lock = asyncio.Lock()
class JobError(Exception):
    """Raised when the server answers a command with `?` instead of `OKAY`."""
async def send_cmd(reader, writer, cmd):
    """Send one command line to the server and collect its reply.

    The reply is read line by line until a terminator arrives: ``OKAY``
    ends a successful reply and ``?`` ends a rejected one.

    Args:
        reader: asyncio stream reader for the server connection.
        writer: asyncio stream writer for the server connection.
        cmd: command text; a trailing newline is appended before sending.

    Returns:
        The decoded reply lines received before ``OKAY`` (possibly empty).

    Raises:
        JobError: the server answered ``?``.
        ConnectionError: the connection reached EOF before a terminator.
    """
    async def exchange():
        lines = []
        # Hold the lock for the whole round trip so that replies to
        # different commands cannot interleave on the shared connection.
        async with co_lock:
            writer.write(cmd.encode() + b'\n')
            await writer.drain()
            while True:
                raw = await reader.readline()
                if not raw:
                    # readline() yields b'' at EOF and keeps doing so; without
                    # this check the loop would spin forever, appending empty
                    # strings, once the server goes away.
                    raise ConnectionError(
                        f'connection closed while waiting for reply to {cmd}')
                line = raw.rstrip()
                if line == b'?':
                    raise JobError(f'{cmd} {lines}')
                if line == b'OKAY':
                    return lines
                lines.append(line.decode())

    # The exchange runs in its own shielded task: cancelling the caller must
    # not abandon a half-read reply, which would leave the stream out of sync
    # for the next command.
    return await asyncio.shield(asyncio.create_task(exchange()))
async def take_job(reader, writer):
    """Wait until the server hands out a job, then download its script.

    ``TAKE JOB`` is retried every five seconds for as long as the server
    rejects it. Once a job is granted, the script is requested with
    ``SCRIPT IS``: the server replies with a length line followed by that
    many bytes of script.

    Returns:
        The script as raw bytes.

    Raises:
        JobError: the server answered ``SCRIPT IS`` with ``?``.
    """
    log = logging.getLogger('employee.job')

    while True:
        log.debug('polling for job')
        try:
            reply = await send_cmd(reader, writer, 'TAKE JOB')
        except JobError:
            reply = None
        if reply is not None:
            break
        # Rejected: back off before asking again.
        await asyncio.sleep(5)

    # The first reply line carries the job id.
    jid = reply[0]
    log.info(f'took job {jid}')

    # This exchange is length-prefixed rather than OKAY-terminated, so it is
    # done by hand here while holding the same lock send_cmd() uses.
    async with co_lock:
        writer.write(b'SCRIPT IS\n')
        await writer.drain()
        header = (await reader.readline()).strip()
        if header == b'?':
            raise JobError('SCRIPT IS')
        size = int(header.decode())
        body = await reader.readexactly(size)
        # Consume the line that follows the script bytes; its content is unused.
        await reader.readline()
    return body
async def log_stream(stream, name, log):
    """Forward every line of *stream* to *log*, prefixed with *name*.

    Args:
        stream: async iterable yielding ``bytes`` lines (e.g. a subprocess
            pipe).
        name: label placed in front of each line.
        log: callable that receives each formatted message string.
    """
    async for raw in stream:
        try:
            text = raw.decode().replace('\n', '')
        except UnicodeDecodeError:
            # Not valid UTF-8: log the bytes repr rather than lose the line.
            # Only decode failures are handled here so that unrelated errors
            # are not silently masked.
            text = repr(raw)
        log(f'{name}: {text}')
async def wait_proc(proc, l):
    """Wait for *proc* to exit while relaying its output to the logger.

    stdout lines are logged at INFO level and stderr lines at WARNING level.

    Args:
        proc: asyncio subprocess whose ``stdout`` and ``stderr`` are readable
            streams.
        l: logger receiving the relayed lines.

    Returns:
        The process's exit code.
    """
    # The task group keeps both pipes drained while we wait, and does not
    # exit until both relays have reached end of stream.
    async with asyncio.TaskGroup() as tg:
        tg.create_task(log_stream(proc.stdout, 'STDOUT', l.info))
        # Logger.warn() is a deprecated alias; warning() is the supported name.
        tg.create_task(log_stream(proc.stderr, 'STDERR', l.warning))
        await proc.wait()
    return proc.returncode
async def poll_cancel(reader, writer):
    """Return once the server answers ``EXISTS`` with ``?``.

    ``EXISTS`` is re-sent every five seconds for as long as the server keeps
    accepting it.
    """
    accepted = True
    while accepted:
        try:
            await send_cmd(reader, writer, 'EXISTS')
        except JobError:
            # Server rejected the query: stop polling and return.
            accepted = False
        else:
            await asyncio.sleep(5)
async def exec_script(script, l):
    """Write *script* to a private temporary file and start it as a subprocess.

    Args:
        script: raw bytes of the executable script.
        l: logger used for debug output.

    Returns:
        ``(path, process)``: the temporary file's ``Path`` and the started
        asyncio subprocess, with stdout and stderr piped. The caller is
        responsible for deleting the file.
    """
    # mktemp() only invents a name, leaving a window in which someone else can
    # create that path first; mkstemp() creates the file atomically and
    # readable/writable by the owner only.
    from tempfile import mkstemp

    fd, name = mkstemp(prefix='employ')
    file = Path(name)
    l.debug(f'saving script as {str(file)}')
    try:
        with os.fdopen(fd, 'wb') as fp:
            fp.write(script)
        file.chmod(file.stat().st_mode | 0o700)
        # NOTE(review): the path is passed twice, so the script also receives
        # its own path as its first argument. Kept as found; confirm intent.
        proc = await asyncio.create_subprocess_exec(file, file, stdout=PIPE, stderr=PIPE)
    except BaseException:
        # Nothing will be returned, so nobody else can clean this file up.
        file.unlink(missing_ok=True)
        raise
    return file, proc
async def _cancel_and_settle(task, l):
    """Cancel *task* and wait until it has really finished."""
    task.cancel()
    # asyncio.wait() returns once the task is done for any reason (cancelled,
    # finished or failed) and never re-raises the task's own outcome, so this
    # cannot hang on a task that had already completed before cancel().
    await asyncio.wait([task])
    if not task.cancelled() and task.exception() is not None:
        l.warning(f'background task failed: {task.exception()!r}')


async def run_script(reader, writer, script):
    """Run one job script and report its outcome to the server.

    The script runs as a subprocess while the server is polled in parallel.
    If the process exits first, the job is marked ``done`` (exit code 0) or
    ``failed`` (anything else). If the poll ends first, the job is treated as
    cancelled and the process is terminated. The temporary script file is
    removed in every case.

    Args:
        reader: asyncio stream reader for the server connection.
        writer: asyncio stream writer for the server connection.
        script: raw bytes of the script to execute.
    """
    l = logging.getLogger('employee.exec')
    l.info('executing script...')
    file, proc = await exec_script(script, l)
    try:
        l.info('attaching stdio...')
        waiter = asyncio.create_task(wait_proc(proc, l))
        killer = asyncio.create_task(poll_cancel(reader, writer))
        done, _ = await asyncio.wait([waiter, killer], return_when=asyncio.FIRST_COMPLETED)
        # Both tasks may complete in the same loop iteration; test membership
        # instead of popping an arbitrary element so that a finished process
        # always takes precedence.
        if waiter in done:
            await _cancel_and_settle(killer, l)
            state = 'done' if proc.returncode == 0 else 'failed'
            await complete_job(reader, writer, state, l)
            l.info(f'job returned {proc.returncode}, marked {state}')
        else:
            await _cancel_and_settle(waiter, l)
            if proc.returncode is None:
                try:
                    proc.terminate()
                except ProcessLookupError:
                    # The process exited on its own in the meantime.
                    pass
            l.info('job cancelled')
    finally:
        # Remove the script even when reporting or cancellation fails.
        file.unlink()
async def complete_job(reader, writer, state, l):
    """Report a finished job's outcome to the server with ``MARK AS <state>``.

    A rejection by the server is logged as a warning and otherwise ignored.

    Args:
        reader: asyncio stream reader for the server connection.
        writer: asyncio stream writer for the server connection.
        state: state name to send after ``MARK AS``.
        l: logger used for the warning.
    """
    try:
        await send_cmd(reader, writer, f'MARK AS {state}')
    except JobError as e:
        # Only the server's rejection is handled here: a catch-all would also
        # swallow task cancellation. The message uses the caught error (the
        # previous text referenced an undefined name and raised NameError).
        l.warning(f'got non-OKAY response completing job: {e}')
async def main():
    """Connect to the job server and process jobs forever.

    Reads ``<host> <port>`` from the command line (after dropping the program
    name, which is remembered in ``argv0`` for the usage message), opens the
    connection, then alternates between taking a job and running it. A
    JobError is logged and the loop moves on to the next job.
    """
    global argv, argv0
    log = logging.getLogger('employee')

    if argv:
        # Split the program name off the actual arguments.
        argv0, argv = argv[0], argv[1:]
    if len(argv) != 2:
        usage()
    host, port = argv

    log.info(f'connecting to {host}:{port}...')
    reader, writer = await asyncio.open_connection(host, port)
    log.info('connected!')

    while True:
        try:
            job_script = await take_job(reader, writer)
            await run_script(reader, writer, job_script)
        except JobError as e:
            log.error(f'got {e}, skipping')
if __name__ == '__main__':
    # %(message)s is the fully formatted record text. %(msg)s is only the raw
    # message object and would drop the arguments of any %-style log call.
    logging.basicConfig(
        stream=stderr,
        level=logging.INFO,
        format='[%(asctime)s %(levelname)s %(name)s] %(message)s',
    )
    asyncio.run(main())