#include #include #include #include #include <9p.h> // typeof(file->aux) typedef struct { int deleted; usize length; Fid* members[]; } Queue; // typeof(fid->aux) typedef struct { usize length; Req* members[]; } Inflight; typedef struct { char* name; Queue* queue; } QueueDesc; Srv s; int stdio; char* service; // FIXME: dynamically allocate queues int maxname; int maxqueues; QueueDesc* queues; char* ctlbuf; void usage(void) { fprint(2,"usage: queuefs [ -Ds ] [ -n maxname ] [ -q maxqueues ] [ service ]\n"); exits("usage"); } void setuptree(void) { s.tree = alloctree("none", "none", 0666, nil); createfile(s.tree->root, "ctl", "none", 0666, nil); } void main(int argc, char* argv[]) { ARGBEGIN{ case 's': stdio = 1; break; case 'D': chatty9p++; break; case 'n': maxname = atoi(EARGF(usage())); break; case 'q': maxqueues = atoi(EARGF(usage())); break; default: usage(); } ARGEND switch(argc){ case 0: service = "queue"; break; case 1: service = argv[0]; break; default: usage(); } if(maxname <= 0) maxname = 64; if(maxqueues <= 0) maxqueues = 256; fmtinstall('D', dirfmt); fmtinstall('M', dirmodefmt); fmtinstall('F', fcallfmt); queues = calloc(maxqueues, sizeof(QueueDesc)); setuptree(); if(stdio){ s.infd = 0; s.outfd = 1; srv(&s); } else{ print("posting service %s\n", service); if(postsrv(&s, service) < 0) sysfatal("couldn't postsrv: %r"); } } char* newqueue(char* name) { int i; for(i = 0; i < maxqueues && queues[i].queue != nil; i++); if(i == maxqueues) return "new: too many queues"; queues[i].queue = calloc(1, sizeof(Queue)); queues[i].name = name; createfile(s.tree->root, name, "none", 0444, queues[i].queue); ctlbuf = nil; return nil; } // FIXME: global buffer could race on small read size. store data in fid->aux. void ctlregen(void) { char* p; usize blen; int nmems, i; blen = (maxname + 1 + 20 + 1) * maxqueues + 10; p = ctlbuf = calloc(blen, 1); for(i = 0; i < maxqueues; i++){ if(queues[i].queue == nil) continue; for(nmems = 0; nmems < queues[i].queue->length && queues[i].queue->members[nmems] != nil; nmems++); p += sprint(p, "%s %d\n", queues[i].name, nmems); } } void ctlread(Req* r) { if(ctlbuf == nil) ctlregen(); readstr(r, ctlbuf); respond(r, nil); } // FIXME: errors if command split between multiple writes void ctlwrite(Req* r) { char *s, *p; int i; if(r->ifcall.count > 5 && strncmp(r->ifcall.data, "new ", 4) == 0){ p = s = malloc(r->ifcall.count - 3); strncpy(s, r->ifcall.data + 4, r->ifcall.count - 4); do{ switch(*p){ case ' ': case '\n': case '\t': case '\0': *p = '\0'; break; } } while(*p++ != '\0'); if(strlen(s) == 0){ respond(r, "new: missing argument"); return; } for(i = 0; i < maxqueues; i++){ if(queues[i].queue == nil) continue; if(strcmp(s, queues[i].name) == 0) break; } // FIXME: might leak s here if(i == maxqueues) respond(r, newqueue(s)); else respond(r, nil); } else respond(r, "bad command"); } void queueread(Req* r) { respond(r, "not implemented"); } void queuewrite(Req* r) { respond(r, "not implemented"); } void srvread(Req* r) { if(r->fid->file->aux == nil) ctlread(r); else queueread(r); } void srvwrite(Req* r) { if(r->fid->file->aux == nil) ctlwrite(r); else queuewrite(r); } Srv s = { .read = srvread, .write = srvwrite, };