#include <u.h>
#include <libc.h>
#include <fcall.h>
#include <thread.h>
#include <9p.h>
// Per-queue state, stored as the aux pointer of the queue's file
// (typeof(file->aux)). newqueue() allocates it zeroed with sizeof(Queue),
// i.e. with no room for any members.
typedef struct {
int deleted; // NOTE(review): never read or written in the visible code; presumably marks a removed queue -- confirm
usize length; // upper bound ctlregen() uses when scanning members[]
Fid* members[]; // flexible array; ctlregen() counts entries up to the first nil
} Queue;
// Per-fid state (typeof(fid->aux)).
// NOTE(review): not referenced anywhere in the visible code; presumably
// tracks the requests outstanding on a fid -- confirm once queueread and
// queuewrite are implemented.
typedef struct {
usize length; // presumably the number of slots in members[] -- confirm
Req* members[]; // flexible array of request pointers
} Inflight;
// One slot in the global queues[] table. A slot whose queue pointer is
// nil is free (that is how newqueue() finds an empty slot).
typedef struct {
char* name; // queue name; pointer handed to newqueue() and kept as-is
Queue* queue; // queue state, nil while the slot is unused
} QueueDesc;
Srv s; // the 9P server; its handlers are set by the initializer at the end of the file
int stdio; // set by -s: serve on file descriptors 0 and 1 instead of posting a service
char* service; // name passed to postsrv(); "queue" unless given on the command line
// FIXME: dynamically allocate queues
int maxname; // -n value (default 64); ctlregen() sizes ctlbuf from it
int maxqueues; // -q value (default 256); number of slots in queues[]
QueueDesc* queues; // table of maxqueues slots, allocated in main()
char* ctlbuf; // cached contents of the ctl file; nil means it must be regenerated
// Print the command-line synopsis on file descriptor 2 and exit with
// status "usage". Does not return.
void
usage(void)
{
fprint(2,"usage: queuefs [ -Ds ] [ -n maxname ] [ -q maxqueues ] [ service ]\n");
exits("usage");
}
// Build the initial file tree: a root owned by "none" containing a single
// "ctl" file. ctl is created with a nil aux pointer, which is how srvread()
// and srvwrite() tell it apart from queue files.
void
setuptree(void)
{
s.tree = alloctree("none", "none", 0666, nil);
createfile(s.tree->root, "ctl", "none", 0666, nil);
}
void
main(int argc, char* argv[])
{
ARGBEGIN{
case 's':
stdio = 1;
break;
case 'D':
chatty9p++;
break;
case 'n':
maxname = atoi(EARGF(usage()));
break;
case 'q':
maxqueues = atoi(EARGF(usage()));
break;
default:
usage();
} ARGEND
switch(argc){
case 0:
service = "queue";
break;
case 1:
service = argv[0];
break;
default:
usage();
}
if(maxname <= 0)
maxname = 64;
if(maxqueues <= 0)
maxqueues = 256;
fmtinstall('D', dirfmt);
fmtinstall('M', dirmodefmt);
fmtinstall('F', fcallfmt);
queues = calloc(maxqueues, sizeof(QueueDesc));
setuptree();
if(stdio){
s.infd = 0;
s.outfd = 1;
srv(&s);
}
else{
print("posting service %s\n", service);
if(postsrv(&s, service) < 0)
sysfatal("couldn't postsrv: %r");
}
}
char*
newqueue(char* name)
{
int i;
for(i = 0; i < maxqueues && queues[i].queue != nil; i++);
if(i == maxqueues)
return "new: too many queues";
queues[i].queue = calloc(1, sizeof(Queue));
queues[i].name = name;
createfile(s.tree->root, name, "none", 0444, queues[i].queue);
ctlbuf = nil;
return nil;
}
// FIXME: global buffer could race on small read size. store data in fid->aux.
void
ctlregen(void)
{
char* p;
usize blen;
int nmems, i;
blen = (maxname + 1 + 20 + 1) * maxqueues + 10;
p = ctlbuf = calloc(blen, 1);
for(i = 0; i < maxqueues; i++){
if(queues[i].queue == nil)
continue;
for(nmems = 0;
nmems < queues[i].queue->length && queues[i].queue->members[nmems] != nil;
nmems++);
p += sprint(p, "%s %d\n", queues[i].name, nmems);
}
}
// Serve a read of the ctl file from the cached listing, regenerating the
// cache first when it has been invalidated (ctlbuf == nil).
void
ctlread(Req* r)
{
	char *text;

	text = ctlbuf;
	if(text == nil){
		ctlregen();
		text = ctlbuf;
	}
	readstr(r, text);
	respond(r, nil);
}
// Handle a write to the ctl file. The only command is
//	new <name>
// where <name> ends at the first space, tab, newline or NUL. If no queue
// with that name exists yet, one is created through newqueue(); asking for
// an existing queue succeeds without doing anything.
//
// Errors reported to the client: "bad command", "new: missing argument",
// "new: name too long", "new: out of memory", or whatever newqueue()
// returns. On success the whole write is reported as consumed.
// FIXME: errors if command split between multiple writes
void
ctlwrite(Req* r)
{
	char *name, *p, *err;
	usize n;
	int i;

	if(r->ifcall.count < 4 || strncmp(r->ifcall.data, "new ", 4) != 0){
		respond(r, "bad command");
		return;
	}

	// Copy the argument into a NUL-terminated buffer of our own; the
	// request data carries no terminator.
	n = r->ifcall.count - 4;
	name = malloc(n + 1);
	if(name == nil){
		respond(r, "new: out of memory");
		return;
	}
	memmove(name, r->ifcall.data + 4, n);
	name[n] = '\0';

	// Cut the name at the first whitespace character.
	for(p = name; *p != '\0'; p++){
		if(*p == ' ' || *p == '\n' || *p == '\t'){
			*p = '\0';
			break;
		}
	}
	// p now points at the terminator, so p - name is the name length.
	if(p == name){
		free(name);
		respond(r, "new: missing argument");
		return;
	}
	// ctlregen() sizes its buffer on the assumption that names fit in maxname.
	if(p - name > maxname){
		free(name);
		respond(r, "new: name too long");
		return;
	}

	for(i = 0; i < maxqueues; i++){
		if(queues[i].queue == nil)
			continue;
		if(strcmp(name, queues[i].name) == 0)
			break;
	}

	if(i != maxqueues){
		// Queue already exists: nothing to create, our copy is not needed.
		free(name);
	}else{
		err = newqueue(name);
		if(err != nil){
			// newqueue keeps the name only on success.
			free(name);
			respond(r, err);
			return;
		}
	}

	// Report the full write as consumed; otherwise the client sees a
	// zero-length (short) write.
	r->ofcall.count = r->ifcall.count;
	respond(r, nil);
}
// Read handler for queue files. Stub: always answers the request with the
// error "not implemented".
void
queueread(Req* r)
{
respond(r, "not implemented");
}
// Write handler for queue files. Stub: always answers the request with the
// error "not implemented".
void
queuewrite(Req* r)
{
respond(r, "not implemented");
}
// Srv read handler: route the request by file. Queue files carry their
// Queue in file->aux; the ctl file was created with a nil aux.
void
srvread(Req* r)
{
	void *aux;

	aux = r->fid->file->aux;
	if(aux != nil){
		queueread(r);
		return;
	}
	ctlread(r);
}
// Srv write handler: route the request by file. Queue files carry their
// Queue in file->aux; the ctl file was created with a nil aux.
void
srvwrite(Req* r)
{
	void *aux;

	aux = r->fid->file->aux;
	if(aux != nil){
		queuewrite(r);
		return;
	}
	ctlwrite(r);
}
// Definition of the server declared near the top of the file. Only the
// read and write handlers are supplied here; main() fills in infd/outfd
// when serving on stdio and setuptree() sets the tree.
Srv s = {
.read = srvread,
.write = srvwrite,
};