~aleteoryx/employ

ref: a9f243dd6e62eb9597824a6fae5f75d0ad5da5a2 employ/employee.py -rwxr-xr-x 4.4 KiB
a9f243ddAleteoryx do networking properly jesus 8 days ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
#!/bin/env python3

"""
this software distributed under a "fuck around, find out" license. the
author gives nobody in the world any explicit legal right to use this
code for any purpose whatsoever. the author reserves the right to send
holy legal fury down from the heavens should you invoke her ire.

the author does not have the money to do so, in any juris diction
worldwide. the author is also prety chill, she thinks, so, you know, do
what you want or whatever. the text of this license is intentionally
god damn stupid specifically to scare away corporate lawyers and
attract internet queers

if you do choose to redistribute this software (to "fuck around", as it
were), you should change this disclaimer some and make it funnier

the mitochondrea is the powerhouse of the cell
"""

import os
from sys import argv, exit, stderr
import logging
import time
import datetime
from subprocess import PIPE
import asyncio
from tempfile import mktemp
from pathlib import Path

argv0 = 'employee.py'


def usage():
	global argv0
	print(f'usage: python3 {argv0} <host> <port>', )
	exit(-1)


co_lock = asyncio.Lock()

class JobError(Exception):
	pass
async def send_cmd(reader, writer, cmd):

	global co_lock

	async def impl():
		lines = []

		async with co_lock:
			writer.write(cmd.encode()+b'\n')
			print(cmd)
			await writer.drain()
			while True:
				line = (await reader.readline()).rstrip()
				print(line)
				if line == b'?':
					raise JobError(cmd)
				elif line == b'OKAY':
					return lines
				else:
					lines.append(line.decode())
	
	return await asyncio.shield(asyncio.create_task(impl()))


async def take_job(reader, writer):
	l = logging.getLogger('employee.job')

	while True:
		l.debug('polling for job')

		try:
			lines = await send_cmd(reader, writer, 'TAKE JOB')
			jid = lines[0]
			break
		except JobError:
			pass

		await asyncio.sleep(5)

	l.info(f'took job {jid}')

	writer.write(b'SCRIPT IS\n')
	await writer.drain()

	line = (await reader.readline()).strip()
	length = int(line.decode())
	script = await reader.readexactly(length)
	
	await reader.readline()

	return script


async def log_stream(stream, name, log):
	async for line in stream:
		try:
			line = line.decode().replace('\n', '')
		except:
			line = repr(line)

		log(f'{name}: {line}')
async def wait_proc(proc, l):
	async with asyncio.TaskGroup() as tg:
		tg.create_task(log_stream(proc.stdout, 'STDOUT', l.info))
		tg.create_task(log_stream(proc.stderr, 'STDERR', l.warn))
		await proc.wait()
	
	return proc.returncode

async def poll_cancel(reader, writer):
	while True:
		try:
			await send_cmd(reader, writer, 'EXISTS')
		except JobError:
			return
		
		await asyncio.sleep(5)

async def exec_script(script, l):
	file = Path(mktemp(prefix='employ'))
	l.debug(f'saving script as {str(file)}')
	with open(file, 'wb') as fp:
		fp.write(script)

	file.chmod(file.stat().st_mode | 0o700)

	return file, await asyncio.create_subprocess_exec(file, file, stdout=PIPE, stderr=PIPE)

async def run_script(reader, writer, script):
	l = logging.getLogger('employee.exec')
	
	l.info('executing script...')
	file, proc = await exec_script(script, l)
	
	l.info('attaching stdio...')
	waiter = asyncio.create_task(wait_proc(proc, l))
	killer = asyncio.create_task(poll_cancel(reader, writer))
	
	which, _ = await asyncio.wait([waiter, killer], return_when=asyncio.FIRST_COMPLETED)

	if which.pop() is waiter:
		killer.cancel()
		while not killer.cancelled():
			await asyncio.sleep(1)
		state = 'done' if proc.returncode == 0 else 'failed'
		await complete_job(reader, writer, state, l)
		l.info(f'job returned {proc.returncode}, marked {state}')
	else:
		waiter.cancel()
		while not waiter.cancelled():
			await asyncio.sleep(1)
		proc.terminate()
		l.info(f'job cancelled')
	
	file.unlink()

async def complete_job(reader, writer, state, l):
	try:
		await send_cmd(reader, writer, f'MARK AS {state}')
	except:
		l.warn(f'got non-OKAY response completing job: {line}')

async def main():
	global argv, argv0
	
	l = logging.getLogger('employee')
	
	if len(argv) > 0:
		argv0 = argv[0]
		argv = argv[1:]
	if len(argv) != 2:
		usage()

	host, port = argv

	l.info(f'connecting to {host}:{port}...')
	reader, writer = await asyncio.open_connection(host, port)
	l.info(f'connected!')
	
	while True:
		script = await take_job(reader, writer)
		await run_script(reader, writer, script)
		

if __name__ == '__main__':
	logging.basicConfig(stream=stderr, level=logging.INFO, format='[%(asctime)s %(levelname)s %(name)s] %(msg)s')
	asyncio.run(main())