@@ 1,48 @@
+a job is:
+ - id
+ - executable data
+ - marking: new/available/taken/done/failed
+ - dates: created, taken, finished
+ - metadata: dict[str,str]
+ keys cannot include ':' or ' '; neither keys nor values can include \n
+
+server protocol:
+ case-insensitive commands separated by \n
+ on error, send back '?', else send back 'OKAY'
+
+server verbs:
+ - ''
+ ignored
+ - NEW JOB len
+ server reads in len bytes, and sends back a job id
+ - LIST marking? JOBS
+ server sends back a list of all jobs, in the format:
+ id\tcreated\tlen\tmarking
+ - INFO [FOR JOB id] IS
+ server sends back a response of the form:
+ SCRIPT IS len BYTES
+ MARKED AS marking
+ CREATED|TAKEN|COMPLETED|UPDATED AT iso-8601
+ - SCRIPT [FOR JOB id] IS len?
+ get the script as len\ndata, or update the script.
+ if a job is not marked 'new', the update fails
+
+ - META [FOR JOB id] IS
+ get metadata for id as key: value lines, terminated with a blank line
+ - META key [FOR JOB id] IS value?
+ set/get id.key
+ - TAKE JOB id?
+ mark a job as taken, record timestamp
+ if id is omitted, pick the next available job and send it
+ back
+ - MARK [JOB id] AS marking
+ set a job's marking to the given value, record timestamp
+ - CANCEL [JOB id]
+ delete a job from the job queue
+
+ - all others:
+ send back ?
+
+
+
+
@@ 1,292 @@
+from sys import argv, stdin, stdout, stderr, exit, exc_info
+import asyncio
+import pickle
+import gzip
+import time
+import logging
+import base64 as b64
+from pathlib import Path
+from dataclasses import dataclass, field, asdict
+from datetime import datetime
+from typing import Dict, Optional, List
+
+
# Every marking a job may carry, in rough lifecycle order.
MARKINGS = (
    'new',
    'available',
    'taken',
    'done',
    'failed',
)

# Program name interpolated into the usage message.
argv0 = 'employer.py'
+
+
def usage():
    """Print the command-line synopsis to stderr and exit with status -1."""
    synopsis = f'usage: python {argv0} <db path> [host] <port>'
    print(synopsis, file=stderr)
    exit(-1)
