#include <u.h>
#include <libc.h>
#include <ctype.h>
#include <fcall.h>
#include <thread.h>
#include <9p.h>
enum {
QNew = 0,
QDeleted
};
// typeof(file->aux)
typedef struct {
int state;
usize length;
Fid *members[];
} Queue;
// typeof(fid->aux)
typedef struct {
usize length;
Req *members[];
} Inflight;
typedef struct {
char *name;
File *file;
} QueueDesc;
Srv s;
int stdio;
char *service;
int maxname;
int queueslen;
QueueDesc *queues;
char *ctlbuf;
void
usage(void)
{
fprint(2,"usage: queuefs [ -Ds ] [ -n maxname ] [ service ]\n");
exits("usage");
}
void
setuptree(void)
{
s.tree = alloctree("none", "none", 0666, nil);
createfile(s.tree->root, "ctl", "none", 0666, nil);
}
void
main(int argc, char *argv[])
{
ARGBEGIN{
case 's':
stdio = 1;
break;
case 'D':
chatty9p++;
break;
case 'n':
maxname = atoi(EARGF(usage()));
break;
default:
usage();
}ARGEND
switch(argc){
case 0:
service = "queue";
break;
case 1:
service = argv[0];
break;
default:
usage();
}
if(maxname <= 0)
maxname = 64;
fmtinstall('D', dirfmt);
fmtinstall('M', dirmodefmt);
#pragma varargck type "F" Fcall *
fmtinstall('F', fcallfmt);
queueslen = 0;
setuptree();
if(stdio){
s.infd = 0;
s.outfd = 1;
srv(&s);
}else{
print("posting service %s\n", service);
if(postsrv(&s, service) < 0)
sysfatal("couldn't postsrv: %r");
}
}
void
ctlclear(void)
{
if(ctlbuf != nil){
free(ctlbuf);
ctlbuf = nil;
}
}
char *
newqueue(char *name)
{
int i;
Queue *queue;
for(i = 0; i < queueslen && queues[i].file != nil; i++);
print("pre-realloc %d\n", queueslen);
if(i == queueslen)
queues = realloc(queues, (++queueslen) * sizeof(QueueDesc));
print("post-realloc %d\n", queueslen);
queue = calloc(1, sizeof(Queue) + sizeof(Fid *));
queue->length = 1;
queues[i].file = createfile(s.tree->root, name, "none", 0444, queue);
queues[i].name = name;
ctlclear();
return nil;
}
uint
newmember(Fid *fid)
{
File *file;
Queue *queue;
Inflight *inflight;
uint i;
file = fid->file;
queue = file->aux;
// append member to queue
for(i = 0; i < queue->length && queue->members[i] != nil; i++);
// reallocate queue if necessary
if(i == queue->length){
queue->length *= 2;
queue = realloc(queue, sizeof(Queue) + sizeof(Fid *) * queue->length);
file->aux = queue;
}
queue->members[i] = fid;
// setup read queue
inflight = calloc(1, sizeof(Inflight) + sizeof(Req *));
inflight->length = 1;
fid->aux = inflight;
ctlclear();
// return spot in queue
return i;
}
char *
delqueue(QueueDesc* qd)
{
uint i;
File *file;
Queue *queue;
file = qd->file;
queue = file->aux;
for(i = 0; i < queue->length && queue->members[i] == nil; i++);
if(i == queue->length){
removefile(file);
free(queue);
}else
queue->state = QDeleted;
free(qd->name);
qd->file = nil;
qd->name = nil;
ctlclear();
return nil;
}
void
registerreq(Req *r)
{
Inflight *inflight;
uint i;
inflight = r->fid->aux;
for(i = 0; i < inflight->length && inflight->members[i] != nil; i++);
if(i == inflight->length){
inflight->length *= 2;
inflight = realloc(inflight, sizeof(Inflight) + sizeof(Req *) * inflight->length);
r->fid->aux = inflight;
}
inflight->members[i] = r;
}
// FIXME: global buffer could race on small read size. store data in fid->aux.
void
ctlregen(void)
{
char *p;
usize blen;
uint nmems, i;
QueueDesc *qd;
Queue *queue;
blen = 10;
for(i = 0; i < queueslen; i++){
qd = queues + i;
if(qd->file != nil && qd->name != nil)
blen += strlen(qd->name) + 20;
}
p = ctlbuf = malloc(blen);
p[0] = 0;
for(i = 0; i < queueslen; i++){
qd = queues + i;
if(qd->file == nil)
continue;
queue = qd->file->aux;
for(nmems = 0;
nmems < queue->length && queue->members[nmems] != nil;
nmems++);
p += sprint(p, "%s\t%d\n", qd->name, nmems);
}
}
void
ctlread(Req *r)
{
if(ctlbuf == nil)
ctlregen();
readstr(r, ctlbuf);
respond(r, nil);
}
// FIXME: errors if command split between multiple writes
void
ctlwrite(Req *r)
{
char *s, *p, *resp;
int i;
//FIXME: break parsing logic out into a function
if(r->ifcall.count > 5 && strncmp(r->ifcall.data, "new ", 4) == 0){
p = s = malloc(r->ifcall.count - 3);
strncpy(s, r->ifcall.data + 4, r->ifcall.count - 4);
do{
if(isspace(*p))
*p = '\0';
}
while(*p++ != '\0');
if(strlen(s) == 0){
respond(r, "new: missing argument");
return;
}
for(i = 0; i < queueslen; i++){
if(queues[i].file == nil)
continue;
if(strcmp(s, queues[i].name) == 0)
break;
}
if(i == queueslen){
resp = newqueue(s);
respond(r, resp);
if(resp != nil)
free(s);
}else
respond(r, nil);
}else if(r->ifcall.count > 8 && strncmp(r->ifcall.data, "delete ", 7) == 0){
p = s = malloc(r->ifcall.count - 6);
strncpy(s, r->ifcall.data + 7, r->ifcall.count - 7);
do{
if(isspace(*p))
*p = '\0';
}
while(*p++ != '\0');
if(strlen(s) == 0){
respond(r, "delete: missing argument");
return;
}
print("trying to delete %s\n", s);
for(i = 0; i < queueslen; i++){
if(queues[i].file == nil)
continue;
if(strcmp(s, queues[i].name) == 0)
break;
}
if(i != queueslen){
respond(r, delqueue(&(queues[i])));
free(s);
}else
respond(r, "queue does not exist");
}else
respond(r, "bad command");
}
void
queueread(Req *r)
{
uint spot;
char buf[32];
if(r->fid->aux == nil){
spot = newmember(r->fid);
sprint(buf, "%ud\n", spot);
readstr(r, buf);
respond(r, nil);
}
else
registerreq(r);
}
void
queuewrite(Req *r)
{
respond(r, "queues are not writable");
}
void
srvread(Req *r)
{
if(r->fid->file->aux == nil)
ctlread(r);
else
queueread(r);
}
void
srvwrite(Req *r)
{
if(r->fid->file->aux == nil)
ctlwrite(r);
else
queuewrite(r);
}
void
srvdestroyfid(Fid *fid)
{
Queue *queue;
Inflight *inflight;
Req *r;
uint i, j;
char buf[32];
if(fid->aux == nil || fid->file->aux == nil)
return;
queue = fid->file->aux;
for(i = 0; i < queue->length && queue->members[i] != fid && queue->members[i] != nil; i++);
if(i == queue->length || queue->members[i] == nil)
sysfatal("bugcheck: fid->aux != nil but fid not in queue->members");
free(fid->aux);
for(; (i + 1) < queue->length && queue->members[i + 1] != nil; i++){
queue->members[i] = queue->members[i + 1];
inflight = queue->members[i]->aux;
sprint(buf, "%ud\n", i);
for(j = 0; j < inflight->length; j++){
r = inflight->members[j];
if(r == nil)
continue;
r->ifcall.offset = 0;
readstr(r, buf);
respond(r, nil);
inflight->members[j] = nil;
}
}
queue->members[i] = nil;
if(i == 0 && queue->state == QDeleted){
removefile(fid->file);
free(queue);
}
ctlclear();
}
void
srvflush(Req *r)
{
if(r->fid != nil)
srvdestroyfid(r->fid);
respond(r->oldreq, nil);
respond(r, nil);
}
Srv s = {
.read = srvread,
.write = srvwrite,
.flush = srvflush,
.destroyfid = srvdestroyfid,
};