~aleteoryx/9c

41a67f231be44cf679a8ad6df2f6133cd8340c61 — glenda 10 months ago b296858
queuesrv works!!
2 files changed, 146 insertions(+), 22 deletions(-)

M mkfile
M queuesrv.c
M mkfile => mkfile +1 -1
@@ 22,5 22,5 @@ qclean:V:
	unmount /n || echo -n
qdev:V:
	mk queuesrv
	queuesrv || echo -n
	queuesrv -D || echo -n
	mount /srv/queue /n

M queuesrv.c => queuesrv.c +145 -21
@@ 1,12 1,18 @@
#include <u.h>
#include <libc.h>
#include <ctype.h>
#include <fcall.h>
#include <thread.h>
#include <9p.h>

enum {
	QNew = 0,
	QDeleted
};

// typeof(file->aux)
typedef struct {
	int deleted;
	int state;
	usize length;
	Fid* members[];
} Queue;


@@ 17,7 23,7 @@ typedef struct {
} Inflight;
typedef struct {
	char* name;
	Queue* queue;
	File* file;
} QueueDesc;

Srv s;


@@ 83,6 89,7 @@ main(int argc, char* argv[])

	fmtinstall('D', dirfmt);
	fmtinstall('M', dirmodefmt);
	#pragma varargck type "F" Fcall*
	fmtinstall('F', fcallfmt);

	queues = calloc(maxqueues, sizeof(QueueDesc));


@@ 105,19 112,77 @@ char*
newqueue(char* name)
{
	int i;
	Queue* queue;

	for(i = 0; i < maxqueues && queues[i].queue != nil; i++);
	for(i = 0; i < maxqueues && queues[i].file != nil; i++);
	if(i == maxqueues)
		return "new: too many queues";

	queues[i].queue = calloc(1, sizeof(Queue));
	queue = calloc(1, sizeof(Queue) + sizeof(Fid*));
	queue->length = 1;
	
	queues[i].file = createfile(s.tree->root, name, "none", 0444, queue);
	queues[i].name = name;
	createfile(s.tree->root, name, "none", 0444, queues[i].queue);

	ctlbuf = nil;
	if(ctlbuf != nil){
		free(ctlbuf);
		ctlbuf = nil;
	}

	return nil;
}
uint
newmember(Fid* fid)
{
	File* file;
	Queue* queue;
	Inflight* inflight;
	uint i;

	file = fid->file;
	queue = file->aux;

	// append member to queue
	for(i = 0; i < queue->length && queue->members[i] != nil; i++);
	// reallocate queue if necessary
	if(i == queue->length){
		queue->length *= 2;
		queue = realloc(queue, sizeof(Queue) + sizeof(Fid*) * queue->length);
		file->aux = queue;
	}
	queue->members[i] = fid;

	// setup read queue
	inflight = calloc(1, sizeof(Inflight) + sizeof(Req*));
	inflight->length = 1;
	fid->aux = inflight;

	// invalidate ctlbuf
	if(ctlbuf != nil)
		free(ctlbuf);
	ctlbuf = nil;

	// return spot in queue
	return i;
}

void
registerreq(Req* r)
{
	Inflight* inflight;
	uint i;

	inflight = r->fid->aux;

	for(i = 0; i < inflight->length && inflight->members[i] != nil; i++);
	if(i == inflight->length){
		inflight->length *= 2;
		inflight = realloc(inflight, sizeof(Inflight) + sizeof(Req*) * inflight->length);
		r->fid->aux = inflight;
	}

	inflight->members[i] = r;
}

// FIXME: global buffer could race on small read size. store data in fid->aux.
void


@@ 125,20 190,24 @@ ctlregen(void)
{
	char* p;
	usize blen;
	int nmems, i;
	uint nmems, i;
	QueueDesc* qd;
	Queue* queue;

	blen = (maxname + 1 + 20 + 1) * maxqueues + 10;
	blen = (maxname + 20) * maxqueues + 10;
	p = ctlbuf = calloc(blen, 1);
	
	for(i = 0; i < maxqueues; i++){
		if(queues[i].queue == nil)
		qd = queues + i;
		if(qd->file == nil)
			continue;
		queue = qd->file->aux;

		for(nmems = 0;
		    nmems < queues[i].queue->length && queues[i].queue->members[nmems] != nil;
		    nmems < queue->length && queue->members[nmems] != nil;
		    nmems++);

		p += sprint(p, "%s %d\n", queues[i].name, nmems);
		p += sprint(p, "%s %d\n", qd->name, nmems);
	}
}
void


@@ 161,14 230,8 @@ ctlwrite(Req* r)
		p = s = malloc(r->ifcall.count - 3);
		strncpy(s, r->ifcall.data + 4, r->ifcall.count - 4);
		do{
			switch(*p){
			case ' ':
			case '\n':
			case '\t':
			case '\0':
			if(isspace(*p))
				*p = '\0';
				break;
			}
		}
		while(*p++ != '\0');



@@ 178,7 241,7 @@ ctlwrite(Req* r)
		}

		for(i = 0; i < maxqueues; i++){
			if(queues[i].queue == nil)
			if(queues[i].file == nil)
				continue;
			if(strcmp(s, queues[i].name) == 0)
				break;


@@ 197,12 260,22 @@ ctlwrite(Req* r)
void
queueread(Req* r)
{
	respond(r, "not implemented");
	uint spot;
	char buf[32];

	if(r->fid->aux == nil){
		spot = newmember(r->fid);
		sprint(buf, "%ud\n", spot);
		readstr(r, buf);
		respond(r, nil);
	}
	else
		registerreq(r);
}
void
queuewrite(Req* r)
{
	respond(r, "not implemented");
	respond(r, "queues are not writable");
}

void


@@ 213,6 286,7 @@ srvread(Req* r)
	else
		queueread(r);
}

void
srvwrite(Req* r)
{


@@ 221,7 295,57 @@ srvwrite(Req* r)
	else
		queuewrite(r);
}

void
srvdestroyfid(Fid* fid)
{
	print("dfid %p\n", fid);

	Queue* queue;
	Inflight* inflight;
	Req* r;
	uint i, j;
	char buf[32];

	if(fid->aux == nil || fid->file->aux == nil)
		return;

	queue = fid->file->aux;
	for(i = 0; i < queue->length && queue->members[i] != fid && queue->members[i] != nil; i++);
	if(i == queue->length || queue->members[i] == nil)
		sysfatal("bugcheck: fid->aux != nil but fid not in queue->members");

	for(; (i + 1) < queue->length && queue->members[i + 1] != nil; i++){
		queue->members[i] = queue->members[i + 1];
		inflight = queue->members[i]->aux;
		
		sprint(buf, "%ud\n", i);
		for(j = 0; j < inflight->length; j++){
			r = inflight->members[j];
			if(r == nil)
				continue;

			r->ifcall.offset = 0;
			readstr(r, buf);
			respond(r, nil);
		}
	}

	queue->members[i] = nil;
}

void
srvflush(Req* r)
{
	if(r->fid != nil)
		srvdestroyfid(r->fid);
	respond(r->oldreq, nil);
	respond(r, nil);
}

Srv s = {
	.read = srvread,
	.write = srvwrite,
	.flush = srvflush,
	.destroyfid = srvdestroyfid,
};