~aleteoryx/employ

ref: 03f38b65ed032d4e174bc375135af1b9ee282887 employ/employee.py -rwxr-xr-x 4.0 KiB
03f38b65Aleteoryx no debug 9 days ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
#!/bin/env python3

"""
this software distributed under a "fuck around, find out" license. the
author gives nobody in the world any explicit legal right to use this
code for any purpose whatsoever. the author reserves the right to send
holy legal fury down from the heavens should you invoke her ire.

the author does not have the money to do so, in any jurisdiction
worldwide. the author is also pretty chill, she thinks, so, you know, do
what you want or whatever. the text of this license is intentionally
god damn stupid specifically to scare away corporate lawyers and
attract internet queers

if you do choose to redistribute this software (to "fuck around", as it
were), you should change this disclaimer some and make it funnier

the mitochondria is the powerhouse of the cell
"""

import os
from sys import argv, exit, stderr
import logging
import time
import datetime
from subprocess import PIPE
import asyncio
from tempfile import mktemp
from pathlib import Path

# program name printed by usage(); main() replaces it with argv[0] when
# the interpreter supplied one
argv0 = 'employee.py'


def usage():
	"""Print the command-line synopsis to stdout, then exit with status -1."""
	# argv0 is only read here, so no `global` declaration is needed
	synopsis = f'usage: python3 {argv0} <host> <port>'
	print(synopsis)
	exit(-1)


async def take_job(reader, writer):
	"""
	Poll the server until a job is available, then fetch its script.

	Sends `TAKE JOB` every 5 seconds for as long as the reply is `?`. Any
	other reply is parsed as the integer job id, and the line that follows
	it is read and discarded. The script is then requested with
	`SCRIPT IS`: the reply is one line holding the script's length in
	bytes, then exactly that many bytes, then one more line which is
	discarded.

	reader, writer: the asyncio stream pair connected to the server.

	Returns the script as bytes.

	Raises ConnectionError if the server closes the connection where a
	reply line is expected, ValueError if the job id or the length is not
	an integer, and asyncio.IncompleteReadError if the stream ends before
	the whole script has arrived.
	"""
	l = logging.getLogger('employee.job')

	while True:
		l.debug('polling for job')
		writer.write(b'TAKE JOB\n')
		await writer.drain()

		raw = await reader.readline()
		if not raw:
			# readline() returns b'' at EOF; without this check the empty
			# reply would reach int() and die with a misleading ValueError
			raise ConnectionError('server closed the connection while polling for a job')

		line = raw.strip()
		if line != b'?':
			jid = int(line.decode())
			# the reply carries one more line after the id; it is not used
			await reader.readline()
			break

		await asyncio.sleep(5)

	l.info(f'took job {jid}')

	writer.write(b'SCRIPT IS\n')
	await writer.drain()

	raw = await reader.readline()
	if not raw:
		raise ConnectionError('server closed the connection while sending the script')
	length = int(raw.strip().decode())
	script = await reader.readexactly(length)

	# consume the line that trails the script body
	await reader.readline()

	return script


async def log_stream(stream, name, log):
	"""
	Forward every line read from `stream` to `log`, prefixed with `name`.

	stream: an async iterable yielding bytes lines (wait_proc passes the
	        subprocess stdout/stderr pipes).
	name:   label placed in front of each line.
	log:    callable taking the formatted string, e.g. a logger method.

	Returns when the stream is exhausted.
	"""
	async for raw in stream:
		try:
			# escape newlines so every record stays on one physical line
			text = raw.decode().replace('\n', '\\n')
		except UnicodeDecodeError:
			# not valid UTF-8: log the bytes repr instead of dropping the
			# line (only decode() can fail here, so catch just that)
			text = repr(raw)

		log(f'{name}: {text}')
async def wait_proc(proc, l):
	"""
	Wait for `proc` to exit while relaying its output to the logger `l`.

	stdout lines are logged at INFO and stderr lines at WARNING, each
	through log_stream. Leaving the task group also waits for both relay
	tasks, so they have finished before this returns.

	proc: a process object with stdout and stderr pipes attached.
	l:    the logger that receives the relayed output.

	Returns the process's exit status.
	"""
	async with asyncio.TaskGroup() as tg:
		tg.create_task(log_stream(proc.stdout, 'STDOUT', l.info))
		# Logger.warn is a deprecated alias; warning is the supported name
		tg.create_task(log_stream(proc.stderr, 'STDERR', l.warning))
		await proc.wait()

	return proc.returncode

async def poll_cancel(reader, writer):
	"""
	Ask the server `EXISTS` every 5 seconds; return once it answers `?`.

	run_script treats this coroutine finishing as the job having been
	cancelled. Any reply other than exactly b'?\\n' keeps the loop going.
	"""
	gone = False
	while not gone:
		writer.write(b'EXISTS\n')
		await writer.drain()

		reply = await reader.readline()
		gone = reply == b'?\n'
		if not gone:
			await asyncio.sleep(5)

