better thread management
authorMiklos Szeredi <miklos@szeredi.hu>
Fri, 16 Nov 2001 10:12:59 +0000 (10:12 +0000)
committerMiklos Szeredi <miklos@szeredi.hu>
Fri, 16 Nov 2001 10:12:59 +0000 (10:12 +0000)
example/fusexmp.c
include/fuse.h
kernel/dev.c
lib/Makefile.am
lib/fuse.c
lib/fuse_i.h
lib/fuse_mt.c [new file with mode: 0644]
python/_fusemodule.c

index 2ed35dc18f24e6ebdb840c2bcbd5cd52f28fac82..aeb4f884b1cf81be2f0c20331377e64a7f9f02e6 100644 (file)
@@ -290,6 +290,7 @@ int main(int argc, char *argv[])
 {
     int argctr;
     int flags;
+    int multithreaded;
     struct fuse *fuse;
 
     if(argc < 2) {
@@ -308,7 +309,8 @@ int main(int argc, char *argv[])
     set_signal_handlers();
     atexit(cleanup);
 
-    flags = FUSE_MULTITHREAD;
+    flags = 0;
+    multithreaded = 1;
     for(; argctr < argc && argv[argctr][0] == '-'; argctr ++) {
         switch(argv[argctr][1]) {
         case 'd':
@@ -316,7 +318,7 @@ int main(int argc, char *argv[])
             break;
 
         case 's':
-            flags &= ~FUSE_MULTITHREAD;
+            multithreaded = 0;
             break;
 
         default:
@@ -331,7 +333,11 @@ int main(int argc, char *argv[])
 
     fuse = fuse_new(0, flags);
     fuse_set_operations(fuse, &xmp_oper);
-    fuse_loop(fuse);
+
+    if(multithreaded)
+        fuse_loop_mt(fuse);
+    else
+        fuse_loop(fuse);
 
     return 0;
 }
index 60cd37844267ba7f4c8ca6708591e3d8f5b255ad..e284469ac3506e1e146f4340fe50264219604865 100644 (file)
@@ -78,9 +78,6 @@ struct fuse_operations {
 
 /* FUSE flags: */
 
-/** Process requests in multiple threads */
-#define FUSE_MULTITHREAD (1 << 0)
-
 /** Enable debuging output */
 #define FUSE_DEBUG       (1 << 1)
 
@@ -114,6 +111,20 @@ void fuse_set_operations(struct fuse *f, const struct fuse_operations *op);
  */
 void fuse_loop(struct fuse *f);
 
+/**
+ * FUSE event loop with multiple threads
+ *
+ * Requests from the kernel are processed, and the apropriate
+ * operations are called.  Request are processed in parallel by
+ * distributing them between multiple threads.
+ *
+ * Calling this function requires the pthreads library to be linked to
+ * the application.
+ *
+ * @param f the FUSE handle
+ */
+void fuse_loop_mt(struct fuse *f);
+
 /**
  * Destroy the FUSE handle. 
  *
@@ -122,3 +133,14 @@ void fuse_loop(struct fuse *f);
  * @param f the FUSE handle
  */
 void fuse_destroy(struct fuse *f);
+
+
+/* --------------------------------------------------- *
+ * Advanced API, usually you need not bother with this *
+ * --------------------------------------------------- */
+
+struct fuse_cmd;
+
+struct fuse_cmd *__fuse_read_cmd(struct fuse *f);
+
+void __fuse_process_cmd(struct fuse *f, struct fuse_cmd *cmd);
index d183616745ca05d1e27e9bf44dbc2b345ed7ba47..1cc12a2c4e18326671da6cddee173a543fdb2a83 100644 (file)
@@ -163,8 +163,8 @@ static int request_wait(struct fuse_conn *fc)
 {
        int ret = 0;
        DECLARE_WAITQUEUE(wait, current);
-
-       add_wait_queue(&fc->waitq, &wait);
+       
+       add_wait_queue_exclusive(&fc->waitq, &wait);
        while(list_empty(&fc->pending)) {
                set_current_state(TASK_INTERRUPTIBLE);
                if(signal_pending(current)) {
index f970b112fa4b85b597ef25c56124132f9419e8ea..7f289f72f5aa87a97c6ffc1731db5461aa29438f 100644 (file)
@@ -4,4 +4,5 @@ lib_LIBRARIES = libfuse.a
 
 libfuse_a_SOURCES =    \
        fuse.c          \
+       fuse_mt.c       \
        fuse_i.h
index ed352c6cde097fb83bf2c70e62f484355e6761b3..6bd32aeec3455c60adf61d670ebb36ed606ad2b9 100644 (file)
@@ -301,6 +301,7 @@ static void convert_stat(struct stat *stbuf, struct fuse_attr *attr)
     attr->atime   = stbuf->st_atime;
     attr->mtime   = stbuf->st_mtime;
     attr->ctime   = stbuf->st_ctime;
+    attr->_dummy  = 4096;
 }
 
 static int fill_dir(struct fuse_dirhandle *dh, char *name, int type)
@@ -745,19 +746,11 @@ static void do_write(struct fuse *f, struct fuse_in_header *in,
     send_reply(f, in, res, NULL, 0);
 }
 
-struct cmd {
-    struct fuse *f;
-    char *buf;
-    size_t buflen;
-};
-
-static void *do_command(void *data)
+void __fuse_process_cmd(struct fuse *f, struct fuse_cmd *cmd)
 {
-    struct cmd *cmd = (struct cmd *) data;
     struct fuse_in_header *in = (struct fuse_in_header *) cmd->buf;
     void *inarg = cmd->buf + sizeof(struct fuse_in_header);
     size_t argsize;
-    struct fuse *f = cmd->f;
 
     if((f->flags & FUSE_DEBUG)) {
         printf("unique: %i, opcode: %i, ino: %li, insize: %i\n", in->unique,
@@ -836,66 +829,46 @@ static void *do_command(void *data)
         if(in->unique != 0)
             send_reply(f, in, -ENOSYS, NULL, 0);
     }
-
+    
     free(cmd->buf);
     free(cmd);
-
-    return NULL;
 }
 
-/* This hack makes it possible to link FUSE with or without the
-   pthread library */
-__attribute__((weak))
-int pthread_create(pthread_t *thrid           __attribute__((unused)), 
-                   const pthread_attr_t *attr __attribute__((unused)), 
-                   void *(*func)(void *)      __attribute__((unused)),
-                   void *arg                  __attribute__((unused)))
+struct fuse_cmd *__fuse_read_cmd(struct fuse *f)
 {
-    return ENOSYS;
+    ssize_t res;
+    char inbuf[FUSE_MAX_IN];
+    struct fuse_cmd *cmd;
+    
+    res = read(f->fd, inbuf, sizeof(inbuf));
+    if(res == -1) {
+        perror("reading fuse device");
+        /* BAD... This will happen again */
+        return NULL;
+    }
+    if((size_t) res < sizeof(struct fuse_in_header)) {
+        fprintf(stderr, "short read on fuse device\n");
+        /* Cannot happen */
+        return NULL;
+    }
+
+    cmd = (struct fuse_cmd *) malloc(sizeof(*cmd));
+    cmd->buflen = res;
+    cmd->buf = (char *) malloc(cmd->buflen);
+    memcpy(cmd->buf, inbuf, cmd->buflen);
+
+    return cmd;
 }
 
+
 void fuse_loop(struct fuse *f)
 {
-    int res;
-    char inbuf[FUSE_MAX_IN];
-    pthread_attr_t attr;
-    pthread_t thrid;
-
-    pthread_attr_init(&attr);
-    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
-    
     while(1) {
-        struct cmd *cmd;
-
-        res = read(f->fd, inbuf, sizeof(inbuf));
-        if(res == -1) {
-            perror("reading fuse device");
-            /* BAD... This will happen again */
-            exit(1);
-        }
-        if((size_t) res < sizeof(struct fuse_in_header)) {
-            fprintf(stderr, "short read on fuse device\n");
-            /* Cannot happen */
+        struct fuse_cmd *cmd = __fuse_read_cmd(f);
+        if(cmd == NULL)
             exit(1);
-        }
-
-        cmd = (struct cmd *) malloc(sizeof(struct cmd));
-        cmd->f = f;
-        cmd->buflen = res;
-        cmd->buf = (char *) malloc(cmd->buflen);
-        memcpy(cmd->buf, inbuf, cmd->buflen);
-        
-        if(f->flags & FUSE_MULTITHREAD) {
-            res = pthread_create(&thrid, &attr, do_command, cmd);
-            if(res == 0)
-                continue;
-            
-            fprintf(stderr, "Error creating thread: %s\n", strerror(res));
-            fprintf(stderr, "Will run in single thread mode\n");
-            f->flags &= ~FUSE_MULTITHREAD;
-        }
 
-        do_command(cmd);
+        __fuse_process_cmd(f, cmd);
     }
 }
 
@@ -909,6 +882,7 @@ struct fuse *fuse_new(int fd, int flags)
     f->flags = flags;
     f->fd = fd;
     f->ctr = 0;
+    /* FIXME: Dynamic hash table */
     f->name_table_size = 14057;
     f->name_table = (struct node **)
         calloc(1, sizeof(struct node *) * f->name_table_size);
@@ -934,6 +908,7 @@ void fuse_set_operations(struct fuse *f, const struct fuse_operations *op)
 
 void fuse_destroy(struct fuse *f)
 {
+    /* FIXME: Kill all threads... */
     size_t i;
     for(i = 0; i < f->ino_table_size; i++) {
         struct node *node;
index 4d3e0422096307059892ebdef0eef510f64ddab0..67406081cff80f9c2ebd1feac05cbda0fc76c458 100644 (file)
@@ -40,3 +40,9 @@ struct fuse_dirhandle {
     fino_t dir;
     FILE *fp;
 };
+
+struct fuse_cmd {
+    struct fuse *f;
+    char *buf;
+    size_t buflen;
+};
diff --git a/lib/fuse_mt.c b/lib/fuse_mt.c
new file mode 100644 (file)
index 0000000..ac616fe
--- /dev/null
@@ -0,0 +1,111 @@
+/*
+    FUSE: Filesystem in Userspace
+    Copyright (C) 2001  Miklos Szeredi (mszeredi@inf.bme.hu)
+
+    This program can be distributed under the terms of the GNU GPL.
+    See the file COPYING.
+*/
+
+#include "fuse.h"
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <errno.h>
+#include <pthread.h>
+#include <signal.h>
+#include <sys/time.h>
+
+
+struct thread_common {
+    struct fuse *f;
+    struct fuse_cmd *cmd;
+    pthread_mutex_t lock;
+    pthread_cond_t cond;
+    int avail;
+};
+
+/* Called with c->lock held */
+static void *do_work(void *data)
+{
+    struct thread_common *c = (struct thread_common *) data;
+    struct fuse *f = c->f;
+
+    c->avail ++;
+    while(1) {
+        int res;
+        struct timespec timeout;
+        struct timeval now;
+        struct fuse_cmd *cmd;
+
+        gettimeofday(&now, NULL);
+        timeout.tv_sec = now.tv_sec + 1;
+        timeout.tv_nsec = now.tv_usec * 1000;
+        
+        res = 0;
+        while(c->cmd == NULL && res != ETIMEDOUT) 
+            res = pthread_cond_timedwait(&c->cond, &c->lock, &timeout);
+        if(res == ETIMEDOUT)
+            break;
+
+        cmd = c->cmd;
+        c->cmd = NULL;
+        c->avail --;
+        pthread_mutex_unlock(&c->lock);
+        __fuse_process_cmd(f, cmd);
+        pthread_mutex_lock(&c->lock);
+        c->avail ++;
+    }
+
+    c->avail --;
+    pthread_mutex_unlock(&c->lock);
+    return NULL;
+}
+
+static void start_thread(struct thread_common *c)
+{
+    pthread_attr_t attr;
+    pthread_t thrid;
+    sigset_t oldset;
+    sigset_t newset;
+    int res;
+    
+    pthread_attr_init(&attr);
+    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
+    
+    /* Disallow signal reception in worker threads */
+    sigfillset(&newset);
+    sigprocmask(SIG_SETMASK, &newset, &oldset);
+    res = pthread_create(&thrid, &attr, do_work, c);
+    sigprocmask(SIG_SETMASK, &oldset, NULL);
+    pthread_mutex_lock(&c->lock);
+    if(res != 0) {
+        fprintf(stderr, "Error creating thread: %s\n", strerror(res));
+        exit(1);
+    }
+}
+
+void fuse_loop_mt(struct fuse *f)
+{
+    struct thread_common *c;
+
+    c = (struct thread_common *) malloc(sizeof(struct thread_common));
+    c->f = f;
+    c->cmd = NULL;
+    pthread_cond_init(&c->cond, NULL);
+    pthread_mutex_init(&c->lock, NULL);
+    c->avail = 0;
+
+    while(1) {
+        struct fuse_cmd *cmd = __fuse_read_cmd(f);
+        if(cmd == NULL)
+            exit(1);
+
+        pthread_mutex_lock(&c->lock);
+        c->cmd = cmd;
+        while(c->avail == 0)
+            start_thread(c);
+        pthread_cond_signal(&c->cond);
+        pthread_mutex_unlock(&c->lock);
+    }
+}
index eee4fdaebf46dbd624c756da98a580f80f45cb4d..2f5e79c156041f4fca7121e179381c7002dc83ba 100644 (file)
@@ -68,29 +68,58 @@ static int readlink_func(const char *path, char *link, size_t size) {
        EPILOGUE
 }
 
+static int getdir_add_entry(PyObject *w, fuse_dirh_t dh, fuse_dirfil_t df)
+{
+       PyObject *o0;
+       PyObject *o1;
+       int ret = -EINVAL;
+
+       if(!PySequence_Check(w)) {
+               printf("getdir item not sequence\n");
+               goto out;
+       }
+       if(PySequence_Length(w) != 2) {
+               printf("getdir item not len 2\n");
+               goto out;
+       }
+       o0 = PySequence_GetItem(w, 0);
+       o1 = PySequence_GetItem(w, 1);
+
+       if(!PyString_Check(o0)) {
+               printf("getdir item[0] not string\n");
+               goto out_decref;
+       }
+       if(!PyInt_Check(o1)) {
+               printf("getdir item[1] not int\n");
+               goto out_decref;
+       }
+
+       ret = df(dh, PyString_AsString(o0), PyInt_AsLong(o1));  
+
+out_decref:
+       Py_DECREF(o0);
+       Py_DECREF(o1);
+
+out:
+       return ret;
+}
+
 static int getdir_func(const char *path, fuse_dirh_t dh, fuse_dirfil_t df) {
        PyObject *v = PyObject_CallFunction(getdir_cb, "s", path);
        int i;
        PROLOGUE
 
-       if(!PySequence_Check(v)) { printf("getdir_func not sequence\n");goto OUT_DECREF; }
-       for(i=0; i < PySequence_Length(v); i++) {
-               PyObject *w = PySequence_GetItem(v, i);
-               printf("getdir_func validate %d\n", i);
-               if(!PySequence_Check(w)) { printf("getdir item not sequence\n"); goto OUT_DECREF; }
-               if(PySequence_Length(w) != 2) { printf("getdir item not len 2\n"); goto OUT_DECREF; }
-               if(!PyString_Check(PySequence_GetItem(w,0))){ printf("getdir item[0] not string"); goto OUT_DECREF; }
-               if(!PyInt_Check(PySequence_GetItem(w, 1))) { printf("getdir item[1] not int"); goto OUT_DECREF; }
+       if(!PySequence_Check(v)) {
+               printf("getdir_func not sequence\n");
+               goto OUT_DECREF;
        }
-
        for(i=0; i < PySequence_Length(v); i++) {
                PyObject *w = PySequence_GetItem(v, i);
-               printf("getdir_func %d\n", i);
-               ret = df(dh, PyString_AsString(PySequence_GetItem(w, 0)),
-                       PyInt_AsLong(PySequence_GetItem(w, 1)));        
-               if(ret) goto OUT_DECREF;
+               ret = getdir_add_entry(w, dh, df);
+               Py_DECREF(w);
+               if(ret != 0)
+                       goto OUT_DECREF;
        }
-
        ret = 0;
 
        EPILOGUE
@@ -191,8 +220,6 @@ int open_func(const char *path, int mode) {
 static PyObject *
 Fuse_main(PyObject *self, PyObject *args, PyObject *kw)
 {
-       PyObject *list, *item;
-
        int flags=0;
 
        struct fuse_operations op;
@@ -260,7 +287,5 @@ init_fuse(void)
        d = PyModule_GetDict(m);
        ErrorObject = PyErr_NewException("fuse.error", NULL, NULL);
        PyDict_SetItemString(d, "error", ErrorObject);
-       PyDict_SetItemString(d, "MULTITHREAD", PyInt_FromLong(FUSE_MULTITHREAD));
        PyDict_SetItemString(d, "DEBUG", PyInt_FromLong(FUSE_DEBUG));
-
 }