+
+
@dataclass
class Job:
    """A single queued job: a script payload plus bookkeeping.

    Indexing a job (``job[key]``) reads and writes its string metadata.
    Every mutation made through the methods below refreshes ``updated``.
    """

    jid: int
    script: bytes
    meta: Dict[str, str] = field(default_factory=dict)
    marking: str = 'new'

    created: datetime = field(default_factory=datetime.now)
    taken: Optional[datetime] = None
    completed: Optional[datetime] = None
    updated: Optional[datetime] = None

    def update(self):
        """Record that the job was modified just now."""
        self.updated = datetime.now()

    def __getitem__(self, k):
        """Return the metadata value for *k*, or '' when the key is unset."""
        return self.meta.get(k, '')

    def __setitem__(self, k, v):
        """Set metadata *k* to *v*; an empty *v* removes the key.

        Raises:
            ValueError: if either argument is not a str, if the key contains
                ':', ' ' or a newline, or if the value contains a newline.
        """
        if not isinstance(k, str) or not isinstance(v, str):
            raise ValueError('key and value must be str')
        # metastr() emits "key: value\n" lines, so these characters would
        # make the listing ambiguous.
        if any(ch in k for ch in ': \n'):
            raise ValueError("key must not contain ':', ' ' or a newline")
        if '\n' in v:
            raise ValueError('value must not contain a newline')

        if v == '':
            # Clearing a key that was never set is a no-op, not an error.
            self.meta.pop(k, None)
        else:
            self.meta[k] = v
        self.update()

    def __delitem__(self, k):
        """Remove metadata key *k*; raises KeyError when it is not set."""
        del self.meta[k]
        self.update()

    def take(self):
        """Mark an 'available' job as taken and stamp the time.

        Raises:
            ValueError: if the job is not currently marked 'available'.
        """
        if self.marking != 'available':
            raise ValueError("can't take a job that's been taken")
        self.marking = 'taken'
        self.taken = datetime.now()

        self.update()

    def mark(self, marking):
        """Set the job's marking and adjust the taken/completed timestamps.

        Re-applying the current marking changes nothing.

        Raises:
            ValueError: if *marking* is not one of MARKINGS.
        """
        if marking not in MARKINGS:
            raise ValueError(f"{marking}: not a marking")

        if self.marking == marking:
            return

        self.marking = marking
        if marking in ('new', 'available'):
            # Back in the queue: forget any earlier run.
            self.taken = None
            self.completed = None
        elif marking == 'taken':
            self.taken = datetime.now()
            self.completed = None
        else:
            # Every remaining marking ('done', 'failed') ends the job.
            self.completed = datetime.now()

        self.update()

    def infostr(self) -> str:
        """Return the multi-line INFO response; unset timestamps are omitted."""
        parts = [
            f'SCRIPT IS {len(self.script)} BYTES\n',
            f'MARKED AS {self.marking}\n',
            f'CREATED AT {self.created.isoformat()}\n',
        ]
        if self.taken is not None:
            parts.append(f'TAKEN AT {self.taken.isoformat()}\n')
        if self.completed is not None:
            parts.append(f'COMPLETED AT {self.completed.isoformat()}\n')
        if self.updated is not None:
            parts.append(f'UPDATED AT {self.updated.isoformat()}\n')
        return ''.join(parts)

    def linestr(self) -> str:
        """Return the one-line LIST entry: id, created, script length, marking."""
        return f'{self.jid}\t{self.created.isoformat()}\t{len(self.script)}\t{self.marking}\n'

    def metastr(self) -> str:
        """Return all metadata as 'key: value' lines."""
        return ''.join(f'{k}: {v}\n' for (k, v) in self.meta.items())

    def script_payload(self) -> bytes:
        """Return the script framed as b'<length>\\n<data>'.

        This used to be a method named ``script``, which collided with the
        ``script`` field: the function became the field's default value and
        the method itself could never be called on an instance.
        """
        return f'{len(self.script)}\n'.encode() + self.script
+
class Jobs:
    """The persistent job table, guarded by a lock file and an asyncio lock.

    The table is stored as a gzip-compressed pickle at *filename*. A sibling
    ``<filename>.lock`` file keeps two server instances off the same
    database. Used as an async context manager, it serialises access and
    saves on exit.
    """

    def __init__(self, filename):
        """Claim the lock file, then load the database or create an empty one.

        Raises:
            Exception: if ``<filename>.lock`` already exists.
        """
        self.filename = filename
        self.lock = asyncio.Lock()
        # Python still calls __del__ when __init__ raises. This flag keeps a
        # failed start from deleting the lock file owned by a running instance.
        self._owns_lockfile = False

        try:
            with open(filename + '.lock', 'x') as fp:
                fp.write('')
        except FileExistsError:
            raise Exception(f"{filename} is already locked. kill the other instance or delete {filename}.lock.")
        self._owns_lockfile = True

        try:
            if Path(filename).exists():
                # SECURITY: unpickling runs arbitrary code from the file.
                # Only open databases this server wrote itself.
                with gzip.open(filename, 'rb') as fp:
                    data = pickle.load(fp)
                self.nextid = data['nextid']
                self.jobs = data['jobs']
            else:
                self.nextid = 0
                self.jobs = {}
                self.save()
        except BaseException:
            # Do not leave a stale lock behind when start-up fails.
            self.close()
            raise

    def close(self):
        """Remove the lock file if this instance created it; safe to repeat."""
        if getattr(self, '_owns_lockfile', False):
            self._owns_lockfile = False
            Path(self.filename + '.lock').unlink(missing_ok=True)

    def __del__(self):
        self.close()

    def __getitem__(self, k):
        """Look up by job id (int) or by marking.

        An int returns that job. A marking returns an iterator over the jobs
        carrying it.

        Raises:
            KeyError: for an unknown id, or a key that is neither an int nor
                a marking.
        """
        if isinstance(k, int):
            return self.jobs[k]
        if k in MARKINGS:
            return filter(lambda j: j.marking == k, self.jobs.values())
        raise KeyError(k)

    def __delitem__(self, k):
        """Remove a job, given either the Job itself or its id."""
        if isinstance(k, Job):
            del self.jobs[k.jid]
        else:
            del self.jobs[k]

    async def __aenter__(self):
        await self.lock.acquire()
        return self

    async def __aexit__(self, _exc_ty, _exc_val, _tb):
        try:
            self.save()
        finally:
            # Always release, or one failed save would block every client.
            self.lock.release()

    def save(self):
        """Write the table to disk without ever leaving a truncated file."""
        data = {'nextid': self.nextid, 'jobs': self.jobs}
        tmp = Path(self.filename + '.tmp')
        with gzip.open(tmp, 'wb') as fp:
            pickle.dump(data, fp)
        # Swap the finished file in, so a crash mid-write keeps the old copy.
        tmp.replace(self.filename)

    def new(self, script):
        """Create a job for *script* under the next free id and return it."""
        job = Job(self.nextid, script)
        self.jobs[self.nextid] = job
        self.nextid += 1
        return job