async def exec_script(script, l):
	"""
	Save `script` to a fresh temporary file and start it as a subprocess.

	The file is created with mkstemp, so its name cannot be claimed by
	another process between choosing it and opening it (the weakness of
	mktemp). The owner's rwx bits are added, and the file is then run with
	its own path as both the program and its first argument, with stdout
	and stderr captured through pipes.

	script: the script contents as bytes.
	l:      logger used for the debug message.

	Returns (file, proc): the script's Path and the started process. The
	caller is responsible for deleting the file. If writing or starting
	the script fails, the file is removed here and the error re-raised.
	"""
	# imported locally because the module header only pulls in mktemp
	from tempfile import mkstemp

	fd, name = mkstemp(prefix='employ')
	file = Path(name)
	try:
		# the descriptor must be closed before exec: a file that is still
		# open for writing cannot be executed
		with os.fdopen(fd, 'wb') as fp:
			fp.write(script)
		l.debug(f'saving script as {str(file)}')

		file.chmod(file.stat().st_mode | 0o700)

		proc = await asyncio.create_subprocess_exec(file, file, stdout=PIPE, stderr=PIPE)
	except BaseException:
		# also covers cancellation; nobody else knows this path yet, so
		# clean up before letting the error continue
		file.unlink(missing_ok=True)
		raise

	return file, proc

async def run_script(reader, writer, script):
	"""
	Run `script` as a subprocess until it exits or the server drops the job.

	Two tasks race: wait_proc (the process exiting) and poll_cancel (the
	server no longer knowing the job). If the process exits, the job is
	reported with complete_job as 'done' for exit status 0 and 'failed'
	otherwise. If the poller finishes first, the process is terminated.
	The temporary script file is removed in either case, including when
	an error is raised.

	reader, writer: the asyncio stream pair connected to the server.
	script:         the script contents as bytes.

	NOTE(review): if the poller is cancelled after sending `EXISTS` but
	before reading the reply, that reply stays in the stream and will be
	read by complete_job as its own -- needs a protocol-level fix.
	"""
	l = logging.getLogger('employee.exec')

	l.info('executing script...')
	file, proc = await exec_script(script, l)

	try:
		l.info('attaching stdio...')
		waiter = asyncio.create_task(wait_proc(proc, l))
		killer = asyncio.create_task(poll_cancel(reader, writer))

		done, _ = await asyncio.wait([waiter, killer], return_when=asyncio.FIRST_COMPLETED)

		# both tasks can finish in the same loop iteration; test membership
		# rather than popping an arbitrary element, so a finished process
		# is always reported
		if waiter in done:
			killer.cancel()
			# a cancelled task keeps its pending readline() registered on
			# the reader until it has actually run again; wait for that so
			# complete_job is free to read from the stream
			await asyncio.gather(killer, return_exceptions=True)
			state = 'done' if proc.returncode == 0 else 'failed'
			await complete_job(reader, writer, state, l)
			l.info(f'job returned {proc.returncode}, marked {state}')
		else:
			waiter.cancel()
			proc.terminate()
			l.info('job cancelled')
	finally:
		# missing_ok: the script is handed its own path and may already
		# have deleted itself
		file.unlink(missing_ok=True)

async def complete_job(reader, writer, state, l):
	"""
	Report the job's final state to the server with `MARK AS <state>`.

	reader, writer: the asyncio stream pair connected to the server.
	state:          text placed after `MARK AS` (run_script passes 'done'
	                or 'failed').
	l:              logger that receives a warning when the reply is
	                anything other than exactly b'OKAY\\n'.
	"""
	writer.write(f'MARK AS {state}\n'.encode())
	await writer.drain()

	line = await reader.readline()
	if line != b'OKAY\n':
		# Logger.warn is a deprecated alias of warning; !r spells out the
		# bytes repr that the bare placeholder produced implicitly
		l.warning(f'got non-OKAY response completing job: {line!r}')

async def main():
	"""
	Connect to the server named on the command line and run jobs forever.

	Expects exactly two arguments after the program name, <host> and
	<port>; otherwise prints the usage text and exits. Each loop iteration
	takes one job with take_job and runs it with run_script. The loop only
	ends through an exception or cancellation, at which point the
	connection is closed.
	"""
	global argv, argv0

	l = logging.getLogger('employee')

	# split the program name off the argument list when there is one
	if argv:
		argv0, argv = argv[0], argv[1:]
	if len(argv) != 2:
		usage()

	host, port = argv

	l.info(f'connecting to {host}:{port}...')
	reader, writer = await asyncio.open_connection(host, port)
	l.info('connected!')

	try:
		while True:
			script = await take_job(reader, writer)
			await run_script(reader, writer, script)
	finally:
		# release the socket instead of leaving it to interpreter teardown
		writer.close()
		

if __name__ == '__main__':
	# %(message)s is the fully formatted record text; %(msg)s is only the
	# raw format string and would drop any lazily supplied logging arguments
	logging.basicConfig(stream=stderr, level=logging.INFO, format='[%(asctime)s %(levelname)s %(name)s] %(message)s')
	asyncio.run(main())