#include #include #include #include #include #include <9p.h> enum { QNew = 0, QDeleted }; // typeof(file->aux) typedef struct { int state; usize length; Fid* members[]; } Queue; // typeof(fid->aux) typedef struct { usize length; Req* members[]; } Inflight; typedef struct { char* name; File* file; } QueueDesc; Srv s; int stdio; char* service; // FIXME: dynamically allocate queues int maxname; int maxqueues; QueueDesc* queues; char* ctlbuf; void usage(void) { fprint(2,"usage: queuefs [ -Ds ] [ -n maxname ] [ -q maxqueues ] [ service ]\n"); exits("usage"); } void setuptree(void) { s.tree = alloctree("none", "none", 0666, nil); createfile(s.tree->root, "ctl", "none", 0666, nil); } void main(int argc, char* argv[]) { ARGBEGIN{ case 's': stdio = 1; break; case 'D': chatty9p++; break; case 'n': maxname = atoi(EARGF(usage())); break; case 'q': maxqueues = atoi(EARGF(usage())); break; default: usage(); } ARGEND switch(argc){ case 0: service = "queue"; break; case 1: service = argv[0]; break; default: usage(); } if(maxname <= 0) maxname = 64; if(maxqueues <= 0) maxqueues = 256; fmtinstall('D', dirfmt); fmtinstall('M', dirmodefmt); #pragma varargck type "F" Fcall* fmtinstall('F', fcallfmt); queues = calloc(maxqueues, sizeof(QueueDesc)); setuptree(); if(stdio){ s.infd = 0; s.outfd = 1; srv(&s); } else{ print("posting service %s\n", service); if(postsrv(&s, service) < 0) sysfatal("couldn't postsrv: %r"); } } char* newqueue(char* name) { int i; Queue* queue; for(i = 0; i < maxqueues && queues[i].file != nil; i++); if(i == maxqueues) return "new: too many queues"; queue = calloc(1, sizeof(Queue) + sizeof(Fid*)); queue->length = 1; queues[i].file = createfile(s.tree->root, name, "none", 0444, queue); queues[i].name = name; if(ctlbuf != nil){ free(ctlbuf); ctlbuf = nil; } return nil; } uint newmember(Fid* fid) { File* file; Queue* queue; Inflight* inflight; uint i; file = fid->file; queue = file->aux; // append member to queue for(i = 0; i < queue->length && queue->members[i] != nil; i++); // reallocate queue if necessary if(i == queue->length){ queue->length *= 2; queue = realloc(queue, sizeof(Queue) + sizeof(Fid*) * queue->length); file->aux = queue; } queue->members[i] = fid; // setup read queue inflight = calloc(1, sizeof(Inflight) + sizeof(Req*)); inflight->length = 1; fid->aux = inflight; // invalidate ctlbuf if(ctlbuf != nil) free(ctlbuf); ctlbuf = nil; // return spot in queue return i; } void registerreq(Req* r) { Inflight* inflight; uint i; inflight = r->fid->aux; for(i = 0; i < inflight->length && inflight->members[i] != nil; i++); if(i == inflight->length){ inflight->length *= 2; inflight = realloc(inflight, sizeof(Inflight) + sizeof(Req*) * inflight->length); r->fid->aux = inflight; } inflight->members[i] = r; } // FIXME: global buffer could race on small read size. store data in fid->aux. void ctlregen(void) { char* p; usize blen; uint nmems, i; QueueDesc* qd; Queue* queue; blen = (maxname + 20) * maxqueues + 10; p = ctlbuf = calloc(blen, 1); for(i = 0; i < maxqueues; i++){ qd = queues + i; if(qd->file == nil) continue; queue = qd->file->aux; for(nmems = 0; nmems < queue->length && queue->members[nmems] != nil; nmems++); p += sprint(p, "%s %d\n", qd->name, nmems); } } void ctlread(Req* r) { if(ctlbuf == nil) ctlregen(); readstr(r, ctlbuf); respond(r, nil); } // FIXME: errors if command split between multiple writes void ctlwrite(Req* r) { char *s, *p; int i; if(r->ifcall.count > 5 && strncmp(r->ifcall.data, "new ", 4) == 0){ p = s = malloc(r->ifcall.count - 3); strncpy(s, r->ifcall.data + 4, r->ifcall.count - 4); do{ if(isspace(*p)) *p = '\0'; } while(*p++ != '\0'); if(strlen(s) == 0){ respond(r, "new: missing argument"); return; } for(i = 0; i < maxqueues; i++){ if(queues[i].file == nil) continue; if(strcmp(s, queues[i].name) == 0) break; } // FIXME: might leak s here if(i == maxqueues) respond(r, newqueue(s)); else respond(r, nil); } else respond(r, "bad command"); } void queueread(Req* r) { uint spot; char buf[32]; if(r->fid->aux == nil){ spot = newmember(r->fid); sprint(buf, "%ud\n", spot); readstr(r, buf); respond(r, nil); } else registerreq(r); } void queuewrite(Req* r) { respond(r, "queues are not writable"); } void srvread(Req* r) { if(r->fid->file->aux == nil) ctlread(r); else queueread(r); } void srvwrite(Req* r) { if(r->fid->file->aux == nil) ctlwrite(r); else queuewrite(r); } void srvdestroyfid(Fid* fid) { print("dfid %p\n", fid); Queue* queue; Inflight* inflight; Req* r; uint i, j; char buf[32]; if(fid->aux == nil || fid->file->aux == nil) return; queue = fid->file->aux; for(i = 0; i < queue->length && queue->members[i] != fid && queue->members[i] != nil; i++); if(i == queue->length || queue->members[i] == nil) sysfatal("bugcheck: fid->aux != nil but fid not in queue->members"); for(; (i + 1) < queue->length && queue->members[i + 1] != nil; i++){ queue->members[i] = queue->members[i + 1]; inflight = queue->members[i]->aux; sprint(buf, "%ud\n", i); for(j = 0; j < inflight->length; j++){ r = inflight->members[j]; if(r == nil) continue; r->ifcall.offset = 0; readstr(r, buf); respond(r, nil); } } queue->members[i] = nil; } void srvflush(Req* r) { if(r->fid != nil) srvdestroyfid(r->fid); respond(r->oldreq, nil); respond(r, nil); } Srv s = { .read = srvread, .write = srvwrite, .flush = srvflush, .destroyfid = srvdestroyfid, };