@@ 37,19 37,46 @@ def usage():
exit(-1)
+co_lock = asyncio.Lock()
+
+class JobError(Exception):
+ pass
+async def send_cmd(reader, writer, cmd):
+
+ global co_lock
+
+ async def impl():
+ lines = []
+
+ async with co_lock:
+ writer.write(cmd.encode()+b'\n')
+ print(cmd)
+ await writer.drain()
+ while True:
+ line = (await reader.readline()).rstrip()
+ print(line)
+ if line == b'?':
+ raise JobError(cmd)
+ elif line == b'OKAY':
+ return lines
+ else:
+ lines.append(line.decode())
+
+ return await asyncio.shield(asyncio.create_task(impl()))
+
+
async def take_job(reader, writer):
l = logging.getLogger('employee.job')
while True:
l.debug('polling for job')
- writer.write(b'TAKE JOB\n')
- await writer.drain()
-
- line = (await reader.readline()).strip()
- if line != b'?':
- jid = int(line.decode())
- await reader.readline()
+
+ try:
+ lines = await send_cmd(reader, writer, 'TAKE JOB')
+ jid = lines[0]
break
+ except JobError:
+ pass
await asyncio.sleep(5)
@@ 70,7 97,7 @@ async def take_job(reader, writer):
async def log_stream(stream, name, log):
async for line in stream:
try:
- line = line.decode().replace('\n', '\\n')
+ line = line.decode().replace('\n', '')
except:
line = repr(line)
@@ 85,10 112,9 @@ async def wait_proc(proc, l):
async def poll_cancel(reader, writer):
while True:
- writer.write(b'EXISTS\n')
- await writer.drain()
-
- if await reader.readline() == b'?\n':
+ try:
+ await send_cmd(reader, writer, 'EXISTS')
+ except JobError:
return
await asyncio.sleep(5)
@@ 117,22 143,24 @@ async def run_script(reader, writer, script):
if which.pop() is waiter:
killer.cancel()
+ while not killer.cancelled():
+ await asyncio.sleep(1)
state = 'done' if proc.returncode == 0 else 'failed'
await complete_job(reader, writer, state, l)
l.info(f'job returned {proc.returncode}, marked {state}')
else:
waiter.cancel()
+ while not waiter.cancelled():
+ await asyncio.sleep(1)
proc.terminate()
l.info(f'job cancelled')
file.unlink()
async def complete_job(reader, writer, state, l):
- writer.write(f'MARK AS {state}\n'.encode())
- await writer.drain()
-
- line = await reader.readline()
- if line != b'OKAY\n':
+ try:
+ await send_cmd(reader, writer, f'MARK AS {state}')
+ except:
l.warn(f'got non-OKAY response completing job: {line}')
async def main():
@@ 10,6 10,8 @@ def usage():
print(f'usage: {argv0} <host> <port>', file=stderr)
exit(-1)
+class JobError(Exception):
+ pass
def send_cmd(reader, writer, cmd):
lines = []
@@ 18,7 20,7 @@ def send_cmd(reader, writer, cmd):
while True:
line = reader.readline()
if line == b'?\n':
- raise Exception('bad cmd')
+ raise JobError(cmd)
elif line == b'OKAY\n':
return lines
else: