#include #include #include #include #include #include <9p.h> enum { QNew = 0, QDeleted }; // typeof(file->aux) typedef struct { int state; usize length; Fid *members[]; } Queue; // typeof(fid->aux) typedef struct { usize length; Req *members[]; } Inflight; typedef struct { char *name; File *file; } QueueDesc; Srv s; int stdio; char *service; int maxname; int queueslen; QueueDesc *queues; char *ctlbuf; void usage(void) { fprint(2,"usage: queuesrv [ -Ds ] [ -n maxname ] [ service ]\n"); exits("usage"); } void setuptree(void) { s.tree = alloctree("none", "none", 0666, nil); createfile(s.tree->root, "ctl", "none", 0666, nil); } void main(int argc, char *argv[]) { ARGBEGIN{ case 's': stdio = 1; break; case 'D': chatty9p++; break; case 'n': maxname = atoi(EARGF(usage())); break; default: usage(); }ARGEND switch(argc){ case 0: service = "queue"; break; case 1: service = argv[0]; break; default: usage(); } if(maxname <= 0) maxname = 64; fmtinstall('D', dirfmt); fmtinstall('M', dirmodefmt); #pragma varargck type "F" Fcall * fmtinstall('F', fcallfmt); queueslen = 0; setuptree(); if(stdio){ s.infd = 0; s.outfd = 1; srv(&s); }else{ print("posting service %s\n", service); if(postsrv(&s, service) < 0) sysfatal("couldn't postsrv: %r"); } } void ctlclear(void) { if(ctlbuf != nil){ free(ctlbuf); ctlbuf = nil; } } char * newqueue(char *name) { int i; Queue *queue; for(i = 0; i < queueslen && queues[i].file != nil; i++); print("pre-realloc %d\n", queueslen); if(i == queueslen) queues = realloc(queues, (++queueslen) * sizeof(QueueDesc)); print("post-realloc %d\n", queueslen); queue = calloc(1, sizeof(Queue) + sizeof(Fid *)); queue->length = 1; queues[i].file = createfile(s.tree->root, name, "none", 0444, queue); queues[i].name = name; ctlclear(); return nil; } uint newmember(Fid *fid) { File *file; Queue *queue; Inflight *inflight; uint i; file = fid->file; queue = file->aux; // append member to queue for(i = 0; i < queue->length && queue->members[i] != nil; i++); // reallocate queue if necessary if(i == queue->length){ queue->length *= 2; queue = realloc(queue, sizeof(Queue) + sizeof(Fid *) * queue->length); file->aux = queue; } queue->members[i] = fid; // setup read queue inflight = calloc(1, sizeof(Inflight) + sizeof(Req *)); inflight->length = 1; fid->aux = inflight; ctlclear(); // return spot in queue return i; } char * delqueue(QueueDesc* qd) { uint i; File *file; Queue *queue; file = qd->file; queue = file->aux; for(i = 0; i < queue->length && queue->members[i] == nil; i++); removefile(file); if(i == queue->length){ free(queue); }else queue->state = QDeleted; free(qd->name); qd->file = nil; qd->name = nil; ctlclear(); return nil; } void registerreq(Req *r) { Inflight *inflight; uint i; inflight = r->fid->aux; for(i = 0; i < inflight->length && inflight->members[i] != nil; i++); if(i == inflight->length){ inflight->length *= 2; inflight = realloc(inflight, sizeof(Inflight) + sizeof(Req *) * inflight->length); r->fid->aux = inflight; } inflight->members[i] = r; } // FIXME: global buffer could race on small read size. store data in fid->aux. void ctlregen(void) { char *p; usize blen; uint nmems, i; QueueDesc *qd; Queue *queue; blen = 10; for(i = 0; i < queueslen; i++){ qd = queues + i; if(qd->file != nil && qd->name != nil) blen += strlen(qd->name) + 20; } p = ctlbuf = malloc(blen); p[0] = 0; for(i = 0; i < queueslen; i++){ qd = queues + i; if(qd->file == nil) continue; queue = qd->file->aux; for(nmems = 0; nmems < queue->length && queue->members[nmems] != nil; nmems++); p += sprint(p, "%s\t%d\n", qd->name, nmems); } } void ctlread(Req *r) { if(ctlbuf == nil) ctlregen(); readstr(r, ctlbuf); respond(r, nil); } // FIXME: errors if command split between multiple writes void ctlwrite(Req *r) { char *s, *p, *resp; int i; //FIXME: break parsing logic out into a function if(r->ifcall.count > 5 && strncmp(r->ifcall.data, "new ", 4) == 0){ p = s = malloc(r->ifcall.count - 3); strncpy(s, r->ifcall.data + 4, r->ifcall.count - 4); do{ if(isspace(*p)) *p = '\0'; } while(*p++ != '\0'); if(strlen(s) == 0){ respond(r, "new: missing argument"); return; } for(i = 0; i < queueslen; i++){ if(queues[i].file == nil) continue; if(strcmp(s, queues[i].name) == 0) break; } if(i == queueslen){ resp = newqueue(s); respond(r, resp); if(resp != nil) free(s); }else respond(r, nil); }else if(r->ifcall.count > 8 && strncmp(r->ifcall.data, "delete ", 7) == 0){ p = s = malloc(r->ifcall.count - 6); strncpy(s, r->ifcall.data + 7, r->ifcall.count - 7); do{ if(isspace(*p)) *p = '\0'; } while(*p++ != '\0'); if(strlen(s) == 0){ respond(r, "delete: missing argument"); return; } print("trying to delete %s\n", s); for(i = 0; i < queueslen; i++){ if(queues[i].file == nil) continue; if(strcmp(s, queues[i].name) == 0) break; } if(i != queueslen){ respond(r, delqueue(&(queues[i]))); free(s); }else respond(r, "queue does not exist"); }else respond(r, "bad command"); } void queueread(Req *r) { uint spot; char buf[32]; if(r->fid->aux == nil){ spot = newmember(r->fid); sprint(buf, "%ud\n", spot); readstr(r, buf); respond(r, nil); } else registerreq(r); } void queuewrite(Req *r) { respond(r, "queues are not writable"); } void srvread(Req *r) { if(r->fid->file->aux == nil) ctlread(r); else queueread(r); } void srvwrite(Req *r) { if(r->fid->file->aux == nil) ctlwrite(r); else queuewrite(r); } void srvdestroyfid(Fid *fid) { Queue *queue; Inflight *inflight; Req *r; uint i, j; char buf[32]; if(fid->aux == nil || fid->file->aux == nil) return; queue = fid->file->aux; for(i = 0; i < queue->length && queue->members[i] != fid && queue->members[i] != nil; i++); if(i == queue->length || queue->members[i] == nil) sysfatal("bugcheck: fid->aux != nil but fid not in queue->members"); free(fid->aux); for(; (i + 1) < queue->length && queue->members[i + 1] != nil; i++){ queue->members[i] = queue->members[i + 1]; inflight = queue->members[i]->aux; sprint(buf, "%ud\n", i); for(j = 0; j < inflight->length; j++){ r = inflight->members[j]; if(r == nil) continue; r->ifcall.offset = 0; readstr(r, buf); respond(r, nil); inflight->members[j] = nil; } } queue->members[i] = nil; if(i == 0 && queue->state == QDeleted){ removefile(fid->file); free(queue); } ctlclear(); } void srvflush(Req *r) { if(r->fid != nil) srvdestroyfid(r->fid); respond(r->oldreq, nil); respond(r, nil); } Srv s = { .read = srvread, .write = srvwrite, .flush = srvflush, .destroyfid = srvdestroyfid, };