From 89ef9eae498bdb070ebce1591a3e4525855cdc4b Mon Sep 17 00:00:00 2001
From: Aleteoryx
Date: Mon, 22 Dec 2025 17:53:11 -0500
Subject: [PATCH] what's this???

---
 PLAN        |  48 +++++++++
 README.md   |   3 +
 employer.py | 292 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 343 insertions(+)
 create mode 100644 PLAN
 create mode 100644 README.md
 create mode 100644 employer.py

diff --git a/PLAN b/PLAN
new file mode 100644
index 0000000000000000000000000000000000000000..527db1c8d50fa8441126ce367012c50fff9fc397
--- /dev/null
+++ b/PLAN
@@ -0,0 +1,48 @@
+a job is:
+  - id
+  - executable data
+  - marking: new/available/taken/done/failed
+  - dates: created, taken, completed
+  - metadata: dict[str,str]
+    keys cannot include ':' or ' '; neither keys nor values can include \n
+
+server protocol:
+  case-insensitive commands separated by \n
+  on error, send back '?', else send back 'OKAY'
+
+server verbs:
+  - ''
+    ignored
+  - NEW JOB len
+    server reads in len bytes, and sends back a job id
+  - LIST marking? JOBS
+    server sends back a list of all jobs, in the format:
+      id\tcreated\tlen\tmarking
+  - INFO [FOR JOB id] IS
+    server sends back a response of the form:
+      SCRIPT IS len BYTES
+      MARKED AS marking
+      CREATED|TAKEN|COMPLETED|UPDATED AT iso-8601
+  - SCRIPT [FOR JOB id] IS len?
+    get the script as len\ndata, or update the script.
+    if a job is not marked 'new', the update fails
+
+  - META [FOR JOB id] IS
+    get metadata for id as key: value lines, terminated with a blank line
+  - META key [FOR JOB id] IS value?
+    set/get id.key
+  - TAKE JOB id?
+    mark a job as taken, record timestamp
+    if id is unset, pick the next available job and send it
+    back
+  - MARK [JOB id] AS marking
+    set a job's marking, record timestamp
+  - CANCEL [JOB id]
+    delete a job from the job queue
+
+  - all others:
+    send back ?
+
+
+
+
diff --git a/README.md b/README.md
new file mode 100644
index 0000000000000000000000000000000000000000..05fbad98eb1f517edf70ea3b992d641bc6662263
--- /dev/null
+++ b/README.md
@@ -0,0 +1,3 @@
+## employ
+
+a simple job server.
diff --git a/employer.py b/employer.py
new file mode 100644
index 0000000000000000000000000000000000000000..55f5829321dfd2406c369fd3234831a35f76753e
--- /dev/null
+++ b/employer.py
@@ -0,0 +1,292 @@
+from sys import argv, stdin, stdout, stderr, exit, exc_info
+import asyncio
+import pickle
+import gzip
+import time
+import logging
+import base64 as b64
+from pathlib import Path
+from dataclasses import dataclass, field, asdict
+from datetime import datetime
+from typing import Dict, Optional, List
+
+
+MARKINGS = ('new', 'available', 'taken', 'done', 'failed')
+argv0 = 'employer.py'
+
+
+def usage():
+    global argv0
+    print(f'usage: python {argv0} dbfile [host] port', file=stderr)
+    exit(-1)
+
+
+@dataclass
+class Job:
+    jid: int
+    script: bytes
+    meta: Dict[str,str] = field(default_factory=dict)
+    marking: str = 'new'
+
+    created: datetime = field(default_factory=datetime.now)
+    taken: Optional[datetime] = None
+    completed: Optional[datetime] = None
+    updated: Optional[datetime] = None
+
+    def update(self):
+        self.updated = datetime.now()
+
+    def __getitem__(self, k):
+        return self.meta.get(k, '')
+    def __setitem__(self, k, v):
+        if type(k) is not str or type(v) is not str:
+            raise ValueError('key and value must be str')
+
+        if v == '':
+            self.meta.pop(k, None)  # an empty value clears the key
+        else:
+            self.meta[k] = v
+        self.update()
+    def __delitem__(self, k):
+        del self.meta[k]
+
+    def take(self):
+        if self.marking != 'available':
+            raise ValueError("can't take a job that isn't available")
+        self.marking = 'taken'
+        self.taken = datetime.now()
+
+        self.update()
+
+    def mark(self, marking):
+        global MARKINGS
+        if marking not in MARKINGS:
+            raise ValueError(f"{marking}: not a marking")
+
+        if self.marking == marking:
+            return
+
+        self.marking = marking
+        if marking in ('new', 'available'):
+            self.taken = None
+            self.completed = None
+        elif marking == 'taken':
+            self.taken = datetime.now()
+            self.completed = None
+        else:
+            self.completed = datetime.now()
+
+        self.update()
+
+    def infostr(self) -> str:
+        s = f'SCRIPT IS {len(self.script)} BYTES\nMARKED AS {self.marking}\nCREATED AT {self.created.isoformat()}\n'
+        if self.taken is not None:
+            s += f'TAKEN AT {self.taken.isoformat()}\n'
+        if self.completed is not None:
+            s += f'COMPLETED AT {self.completed.isoformat()}\n'
+        if self.updated is not None:
+            s += f'UPDATED AT {self.updated.isoformat()}\n'
+        return s
+    def linestr(self) -> str:
+        return f'{self.jid}\t{self.created.isoformat()}\t{len(self.script)}\t{self.marking}\n'
+    def metastr(self) -> str:
+        return ''.join(f'{k}: {v}\n' for (k, v) in self.meta.items()) + '\n'  # blank line terminates the block
+    def scriptstr(self) -> bytes:  # named scriptstr: 'script' would shadow the field above
+        return f'{len(self.script)}\n'.encode() + self.script
+
+class Jobs:
+    def __init__(self, filename):
+        self.filename = filename
+        self.lock = asyncio.Lock()
+
+        try:
+            fp = open(filename+'.lock', 'x')
+            fp.close()
+        except FileExistsError:
+            raise Exception(f"{filename} is already locked. kill the other instance or delete {filename}.lock.")
+
+        if Path(filename).exists():
+            data = pickle.load(gzip.open(filename, 'rb'))
+            self.nextid = data['nextid']
+            self.jobs = data['jobs']
+        else:
+            self.nextid = 0
+            self.jobs = {}
+            self.save()
+    def __del__(self):
+        Path(self.filename+'.lock').unlink()
+
+    def __getitem__(self, k):
+        global MARKINGS
+        if type(k) is int:
+            return self.jobs[k]
+        elif k in MARKINGS:
+            return filter(lambda j: j.marking == k, self.jobs.values())
+        else:
+            raise KeyError(k)
+    def __delitem__(self, k):
+        if type(k) is Job:
+            del self.jobs[k.jid]
+        else:
+            del self.jobs[k]
+
+    async def __aenter__(self):
+        await self.lock.acquire()
+    async def __aexit__(self, _exc_ty, _exc_val, _tb):
+        self.save()
+        self.lock.release()
+
+    def save(self):
+        data = {'nextid': self.nextid, 'jobs': self.jobs}
+        with gzip.open(self.filename, 'wb') as fp:
+            pickle.dump(data, fp)
+
+    def new(self, script):
+        job = Job(self.nextid, script)
+        self.jobs[self.nextid] = job
+        self.nextid += 1
+        return job
+
+
+def find_val_in_line(line) -> str:
+    line = line[5:]  # drop the leading 'META '
+    return line[line.lower().index(b' is')+3:].strip().decode()  # value is everything after the first ' IS'
+
+async def handle_line(topic, tokens, line, reader, writer) -> int:  # returns the (possibly new) topic
+    global jobs
+    match tokens:
+        case ['new', 'job', int(length)]:
+            data = await reader.readexactly(length)
+            job = jobs.new(data)
+            writer.write(f'{job.jid}\n'.encode())
+            return job.jid
+
+        case ['list', str(marking), 'jobs']:
+            writer.write(''.join(map(lambda x: x.linestr(), jobs[marking])).encode())
+        case ['list', 'jobs']:
+            writer.write(''.join(map(lambda x: x.linestr(), jobs.jobs.values())).encode())
+
+        case ['info', 'for', 'job', int(topic), 'is']:
+            writer.write(jobs[topic].infostr().encode())
+        case ['info', 'is']:
+            writer.write(jobs[topic].infostr().encode())
+
+        case ['script', 'for', 'job', int(topic), 'is']:
+            writer.write(jobs[topic].scriptstr())
+        case ['script', 'is']:
+            writer.write(jobs[topic].scriptstr())
+
+        case ['script', 'for', 'job', int(topic), 'is', int(length)]:
+            jobs[topic].script = await reader.readexactly(length)
+        case ['script', 'is', int(length)]:
+            jobs[topic].script = await reader.readexactly(length)
+
+        case ['meta', 'for', 'job', int(topic), 'is']:
+            writer.write(jobs[topic].metastr().encode())
+        case ['meta', 'is']:
+            writer.write(jobs[topic].metastr().encode())
+
+        case ['meta', key, 'for', 'job', int(topic), 'is']:
+            writer.write(f'{jobs[topic][key]}\n'.encode())
+        case ['meta', key, 'is']:
+            writer.write(f'{jobs[topic][key]}\n'.encode())
+
+        case ['meta', key, 'for', 'job', int(topic), 'is', *_]:
+            jobs[topic][key] = find_val_in_line(line)
+        case ['meta', key, 'is', *_]:
+            jobs[topic][key] = find_val_in_line(line)
+
+        case ['take', 'job', int(topic)]:
+            jobs[topic].take()
+        case ['take', 'job']:
+            jobs[topic].take()
+
+        case ['mark', 'job', int(topic), 'as', str(marking)]:
+            jobs[topic].mark(marking)
+        case ['mark', 'as', str(marking)]:
+            jobs[topic].mark(marking)
+
+        case ['cancel', 'job', int(topic)]:
+            del jobs[topic]
+        case ['cancel']:
+            del jobs[topic]
+
+        case _:
+            raise ValueError(f'Unknown command {line!r}')
+
+    return topic
+
+
+def tokenize(line) -> list:  # ints for numeric tokens, lowercased strs otherwise
+    tokens = []
+    it = filter(lambda x: x != '', line.decode('utf-8', 'replace').split(' '))
+    for tok in it:
+        try:
+            tokens.append(int(tok))
+        except ValueError:
+            tokens.append(tok.lower())
+
+    return tokens
+async def accept(reader, writer):
+    global jobs
+    l = logging.getLogger('employer.sock')
+
+    peer = writer.get_extra_info('peername')
+    if peer is not None:
+        peer = f'{peer[0]}:{peer[1]}'
+    else:
+        peer = 'unknown'
+
+    l.info(f'connection from {peer}')
+
+    topic = None  # the job id implied by bare commands like 'INFO IS'
+
+    try:
+        while (line := await reader.readline()) != b'':
+            line = line.strip(b'\r\n')  # tolerate CRLF clients
+            if line == b'':
+                continue
+
+            tokens = tokenize(line)
+            l.debug(f'{peer}: {line}, parsed as {tokens}')
+            try:
+                async with jobs:
+                    topic = await handle_line(topic, tokens, line, reader, writer)
+                writer.write(b'OKAY\n')
+            except ValueError:
+                writer.write(b'?\n')
+            except Exception:
+                l.debug('', exc_info=exc_info())
+                writer.write(b'?\n')
+
+            await writer.drain()
+    except ConnectionResetError:
+        pass
+
+    l.info(f'disconnect from {peer}')
+
+async def main():
+    global jobs, argv, argv0
+
+    if len(argv) > 0:
+        argv0 = argv[0]
+        argv = argv[1:]
+    if len(argv) not in (2,3):
+        usage()
+
+    file = argv[0]
+    host = '' if len(argv) == 2 else argv[1]
+    port = argv[-1]
+
+    jobs = Jobs(file)
+
+    l = logging.getLogger('employer')
+    l.info(f'boot: using db {file}, serving {host}:{port}')
+
+    server = await asyncio.start_server(accept, host, port)
+    await server.serve_forever()
+
+
+if __name__ == '__main__':
+    logging.basicConfig(stream=stderr, level=logging.DEBUG, format='[%(asctime)s %(levelname)s %(name)s] %(message)s')
+    asyncio.run(main())
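-- 

not part of the patch: a sample session against the server above, as i read
the protocol. C: marks lines the client sends, S: marks server replies; the
job id and timestamps are made up. note that NEW JOB's payload is read as raw
bytes immediately after the command line, and a trailing newline after the
payload is harmlessly skipped as a blank command.

    C: NEW JOB 13
    C: echo hi there            (13 raw script bytes)
    S: 0
    S: OKAY
    C: MARK AS available        (bare forms act on the current topic, job 0)
    S: OKAY
    C: TAKE JOB
    S: OKAY
    C: INFO IS
    S: SCRIPT IS 13 BYTES
    S: MARKED AS taken
    S: CREATED AT 2025-12-22T17:53:11
    S: TAKEN AT 2025-12-22T17:54:02
    S: UPDATED AT 2025-12-22T17:54:02
    S: OKAY
    C: FROBNICATE
    S: ?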
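and a minimal client sketch under the same reading; 127.0.0.1:7777 and the
send() helper are my own assumptions, not anything this patch ships:

    import asyncio

    async def main():
        reader, writer = await asyncio.open_connection('127.0.0.1', 7777)

        async def send(line, payload=b''):
            # a command is one \n-terminated line, optionally followed by
            # raw payload bytes (the script body, for NEW JOB)
            writer.write(line + b'\n' + payload)
            await writer.drain()

        script = b'echo hi'
        await send(b'NEW JOB %d' % len(script), script)
        jid = int(await reader.readline())   # the server answers with the job id
        assert await reader.readline() == b'OKAY\n'

        await send(b'MARK AS available')     # bare form targets the topic job
        assert await reader.readline() == b'OKAY\n'

        await send(b'TAKE JOB')
        assert await reader.readline() == b'OKAY\n'

        writer.close()
        await writer.wait_closed()

    asyncio.run(main())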