+
+
def find_val_in_line(line) -> str:
    """Extract the value from a raw ``META key [FOR JOB id] IS value`` line.

    The value is taken from the undecoded line rather than from the tokens,
    so its original case and inner spacing survive.

    Args:
        line: the raw command line as bytes.

    Returns:
        Everything after the first standalone ``IS`` word that follows the
        verb and the key, stripped of surrounding whitespace and decoded.

    Raises:
        ValueError: if no ``IS`` word follows the key, or the value is not
            valid UTF-8 (UnicodeDecodeError is a ValueError).
    """
    rest = line
    words_seen = 0
    while True:
        parts = rest.split(None, 1)
        if not parts:
            raise ValueError(f'no IS keyword in {line!r}')
        word = parts[0]
        rest = parts[1] if len(parts) == 2 else b''
        words_seen += 1
        # Words 1 and 2 are the verb and the key; the key may itself be
        # "is", so matching starts at word 3. Comparing whole words keeps a
        # key such as "isle" from being mistaken for the keyword.
        if words_seen > 2 and word.lower() == b'is':
            return rest.strip().decode()
+
+async def handle_line(topic, tokens, line, reader, writer) -> int:
+ global jobs
+ match tokens:
+ case ['new', 'job', int(length)]:
+ data = await reader.readexactly(length)
+ job = jobs.new(data)
+ writer.write(f'{job.jid}\n'.encode())
+ return job.jid
+
+ case ['list', str(marking), 'jobs']:
+ writer.write(''.join(map(lambda x: x.linestr(), jobs[marking])).encode())
+ case ['list', 'jobs']:
+ writer.write(''.join(map(lambda x: x.linestr(), jobs.jobs.values())).encode())
+
+ case ['info', 'for', 'job', int(topic), 'is']:
+ writer.write(jobs[topic].infostr().encode())
+ case ['info', 'is']:
+ writer.write(jobs[topic].infostr().encode())
+
+ case ['script', 'for', 'job', int(topic), 'is']:
+ writer.write(f'{len(jobs[topic].script)}\n'.encode() + jobs[topic].script)
+ case ['script', 'is']:
+ writer.write(f'{len(jobs[topic].script)}\n'.encode() + jobs[topic].script)
+
+ case ['script', 'for', 'job', int(topic), 'is', int(length)]:
+ jobs[topic].script = await reader.readexactly(length)
+ case ['script', 'is', int(length)]:
+ jobs[topic].script = await reader.readexactly(length)
+
+ case ['meta', 'for', 'job', int(topic), 'is']:
+ writer.write(jobs[topic].metastr().encode())
+ case ['meta', 'is']:
+ writer.write(jobs[topic].metastr().encode())
+
+ case ['meta', key, 'for', 'job', int(topic), 'is']:
+ writer.write(f'{jobs[topic][key]}\n'.encode())
+ case ['meta', key, 'is']:
+ writer.write(f'{jobs[topic][key]}\n'.encode())
+
+ case ['meta', key, 'for', 'job', int(topic), 'is', *_]:
+ jobs[topic][key] = find_val_in_line(line)
+ case ['meta', key, 'is', *_]:
+ jobs[topic][key] = find_val_in_line(line)
+
+ case ['take', 'job', int(topic)]:
+ jobs[topic].take()
+ case ['take', 'job']:
+ jobs[topic].take()
+
+ case ['mark', 'job', int(topic), 'as', str(marking)]:
+ jobs[topic].mark(marking)
+ case ['mark', 'as', str(marking)]:
+ jobs[topic].mark(marking)
+
+ case ['cancel', 'job', int(topic)]:
+ del jobs[topic]
+ case ['cancel']:
+ del jobs[topic]
+
+ case _:
+ raise ValueError(f'Unknown command {line!r}')
+
+ return topic
+
+
+def tokenize(line) -> List[bytes]:
+ tokens = []
+ it = filter(lambda x: x != '', line.decode().split(' '))
+ for tok in it:
+ try:
+ tokens.append(int(tok))
+ except ValueError:
+ tokens.append(tok.lower())
+
+ return tokens
async def accept(reader, writer):
    """Serve one client connection until it closes.

    Each non-blank line is tokenized and executed while holding the job
    table. The client gets 'OKAY' on success or '?' on any failure.
    """
    l = logging.getLogger('employer.sock')

    peer = writer.get_extra_info('peername')
    if peer is not None:
        peer = f'{peer[0]}:{peer[1]}'
    else:
        peer = 'unknown'

    l.info(f'connection from {peer}')

    # Job selected by the client's most recent command, if any.
    topic = None

    try:
        while (line := await reader.readline()) != b'':
            line = line.strip(b'\n')
            if line == b'':
                continue

            try:
                # Tokenizing sits inside the try so undecodable input gets
                # a '?' instead of killing the connection handler.
                tokens = tokenize(line)
                l.debug(f'{peer}: {line}, parsed as {tokens}')
                async with jobs:
                    topic = await handle_line(topic, tokens, line, reader, writer)
            except ValueError:
                writer.write(b'?\n')
            except Exception:
                l.debug('', exc_info=True)
                writer.write(b'?\n')
            else:
                # Acknowledge only after leaving the `async with`, i.e. once
                # the table has been saved.
                writer.write(b'OKAY\n')

            await writer.drain()
    except ConnectionResetError:
        pass
    finally:
        # Close the transport explicitly; nothing else does it once this
        # callback returns.
        writer.close()
        try:
            await writer.wait_closed()
        except OSError:
            pass

    l.info(f'disconnect from {peer}')
