From 41a67f231be44cf679a8ad6df2f6133cd8340c61 Mon Sep 17 00:00:00 2001 From: glenda Date: Wed, 29 Jan 2025 03:52:16 +0000 Subject: [PATCH] queuesrv works!! --- mkfile | 2 +- queuesrv.c | 166 ++++++++++++++++++++++++++++++++++++++++++++++------- 2 files changed, 146 insertions(+), 22 deletions(-) diff --git a/mkfile b/mkfile index 23c2d14a5ea4d966db7fac4789f9f13c7325a4c4..327b2b423618cd0b6c2a5e8bb41566e752483c29 100644 --- a/mkfile +++ b/mkfile @@ -22,5 +22,5 @@ qclean:V: unmount /n || echo -n qdev:V: mk queuesrv - queuesrv || echo -n + queuesrv -D || echo -n mount /srv/queue /n diff --git a/queuesrv.c b/queuesrv.c index e70f1dccced00c328d7da0e666910b5ed0f49662..7365fbe8c5c5253c07caf605994581286243dcf2 100644 --- a/queuesrv.c +++ b/queuesrv.c @@ -1,12 +1,18 @@ #include #include +#include #include #include #include <9p.h> +enum { + QNew = 0, + QDeleted +}; + // typeof(file->aux) typedef struct { - int deleted; + int state; usize length; Fid* members[]; } Queue; @@ -17,7 +23,7 @@ typedef struct { } Inflight; typedef struct { char* name; - Queue* queue; + File* file; } QueueDesc; Srv s; @@ -83,6 +89,7 @@ main(int argc, char* argv[]) fmtinstall('D', dirfmt); fmtinstall('M', dirmodefmt); + #pragma varargck type "F" Fcall* fmtinstall('F', fcallfmt); queues = calloc(maxqueues, sizeof(QueueDesc)); @@ -105,19 +112,77 @@ char* newqueue(char* name) { int i; + Queue* queue; - for(i = 0; i < maxqueues && queues[i].queue != nil; i++); + for(i = 0; i < maxqueues && queues[i].file != nil; i++); if(i == maxqueues) return "new: too many queues"; - queues[i].queue = calloc(1, sizeof(Queue)); + queue = calloc(1, sizeof(Queue) + sizeof(Fid*)); + queue->length = 1; + + queues[i].file = createfile(s.tree->root, name, "none", 0444, queue); queues[i].name = name; - createfile(s.tree->root, name, "none", 0444, queues[i].queue); - ctlbuf = nil; + if(ctlbuf != nil){ + free(ctlbuf); + ctlbuf = nil; + } return nil; } +uint +newmember(Fid* fid) +{ + File* file; + Queue* queue; + Inflight* inflight; + uint i; + + file = fid->file; + queue = file->aux; + + // append member to queue + for(i = 0; i < queue->length && queue->members[i] != nil; i++); + // reallocate queue if necessary + if(i == queue->length){ + queue->length *= 2; + queue = realloc(queue, sizeof(Queue) + sizeof(Fid*) * queue->length); + file->aux = queue; + } + queue->members[i] = fid; + + // setup read queue + inflight = calloc(1, sizeof(Inflight) + sizeof(Req*)); + inflight->length = 1; + fid->aux = inflight; + + // invalidate ctlbuf + if(ctlbuf != nil) + free(ctlbuf); + ctlbuf = nil; + + // return spot in queue + return i; +} + +void +registerreq(Req* r) +{ + Inflight* inflight; + uint i; + + inflight = r->fid->aux; + + for(i = 0; i < inflight->length && inflight->members[i] != nil; i++); + if(i == inflight->length){ + inflight->length *= 2; + inflight = realloc(inflight, sizeof(Inflight) + sizeof(Req*) * inflight->length); + r->fid->aux = inflight; + } + + inflight->members[i] = r; +} // FIXME: global buffer could race on small read size. store data in fid->aux. void @@ -125,20 +190,24 @@ ctlregen(void) { char* p; usize blen; - int nmems, i; + uint nmems, i; + QueueDesc* qd; + Queue* queue; - blen = (maxname + 1 + 20 + 1) * maxqueues + 10; + blen = (maxname + 20) * maxqueues + 10; p = ctlbuf = calloc(blen, 1); for(i = 0; i < maxqueues; i++){ - if(queues[i].queue == nil) + qd = queues + i; + if(qd->file == nil) continue; + queue = qd->file->aux; for(nmems = 0; - nmems < queues[i].queue->length && queues[i].queue->members[nmems] != nil; + nmems < queue->length && queue->members[nmems] != nil; nmems++); - p += sprint(p, "%s %d\n", queues[i].name, nmems); + p += sprint(p, "%s %d\n", qd->name, nmems); } } void @@ -161,14 +230,8 @@ ctlwrite(Req* r) p = s = malloc(r->ifcall.count - 3); strncpy(s, r->ifcall.data + 4, r->ifcall.count - 4); do{ - switch(*p){ - case ' ': - case '\n': - case '\t': - case '\0': + if(isspace(*p)) *p = '\0'; - break; - } } while(*p++ != '\0'); @@ -178,7 +241,7 @@ ctlwrite(Req* r) } for(i = 0; i < maxqueues; i++){ - if(queues[i].queue == nil) + if(queues[i].file == nil) continue; if(strcmp(s, queues[i].name) == 0) break; @@ -197,12 +260,22 @@ ctlwrite(Req* r) void queueread(Req* r) { - respond(r, "not implemented"); + uint spot; + char buf[32]; + + if(r->fid->aux == nil){ + spot = newmember(r->fid); + sprint(buf, "%ud\n", spot); + readstr(r, buf); + respond(r, nil); + } + else + registerreq(r); } void queuewrite(Req* r) { - respond(r, "not implemented"); + respond(r, "queues are not writable"); } void @@ -213,6 +286,7 @@ srvread(Req* r) else queueread(r); } + void srvwrite(Req* r) { @@ -221,7 +295,57 @@ srvwrite(Req* r) else queuewrite(r); } + +void +srvdestroyfid(Fid* fid) +{ + print("dfid %p\n", fid); + + Queue* queue; + Inflight* inflight; + Req* r; + uint i, j; + char buf[32]; + + if(fid->aux == nil || fid->file->aux == nil) + return; + + queue = fid->file->aux; + for(i = 0; i < queue->length && queue->members[i] != fid && queue->members[i] != nil; i++); + if(i == queue->length || queue->members[i] == nil) + sysfatal("bugcheck: fid->aux != nil but fid not in queue->members"); + + for(; (i + 1) < queue->length && queue->members[i + 1] != nil; i++){ + queue->members[i] = queue->members[i + 1]; + inflight = queue->members[i]->aux; + + sprint(buf, "%ud\n", i); + for(j = 0; j < inflight->length; j++){ + r = inflight->members[j]; + if(r == nil) + continue; + + r->ifcall.offset = 0; + readstr(r, buf); + respond(r, nil); + } + } + + queue->members[i] = nil; +} + +void +srvflush(Req* r) +{ + if(r->fid != nil) + srvdestroyfid(r->fid); + respond(r->oldreq, nil); + respond(r, nil); +} + Srv s = { .read = srvread, .write = srvwrite, + .flush = srvflush, + .destroyfid = srvdestroyfid, };