From fff56ab1242e3ad7cddf15e7e981da55d06c4da5 Mon Sep 17 00:00:00 2001 From: Miklos Szeredi Date: Fri, 16 Nov 2001 10:12:59 +0000 Subject: [PATCH] better thread management --- example/fusexmp.c | 12 +++-- include/fuse.h | 28 +++++++++-- kernel/dev.c | 4 +- lib/Makefile.am | 1 + lib/fuse.c | 89 +++++++++++++--------------------- lib/fuse_i.h | 6 +++ lib/fuse_mt.c | 111 +++++++++++++++++++++++++++++++++++++++++++ python/_fusemodule.c | 61 +++++++++++++++++------- 8 files changed, 229 insertions(+), 83 deletions(-) create mode 100644 lib/fuse_mt.c diff --git a/example/fusexmp.c b/example/fusexmp.c index 2ed35dc..aeb4f88 100644 --- a/example/fusexmp.c +++ b/example/fusexmp.c @@ -290,6 +290,7 @@ int main(int argc, char *argv[]) { int argctr; int flags; + int multithreaded; struct fuse *fuse; if(argc < 2) { @@ -308,7 +309,8 @@ int main(int argc, char *argv[]) set_signal_handlers(); atexit(cleanup); - flags = FUSE_MULTITHREAD; + flags = 0; + multithreaded = 1; for(; argctr < argc && argv[argctr][0] == '-'; argctr ++) { switch(argv[argctr][1]) { case 'd': @@ -316,7 +318,7 @@ int main(int argc, char *argv[]) break; case 's': - flags &= ~FUSE_MULTITHREAD; + multithreaded = 0; break; default: @@ -331,7 +333,11 @@ int main(int argc, char *argv[]) fuse = fuse_new(0, flags); fuse_set_operations(fuse, &xmp_oper); - fuse_loop(fuse); + + if(multithreaded) + fuse_loop_mt(fuse); + else + fuse_loop(fuse); return 0; } diff --git a/include/fuse.h b/include/fuse.h index 60cd378..e284469 100644 --- a/include/fuse.h +++ b/include/fuse.h @@ -78,9 +78,6 @@ struct fuse_operations { /* FUSE flags: */ -/** Process requests in multiple threads */ -#define FUSE_MULTITHREAD (1 << 0) - /** Enable debuging output */ #define FUSE_DEBUG (1 << 1) @@ -114,6 +111,20 @@ void fuse_set_operations(struct fuse *f, const struct fuse_operations *op); */ void fuse_loop(struct fuse *f); +/** + * FUSE event loop with multiple threads + * + * Requests from the kernel are processed, and the apropriate + * operations are called. Request are processed in parallel by + * distributing them between multiple threads. + * + * Calling this function requires the pthreads library to be linked to + * the application. + * + * @param f the FUSE handle + */ +void fuse_loop_mt(struct fuse *f); + /** * Destroy the FUSE handle. * @@ -122,3 +133,14 @@ void fuse_loop(struct fuse *f); * @param f the FUSE handle */ void fuse_destroy(struct fuse *f); + + +/* --------------------------------------------------- * + * Advanced API, usually you need not bother with this * + * --------------------------------------------------- */ + +struct fuse_cmd; + +struct fuse_cmd *__fuse_read_cmd(struct fuse *f); + +void __fuse_process_cmd(struct fuse *f, struct fuse_cmd *cmd); diff --git a/kernel/dev.c b/kernel/dev.c index d183616..1cc12a2 100644 --- a/kernel/dev.c +++ b/kernel/dev.c @@ -163,8 +163,8 @@ static int request_wait(struct fuse_conn *fc) { int ret = 0; DECLARE_WAITQUEUE(wait, current); - - add_wait_queue(&fc->waitq, &wait); + + add_wait_queue_exclusive(&fc->waitq, &wait); while(list_empty(&fc->pending)) { set_current_state(TASK_INTERRUPTIBLE); if(signal_pending(current)) { diff --git a/lib/Makefile.am b/lib/Makefile.am index f970b11..7f289f7 100644 --- a/lib/Makefile.am +++ b/lib/Makefile.am @@ -4,4 +4,5 @@ lib_LIBRARIES = libfuse.a libfuse_a_SOURCES = \ fuse.c \ + fuse_mt.c \ fuse_i.h diff --git a/lib/fuse.c b/lib/fuse.c index ed352c6..6bd32ae 100644 --- a/lib/fuse.c +++ b/lib/fuse.c @@ -301,6 +301,7 @@ static void convert_stat(struct stat *stbuf, struct fuse_attr *attr) attr->atime = stbuf->st_atime; attr->mtime = stbuf->st_mtime; attr->ctime = stbuf->st_ctime; + attr->_dummy = 4096; } static int fill_dir(struct fuse_dirhandle *dh, char *name, int type) @@ -745,19 +746,11 @@ static void do_write(struct fuse *f, struct fuse_in_header *in, send_reply(f, in, res, NULL, 0); } -struct cmd { - struct fuse *f; - char *buf; - size_t buflen; -}; - -static void *do_command(void *data) +void __fuse_process_cmd(struct fuse *f, struct fuse_cmd *cmd) { - struct cmd *cmd = (struct cmd *) data; struct fuse_in_header *in = (struct fuse_in_header *) cmd->buf; void *inarg = cmd->buf + sizeof(struct fuse_in_header); size_t argsize; - struct fuse *f = cmd->f; if((f->flags & FUSE_DEBUG)) { printf("unique: %i, opcode: %i, ino: %li, insize: %i\n", in->unique, @@ -836,66 +829,46 @@ static void *do_command(void *data) if(in->unique != 0) send_reply(f, in, -ENOSYS, NULL, 0); } - + free(cmd->buf); free(cmd); - - return NULL; } -/* This hack makes it possible to link FUSE with or without the - pthread library */ -__attribute__((weak)) -int pthread_create(pthread_t *thrid __attribute__((unused)), - const pthread_attr_t *attr __attribute__((unused)), - void *(*func)(void *) __attribute__((unused)), - void *arg __attribute__((unused))) +struct fuse_cmd *__fuse_read_cmd(struct fuse *f) { - return ENOSYS; + ssize_t res; + char inbuf[FUSE_MAX_IN]; + struct fuse_cmd *cmd; + + res = read(f->fd, inbuf, sizeof(inbuf)); + if(res == -1) { + perror("reading fuse device"); + /* BAD... This will happen again */ + return NULL; + } + if((size_t) res < sizeof(struct fuse_in_header)) { + fprintf(stderr, "short read on fuse device\n"); + /* Cannot happen */ + return NULL; + } + + cmd = (struct fuse_cmd *) malloc(sizeof(*cmd)); + cmd->buflen = res; + cmd->buf = (char *) malloc(cmd->buflen); + memcpy(cmd->buf, inbuf, cmd->buflen); + + return cmd; } + void fuse_loop(struct fuse *f) { - int res; - char inbuf[FUSE_MAX_IN]; - pthread_attr_t attr; - pthread_t thrid; - - pthread_attr_init(&attr); - pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); - while(1) { - struct cmd *cmd; - - res = read(f->fd, inbuf, sizeof(inbuf)); - if(res == -1) { - perror("reading fuse device"); - /* BAD... This will happen again */ - exit(1); - } - if((size_t) res < sizeof(struct fuse_in_header)) { - fprintf(stderr, "short read on fuse device\n"); - /* Cannot happen */ + struct fuse_cmd *cmd = __fuse_read_cmd(f); + if(cmd == NULL) exit(1); - } - - cmd = (struct cmd *) malloc(sizeof(struct cmd)); - cmd->f = f; - cmd->buflen = res; - cmd->buf = (char *) malloc(cmd->buflen); - memcpy(cmd->buf, inbuf, cmd->buflen); - - if(f->flags & FUSE_MULTITHREAD) { - res = pthread_create(&thrid, &attr, do_command, cmd); - if(res == 0) - continue; - - fprintf(stderr, "Error creating thread: %s\n", strerror(res)); - fprintf(stderr, "Will run in single thread mode\n"); - f->flags &= ~FUSE_MULTITHREAD; - } - do_command(cmd); + __fuse_process_cmd(f, cmd); } } @@ -909,6 +882,7 @@ struct fuse *fuse_new(int fd, int flags) f->flags = flags; f->fd = fd; f->ctr = 0; + /* FIXME: Dynamic hash table */ f->name_table_size = 14057; f->name_table = (struct node **) calloc(1, sizeof(struct node *) * f->name_table_size); @@ -934,6 +908,7 @@ void fuse_set_operations(struct fuse *f, const struct fuse_operations *op) void fuse_destroy(struct fuse *f) { + /* FIXME: Kill all threads... */ size_t i; for(i = 0; i < f->ino_table_size; i++) { struct node *node; diff --git a/lib/fuse_i.h b/lib/fuse_i.h index 4d3e042..6740608 100644 --- a/lib/fuse_i.h +++ b/lib/fuse_i.h @@ -40,3 +40,9 @@ struct fuse_dirhandle { fino_t dir; FILE *fp; }; + +struct fuse_cmd { + struct fuse *f; + char *buf; + size_t buflen; +}; diff --git a/lib/fuse_mt.c b/lib/fuse_mt.c new file mode 100644 index 0000000..ac616fe --- /dev/null +++ b/lib/fuse_mt.c @@ -0,0 +1,111 @@ +/* + FUSE: Filesystem in Userspace + Copyright (C) 2001 Miklos Szeredi (mszeredi@inf.bme.hu) + + This program can be distributed under the terms of the GNU GPL. + See the file COPYING. +*/ + +#include "fuse.h" + +#include +#include +#include +#include +#include +#include +#include + + +struct thread_common { + struct fuse *f; + struct fuse_cmd *cmd; + pthread_mutex_t lock; + pthread_cond_t cond; + int avail; +}; + +/* Called with c->lock held */ +static void *do_work(void *data) +{ + struct thread_common *c = (struct thread_common *) data; + struct fuse *f = c->f; + + c->avail ++; + while(1) { + int res; + struct timespec timeout; + struct timeval now; + struct fuse_cmd *cmd; + + gettimeofday(&now, NULL); + timeout.tv_sec = now.tv_sec + 1; + timeout.tv_nsec = now.tv_usec * 1000; + + res = 0; + while(c->cmd == NULL && res != ETIMEDOUT) + res = pthread_cond_timedwait(&c->cond, &c->lock, &timeout); + if(res == ETIMEDOUT) + break; + + cmd = c->cmd; + c->cmd = NULL; + c->avail --; + pthread_mutex_unlock(&c->lock); + __fuse_process_cmd(f, cmd); + pthread_mutex_lock(&c->lock); + c->avail ++; + } + + c->avail --; + pthread_mutex_unlock(&c->lock); + return NULL; +} + +static void start_thread(struct thread_common *c) +{ + pthread_attr_t attr; + pthread_t thrid; + sigset_t oldset; + sigset_t newset; + int res; + + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); + + /* Disallow signal reception in worker threads */ + sigfillset(&newset); + sigprocmask(SIG_SETMASK, &newset, &oldset); + res = pthread_create(&thrid, &attr, do_work, c); + sigprocmask(SIG_SETMASK, &oldset, NULL); + pthread_mutex_lock(&c->lock); + if(res != 0) { + fprintf(stderr, "Error creating thread: %s\n", strerror(res)); + exit(1); + } +} + +void fuse_loop_mt(struct fuse *f) +{ + struct thread_common *c; + + c = (struct thread_common *) malloc(sizeof(struct thread_common)); + c->f = f; + c->cmd = NULL; + pthread_cond_init(&c->cond, NULL); + pthread_mutex_init(&c->lock, NULL); + c->avail = 0; + + while(1) { + struct fuse_cmd *cmd = __fuse_read_cmd(f); + if(cmd == NULL) + exit(1); + + pthread_mutex_lock(&c->lock); + c->cmd = cmd; + while(c->avail == 0) + start_thread(c); + pthread_cond_signal(&c->cond); + pthread_mutex_unlock(&c->lock); + } +} diff --git a/python/_fusemodule.c b/python/_fusemodule.c index eee4fda..2f5e79c 100644 --- a/python/_fusemodule.c +++ b/python/_fusemodule.c @@ -68,29 +68,58 @@ static int readlink_func(const char *path, char *link, size_t size) { EPILOGUE } +static int getdir_add_entry(PyObject *w, fuse_dirh_t dh, fuse_dirfil_t df) +{ + PyObject *o0; + PyObject *o1; + int ret = -EINVAL; + + if(!PySequence_Check(w)) { + printf("getdir item not sequence\n"); + goto out; + } + if(PySequence_Length(w) != 2) { + printf("getdir item not len 2\n"); + goto out; + } + o0 = PySequence_GetItem(w, 0); + o1 = PySequence_GetItem(w, 1); + + if(!PyString_Check(o0)) { + printf("getdir item[0] not string\n"); + goto out_decref; + } + if(!PyInt_Check(o1)) { + printf("getdir item[1] not int\n"); + goto out_decref; + } + + ret = df(dh, PyString_AsString(o0), PyInt_AsLong(o1)); + +out_decref: + Py_DECREF(o0); + Py_DECREF(o1); + +out: + return ret; +} + static int getdir_func(const char *path, fuse_dirh_t dh, fuse_dirfil_t df) { PyObject *v = PyObject_CallFunction(getdir_cb, "s", path); int i; PROLOGUE - if(!PySequence_Check(v)) { printf("getdir_func not sequence\n");goto OUT_DECREF; } - for(i=0; i < PySequence_Length(v); i++) { - PyObject *w = PySequence_GetItem(v, i); - printf("getdir_func validate %d\n", i); - if(!PySequence_Check(w)) { printf("getdir item not sequence\n"); goto OUT_DECREF; } - if(PySequence_Length(w) != 2) { printf("getdir item not len 2\n"); goto OUT_DECREF; } - if(!PyString_Check(PySequence_GetItem(w,0))){ printf("getdir item[0] not string"); goto OUT_DECREF; } - if(!PyInt_Check(PySequence_GetItem(w, 1))) { printf("getdir item[1] not int"); goto OUT_DECREF; } + if(!PySequence_Check(v)) { + printf("getdir_func not sequence\n"); + goto OUT_DECREF; } - for(i=0; i < PySequence_Length(v); i++) { PyObject *w = PySequence_GetItem(v, i); - printf("getdir_func %d\n", i); - ret = df(dh, PyString_AsString(PySequence_GetItem(w, 0)), - PyInt_AsLong(PySequence_GetItem(w, 1))); - if(ret) goto OUT_DECREF; + ret = getdir_add_entry(w, dh, df); + Py_DECREF(w); + if(ret != 0) + goto OUT_DECREF; } - ret = 0; EPILOGUE @@ -191,8 +220,6 @@ int open_func(const char *path, int mode) { static PyObject * Fuse_main(PyObject *self, PyObject *args, PyObject *kw) { - PyObject *list, *item; - int flags=0; struct fuse_operations op; @@ -260,7 +287,5 @@ init_fuse(void) d = PyModule_GetDict(m); ErrorObject = PyErr_NewException("fuse.error", NULL, NULL); PyDict_SetItemString(d, "error", ErrorObject); - PyDict_SetItemString(d, "MULTITHREAD", PyInt_FromLong(FUSE_MULTITHREAD)); PyDict_SetItemString(d, "DEBUG", PyInt_FromLong(FUSE_DEBUG)); - } -- 2.30.2