+
async def main():
    """Parse the command line, open the job database and serve forever.

    Expects ``<db path> [host] <port>`` after the program name. Prints the
    usage message and exits when the arguments do not fit.
    """
    # argv0 must be declared global here: without it the assignment below
    # creates a local, and usage() keeps printing the default name.
    global jobs, argv, argv0

    if len(argv) > 0:
        argv0 = argv[0]
        argv = argv[1:]
    if len(argv) not in (2, 3):
        usage()

    file = argv[0]
    host = '' if len(argv) == 2 else argv[1]
    try:
        port = int(argv[-1])
    except ValueError:
        # Reject a bad port before the lock file is created.
        usage()

    jobs = Jobs(file)

    l = logging.getLogger('employer')
    l.info(f'boot: using db {file}, serving {host}:{port}')

    server = await asyncio.start_server(accept, host, port)
    # The context manager closes the listening sockets on the way out.
    async with server:
        await server.serve_forever()
+
+
if __name__ == '__main__':
    # %(message)s is the fully formatted text; the previous %(msg)s printed
    # the raw format string, dropping any lazy logging arguments.
    logging.basicConfig(
        stream=stderr,
        level=logging.DEBUG,
        format='[%(asctime)s %(levelname)s %(name)s] %(message)s',
    )
    asyncio.run(main())