A employee.py => employee.py +167 -0
@@ 0,0 1,167 @@
+#!/bin/env python3
+
+"""
+this software distributed under a "fuck around, find out" license. the
+author gives nobody in the world any explicit legal right to use this
+code for any purpose whatsoever. the author reserves the right to send
+holy legal fury down from the heavens should you invoke her ire.
+
+the author does not have the money to do so, in any juris diction
+worldwide. the author is also prety chill, she thinks, so, you know, do
+what you want or whatever. the text of this license is intentionally
+god damn stupid specifically to scare away corporate lawyers and
+attract internet queers
+
+if you do choose to redistribute this software (to "fuck around", as it
+were), you should change this disclaimer some and make it funnier
+
+the mitochondrea is the powerhouse of the cell
+"""
+
+import os
+from sys import argv, exit, stderr
+import logging
+import time
+import datetime
+from subprocess import PIPE
+import asyncio
+from tempfile import mktemp
+from pathlib import Path
+
+argv0 = 'employee.py'
+
+
+def usage():
+ global argv0
+ print(f'usage: python3 {argv0} <host> <port>', )
+ exit(-1)
+
+
+async def take_job(reader, writer):
+ l = logging.getLogger('employee.job')
+
+ while True:
+ l.debug('polling for job')
+ writer.write(b'TAKE JOB\n')
+ await writer.drain()
+
+ line = (await reader.readline()).strip()
+ if line != b'?':
+ jid = int(line.decode())
+ await reader.readline()
+ break
+
+ await asyncio.sleep(5)
+
+ l.info(f'took job {jid}')
+
+ writer.write(b'SCRIPT IS\n')
+ await writer.drain()
+
+ line = (await reader.readline()).strip()
+ length = int(line.decode())
+ script = await reader.readexactly(length)
+
+ await reader.readline()
+
+ return script
+
+
+async def log_stream(stream, name, log):
+ async for line in stream:
+ try:
+ line = line.decode().replace('\n', '\\n')
+ except:
+ line = repr(line)
+
+ log(f'{name}: {line}')
+async def wait_proc(proc, l):
+ async with asyncio.TaskGroup() as tg:
+ tg.create_task(log_stream(proc.stdout, 'STDOUT', l.info))
+ tg.create_task(log_stream(proc.stderr, 'STDERR', l.warn))
+ await proc.wait()
+
+ return proc.returncode
+
+async def poll_cancel(reader, writer):
+ while True:
+ writer.write(b'EXISTS\n')
+ await writer.drain()
+
+ if await reader.readline() == b'?\n':
+ return
+
+ await asyncio.sleep(5)
+
+async def exec_script(script, l):
+ file = Path(mktemp(prefix='employ'))
+ l.debug(f'saving script as {str(file)}')
+ with open(file, 'wb') as fp:
+ fp.write(script)
+
+ file.chmod(file.stat().st_mode | 0o700)
+
+ return file, await asyncio.create_subprocess_exec(file, file, stdout=PIPE, stderr=PIPE)
+
+async def run_script(reader, writer, script):
+ l = logging.getLogger('employee.exec')
+
+ l.info('executing script...')
+ file, proc = await exec_script(script, l)
+
+ l.info('attaching stdio...')
+ waiter = asyncio.create_task(wait_proc(proc, l))
+ killer = asyncio.create_task(poll_cancel(reader, writer))
+
+ which, _ = await asyncio.wait([waiter, killer], return_when=asyncio.FIRST_COMPLETED)
+
+ if which.pop() is waiter:
+ killer.cancel()
+ state = 'done' if proc.returncode == 0 else 'failed'
+ await complete_job(reader, writer, state, l)
+ l.info(f'job returned {proc.returncode}, marked {state}')
+ else:
+ waiter.cancel()
+ proc.terminate()
+ l.info(f'job cancelled')
+
+ file.unlink()
+
+async def complete_job(reader, writer, state, l):
+ writer.write(f'MARK AS {state}\n'.encode())
+ await writer.drain()
+
+ line = await reader.readline()
+ if line != b'OKAY\n':
+ l.warn(f'got non-OKAY response completing job: {line}')
+
+async def main():
+ global argv, argv0
+
+ l = logging.getLogger('employee')
+
+ if len(argv) > 0:
+ argv0 = argv[0]
+ argv = argv[1:]
+ if len(argv) != 2:
+ usage()
+
+ host, port = argv
+
+ l.info(f'connecting to {host}:{port}...')
+ reader, writer = await asyncio.open_connection(host, port)
+ l.info(f'connected!')
+
+ while True:
+ script = await take_job(reader, writer)
+ await run_script(reader, writer, script)
+
+
+if __name__ == '__main__':
+ logging.basicConfig(stream=stderr, level=logging.INFO, format='[%(asctime)s %(levelname)s %(name)s] %(msg)s')
+ asyncio.run(main())
+
+
+
+
+
M employer.py => employer.py +32 -5
@@ 1,10 1,29 @@
+#!/bin/env python3
+
+"""
+this software distributed under a "fuck around, find out" license. the
+author gives nobody in the world any explicit legal right to use this
+code for any purpose whatsoever. the author reserves the right to send
+holy legal fury down from the heavens should you invoke her ire.
+
+the author does not have the money to do so, in any juris diction
+worldwide. the author is also prety chill, she thinks, so, you know, do
+what you want or whatever. the text of this license is intentionally
+god damn stupid specifically to scare away corporate lawyers and
+attract internet queers
+
+if you do choose to redistribute this software (to "fuck around", as it
+were), you should change this disclaimer some and make it funnier
+
+the mitochondrea is the powerhouse of the cell
+
+"""
+
from sys import argv, stdin, stdout, stderr, exit, exc_info
import asyncio
import pickle
import gzip
-import time
import logging
-import base64 as b64
from pathlib import Path
from dataclasses import dataclass, field, asdict
from datetime import datetime
@@ 17,7 36,7 @@ argv0 = 'employer.py'
def usage():
global argv0
- print(f'usage: python {argv0} <db path> [host] <port>', file=stderr)
+ print(f'usage: python3 {argv0} <db path> [host] <port>', file=stderr)
exit(-1)
@@ 224,6 243,14 @@ async def handle_line(topic, tokens, line, reader, writer) -> int:
del jobs[topic]
case ['cancel']:
del jobs[topic]
+
+ case ['job', int(topic), 'exists']:
+ jobs[topic]
+ case ['exists']:
+ jobs[topic]
+
+ case ['quit']:
+ writer.close()
case _:
raise ValueError(f'Unknown command {line!r}')
@@ 267,7 294,7 @@ async def accept(reader, writer):
async with jobs:
topic = await handle_line(topic, tokens, line, reader, writer)
writer.write(b'OKAY\n')
- except ValueError:
+ except (ValueError, KeyError):
writer.write(b'?\n')
except Exception:
l.debug('', exc_info=exc_info())
@@ 280,7 307,7 @@ async def accept(reader, writer):
l.info(f'disconnect from {peer}')
async def main():
- global jobs, argv
+ global jobs, argv, argv0
if len(argv) > 0:
argv0 = argv[0]