~aleteoryx/9c

b29685897785d078f084c76fbab841361b0166cd — glenda 37 years ago 2a08c37
queuesrv ctl file
2 files changed, 237 insertions(+), 1 deletions(-)

M mkfile
A queuesrv.c
M mkfile => mkfile +10 -1
@@ 3,7 3,8 @@
bins=\
	pong\
	maze\
	spiral
	spiral\
	queuesrv

all:VQ: $bins



@@ 15,3 16,11 @@ all:VQ: $bins

clean:V:
	for(bin in $bins) rm -f $bin $bin.$O

qclean:V:
	rm -f /srv/queue
	unmount /n || echo -n
qdev:V:
	mk queuesrv
	queuesrv || echo -n
	mount /srv/queue /n

A queuesrv.c => queuesrv.c +227 -0
@@ 0,0 1,227 @@
#include <u.h>
#include <libc.h>
#include <fcall.h>
#include <thread.h>
#include <9p.h>

// typeof(file->aux)
typedef struct {
	int deleted;
	usize length;
	Fid* members[];
} Queue;
// typeof(fid->aux)
typedef struct {
	usize length;
	Req* members[];
} Inflight;
typedef struct {
	char* name;
	Queue* queue;
} QueueDesc;

Srv s;

int stdio;
char* service;

// FIXME: dynamically allocate queues
int maxname;
int maxqueues;
QueueDesc* queues;
char* ctlbuf;

void
usage(void)
{
	fprint(2,"usage: queuefs [ -Ds ] [ -n maxname ] [ -q maxqueues ] [ service ]\n");
	exits("usage");
}

void
setuptree(void)
{
	s.tree = alloctree("none", "none", 0666, nil);
	createfile(s.tree->root, "ctl", "none", 0666, nil);
}

void
main(int argc, char* argv[])
{
	ARGBEGIN{
	case 's':
		stdio = 1;
		break;
	case 'D':
		chatty9p++;
		break;
	case 'n':
		maxname = atoi(EARGF(usage()));
		break;
	case 'q':
		maxqueues = atoi(EARGF(usage()));
		break;
	default:
		usage();
	} ARGEND

	switch(argc){
	case 0:
		service = "queue";
		break;
	case 1:
		service = argv[0];
		break;
	default:
		usage();
	}

	if(maxname <= 0)
		maxname = 64;
	if(maxqueues <= 0)
		maxqueues = 256;

	fmtinstall('D', dirfmt);
	fmtinstall('M', dirmodefmt);
	fmtinstall('F', fcallfmt);

	queues = calloc(maxqueues, sizeof(QueueDesc));

	setuptree();

	if(stdio){
		s.infd = 0;
		s.outfd = 1;
		srv(&s);
	}
	else{
		print("posting service %s\n", service);
		if(postsrv(&s, service) < 0)
			sysfatal("couldn't postsrv: %r");
	}
}

char*
newqueue(char* name)
{
	int i;

	for(i = 0; i < maxqueues && queues[i].queue != nil; i++);
	if(i == maxqueues)
		return "new: too many queues";

	queues[i].queue = calloc(1, sizeof(Queue));
	queues[i].name = name;
	createfile(s.tree->root, name, "none", 0444, queues[i].queue);

	ctlbuf = nil;

	return nil;
}

// FIXME: global buffer could race on small read size. store data in fid->aux.
void
ctlregen(void)
{
	char* p;
	usize blen;
	int nmems, i;

	blen = (maxname + 1 + 20 + 1) * maxqueues + 10;
	p = ctlbuf = calloc(blen, 1);
	
	for(i = 0; i < maxqueues; i++){
		if(queues[i].queue == nil)
			continue;

		for(nmems = 0;
		    nmems < queues[i].queue->length && queues[i].queue->members[nmems] != nil;
		    nmems++);

		p += sprint(p, "%s %d\n", queues[i].name, nmems);
	}
}
void
ctlread(Req* r)
{
	if(ctlbuf == nil)
		ctlregen();

	readstr(r, ctlbuf);
	respond(r, nil);
}
// FIXME: errors if command split between multiple writes
void
ctlwrite(Req* r)
{
	char *s, *p;
	int i;

	if(r->ifcall.count > 5 && strncmp(r->ifcall.data, "new ", 4) == 0){
		p = s = malloc(r->ifcall.count - 3);
		strncpy(s, r->ifcall.data + 4, r->ifcall.count - 4);
		do{
			switch(*p){
			case ' ':
			case '\n':
			case '\t':
			case '\0':
				*p = '\0';
				break;
			}
		}
		while(*p++ != '\0');

		if(strlen(s) == 0){
			respond(r, "new: missing argument");
			return;
		}

		for(i = 0; i < maxqueues; i++){
			if(queues[i].queue == nil)
				continue;
			if(strcmp(s, queues[i].name) == 0)
				break;
		}

		// FIXME: might leak s here
		if(i == maxqueues)
			respond(r, newqueue(s));
		else
			respond(r, nil);
	}
	else
		respond(r, "bad command");
}

void
queueread(Req* r)
{
	respond(r, "not implemented");
}
void
queuewrite(Req* r)
{
	respond(r, "not implemented");
}

void
srvread(Req* r)
{
	if(r->fid->file->aux == nil)
		ctlread(r);
	else
		queueread(r);
}
void
srvwrite(Req* r)
{
	if(r->fid->file->aux == nil)
		ctlwrite(r);
	else
		queuewrite(r);
}
Srv s = {
	.read = srvread,
	.write = srvwrite,
};