multithreading works in Python
authorMiklos Szeredi <miklos@szeredi.hu>
Fri, 16 Nov 2001 17:46:45 +0000 (17:46 +0000)
committerMiklos Szeredi <miklos@szeredi.hu>
Fri, 16 Nov 2001 17:46:45 +0000 (17:46 +0000)
example/fusexmp.c
include/fuse.h
lib/fuse_mt.c
python/_fusemodule.c
python/fuse.py

index 2bcdb37c5b38323d038b20cafc5ad3d599b7a52f..1f1f2a32ee5f069ad60e6144abd9f848d02a3d72 100644 (file)
@@ -1,3 +1,11 @@
+/*
+    FUSE: Filesystem in Userspace
+    Copyright (C) 2001  Miklos Szeredi (mszeredi@inf.bme.hu)
+
+    This program can be distributed under the terms of the GNU GPL.
+    See the file COPYING.
+*/
+
 #ifdef linux
 /* For pread()/pwrite() */
 #define _XOPEN_SOURCE 500
index 0bf09e1e253e6abcaf2db7d2a3bf5064a6efc6aa..abdb45b139027a1f7496433d93140224796110f6 100644 (file)
@@ -140,7 +140,8 @@ void fuse_destroy(struct fuse *f);
  * ----------------------------------------------------------- */
 
 struct fuse_cmd;
-
+typedef void (*fuse_processor_t)(struct fuse *, struct fuse_cmd *, void *);
 struct fuse_cmd *__fuse_read_cmd(struct fuse *f);
-
 void __fuse_process_cmd(struct fuse *f, struct fuse_cmd *cmd);
+void __fuse_loop_mt(struct fuse *f, fuse_processor_t proc, void *data);
+
index 6afc3bc4547512ece59dd2ff4adc270058e3daa0..19cc33cd6eafdb5d5e2d050e79cde3d279160659 100644 (file)
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
-#include <errno.h>
 #include <pthread.h>
-#include <semaphore.h>
 #include <signal.h>
-#include <sys/time.h>
 
-struct thread_common {
+struct fuse_thr_data {
     struct fuse *f;
+    void *data;
+    fuse_processor_t proc;
     struct fuse_cmd *cmd;
-    pthread_mutex_t lock;
-    pthread_cond_t cond;
-    sem_t started;
-    int avail;
 };
 
 static void *do_work(void *data)
 {
-    struct thread_common *c = (struct thread_common *) data;
-
-    pthread_mutex_lock(&c->lock);
-    sem_post(&c->started);
-    c->avail ++;
-
-    while(1) {
-        int res;
-        struct timespec timeout;
-        struct timeval now;
-        struct fuse_cmd *cmd;
-
-        gettimeofday(&now, NULL);
-        timeout.tv_sec = now.tv_sec + 1;
-        timeout.tv_nsec = now.tv_usec * 1000;
-        
-        res = 0;
-        while(c->cmd == NULL && res != ETIMEDOUT) 
-            res = pthread_cond_timedwait(&c->cond, &c->lock, &timeout);
-        if(res == ETIMEDOUT)
-            break;
-
-        cmd = c->cmd;
-        c->cmd = NULL;
-        c->avail --;
-        pthread_mutex_unlock(&c->lock);
-        __fuse_process_cmd(c->f, cmd);
-        pthread_mutex_lock(&c->lock);
-        c->avail ++;
-    }
-
-    c->avail --;
-    pthread_mutex_unlock(&c->lock);    
+    struct fuse_thr_data *d = (struct fuse_thr_data *) data;
+    d->proc(d->f, d->cmd, d->data);
+    free(d);
     return NULL;
 }
 
-static void start_thread(struct thread_common *c)
+static void start_thread(struct fuse_thr_data *d)
 {
-    pthread_attr_t attr;
     pthread_t thrid;
     sigset_t oldset;
     sigset_t newset;
     int res;
     
-    pthread_mutex_unlock(&c->lock);
-
-    pthread_attr_init(&attr);
-    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
-    
     /* Disallow signal reception in worker threads */
     sigfillset(&newset);
     pthread_sigmask(SIG_SETMASK, &newset, &oldset);
-    res = pthread_create(&thrid, &attr, do_work, c);
+    res = pthread_create(&thrid, NULL, do_work, d);
     pthread_sigmask(SIG_SETMASK, &oldset, NULL);
     if(res != 0) {
         fprintf(stderr, "Error creating thread: %s\n", strerror(res));
         exit(1);
     }
-
-    sem_wait(&c->started);
-    pthread_mutex_lock(&c->lock);
+    pthread_detach(thrid);
 }
 
-void fuse_loop_mt(struct fuse *f)
+void __fuse_loop_mt(struct fuse *f, fuse_processor_t proc, void *data)
 {
-    struct thread_common *c;
-
-    c = (struct thread_common *) malloc(sizeof(struct thread_common));
-    c->f = f;
-    c->cmd = NULL;
-    pthread_cond_init(&c->cond, NULL);
-    pthread_mutex_init(&c->lock, NULL);
-    sem_init(&c->started, 0, 0);
-    c->avail = 0;
-
     while(1) {
+        struct fuse_thr_data *d;
         struct fuse_cmd *cmd = __fuse_read_cmd(f);
         if(cmd == NULL)
             exit(1);
 
-        pthread_mutex_lock(&c->lock);
-        while(c->avail == 0)
-            start_thread(c);
-        c->cmd = cmd;
-        pthread_cond_signal(&c->cond);
-        pthread_mutex_unlock(&c->lock);
+        d = malloc(sizeof(struct fuse_thr_data));
+        d->proc = proc;
+        d->f = f;
+        d->cmd = cmd;
+        d->data = data;
+        
+        start_thread(d);
     }
 }
+
+void fuse_loop_mt(struct fuse *f)
+{
+    __fuse_loop_mt(f, (fuse_processor_t) __fuse_process_cmd, NULL);
+}
index 2f5e79c156041f4fca7121e179381c7002dc83ba..d4dd0ff627bfd77066faa94aea992efb505a0dfc 100644 (file)
@@ -1,16 +1,19 @@
-#include "Python.h"
-#include "fuse.h"
-#include <time.h>
+/*
+    Copyright (C) 2001  Jeff Epler  <jepler@unpythonic.dhs.org>
 
-static PyObject *ErrorObject;
+    This program can be distributed under the terms of the GNU GPL.
+    See the file COPYING.
+*/
 
-PyObject *getattr_cb=NULL, *readlink_cb=NULL, *getdir_cb=NULL,
-        *mknod_cb=NULL, *mkdir_cb=NULL, *unlink_cb=NULL, *rmdir_cb=NULL,
-        *symlink_cb=NULL, *rename_cb=NULL, *link_cb=NULL, *chmod_cb=NULL,
-        *chown_cb=NULL, *truncate_cb=NULL, *utime_cb=NULL,
-        *open_cb=NULL, *read_cb=NULL, *write_cb=NULL;
-struct fuse *fuse=NULL;
+#include <Python.h>
+#include <fuse.h>
+#include <time.h>
 
+static PyObject *getattr_cb=NULL, *readlink_cb=NULL, *getdir_cb=NULL,
+       *mknod_cb=NULL, *mkdir_cb=NULL, *unlink_cb=NULL, *rmdir_cb=NULL,
+       *symlink_cb=NULL, *rename_cb=NULL, *link_cb=NULL, *chmod_cb=NULL,
+       *chown_cb=NULL, *truncate_cb=NULL, *utime_cb=NULL,
+       *open_cb=NULL, *read_cb=NULL, *write_cb=NULL;
 
 #define PROLOGUE \
        int ret = -EINVAL; \
@@ -23,7 +26,8 @@ struct fuse *fuse=NULL;
                Py_DECREF(v); \
        OUT: \
                return ret; 
-static int getattr_func(const char *path, struct stat *st) {
+static int getattr_func(const char *path, struct stat *st)
+{
        int i;
        PyObject *v = PyObject_CallFunction(getattr_cb, "s", path);
        PROLOGUE
@@ -54,7 +58,8 @@ static int getattr_func(const char *path, struct stat *st) {
        EPILOGUE
 }
 
-static int readlink_func(const char *path, char *link, size_t size) {
+static int readlink_func(const char *path, char *link, size_t size)
+{
        PyObject *v = PyObject_CallFunction(readlink_cb, "s", path);
        char *s;
        PROLOGUE
@@ -104,7 +109,8 @@ out:
        return ret;
 }
 
-static int getdir_func(const char *path, fuse_dirh_t dh, fuse_dirfil_t df) {
+static int getdir_func(const char *path, fuse_dirh_t dh, fuse_dirfil_t df)
+{
        PyObject *v = PyObject_CallFunction(getdir_cb, "s", path);
        int i;
        PROLOGUE
@@ -125,67 +131,77 @@ static int getdir_func(const char *path, fuse_dirh_t dh, fuse_dirfil_t df) {
        EPILOGUE
 }
 
-int mknod_func(const char *path, mode_t m, dev_t d) {
+static int mknod_func(const char *path, mode_t m, dev_t d)
+{
        PyObject *v = PyObject_CallFunction(mknod_cb, "sii", path, m, d);
        PROLOGUE
        EPILOGUE
 }
 
-int mkdir_func(const char *path, mode_t m) {
+static int mkdir_func(const char *path, mode_t m)
+{
        PyObject *v = PyObject_CallFunction(mkdir_cb, "si", path, m);
        PROLOGUE
        EPILOGUE
 }
 
-int unlink_func(const char *path) {
+static int unlink_func(const char *path)
+{
        PyObject *v = PyObject_CallFunction(unlink_cb, "s", path);
        PROLOGUE
        EPILOGUE
 }
 
-int rmdir_func(const char *path) {
+static int rmdir_func(const char *path)
+{
        PyObject *v = PyObject_CallFunction(rmdir_cb, "s", path);
        PROLOGUE
        EPILOGUE
 }
 
-int symlink_func(const char *path, const char *path1) {
+static int symlink_func(const char *path, const char *path1)
+{
        PyObject *v = PyObject_CallFunction(symlink_cb, "ss", path, path1);
        PROLOGUE
        EPILOGUE
 }
 
-int rename_func(const char *path, const char *path1) {
+static int rename_func(const char *path, const char *path1)
+{
        PyObject *v = PyObject_CallFunction(rename_cb, "ss", path, path1);
        PROLOGUE
        EPILOGUE
 }
 
-int link_func(const char *path, const char *path1) {
+static int link_func(const char *path, const char *path1)
+{
        PyObject *v = PyObject_CallFunction(link_cb, "ss", path, path1);
        PROLOGUE
        EPILOGUE
 }
 
-int chmod_func(const char *path, mode_t m) {
+static int chmod_func(const char *path, mode_t m) 
+{
        PyObject *v = PyObject_CallFunction(chmod_cb, "si", path, m);
        PROLOGUE
        EPILOGUE
 }
 
-int chown_func(const char *path, uid_t u, gid_t g) {
+static int chown_func(const char *path, uid_t u, gid_t g) 
+{
        PyObject *v = PyObject_CallFunction(chown_cb, "sii", path, u, g);
        PROLOGUE
        EPILOGUE
 }
 
-int truncate_func(const char *path, off_t o) {
+static int truncate_func(const char *path, off_t o)
+{
        PyObject *v = PyObject_CallFunction(truncate_cb, "si", path, o);
        PROLOGUE
        EPILOGUE
 }
 
-int utime_func(const char *path, struct utimbuf *u) {
+static int utime_func(const char *path, struct utimbuf *u) {
        int actime = u ? u->actime : time(NULL);
        int modtime = u ? u->modtime : actime;
        PyObject *v = PyObject_CallFunction(utime_cb, "s(ii)",
@@ -194,7 +210,8 @@ int utime_func(const char *path, struct utimbuf *u) {
        EPILOGUE
 }
 
-int read_func(const char *path, char *buf, size_t s, off_t off) {
+static int read_func(const char *path, char *buf, size_t s, off_t off)
+{
        PyObject *v = PyObject_CallFunction(read_cb, "sii", path, s, off);
        PROLOGUE
        if(PyString_Check(v)) {
@@ -205,22 +222,55 @@ int read_func(const char *path, char *buf, size_t s, off_t off) {
        EPILOGUE
 }
 
-int write_func(const char *path, const char *buf, size_t t, off_t off) {
+static int write_func(const char *path, const char *buf, size_t t, off_t off)
+{
        PyObject *v = PyObject_CallFunction(write_cb,"ss#i", path, buf, t, off);
        PROLOGUE
        EPILOGUE
 }
 
-int open_func(const char *path, int mode) {
+static int open_func(const char *path, int mode)
+{
        PyObject *v = PyObject_CallFunction(open_cb, "si", path, mode);
        PROLOGUE
        EPILOGUE
 }
 
+static void process_cmd(struct fuse *f, struct fuse_cmd *cmd, void *data)
+{
+       PyInterpreterState *interp = (PyInterpreterState *) data;
+       PyThreadState *state;
+
+       PyEval_AcquireLock();
+       state = PyThreadState_New(interp);
+       PyThreadState_Swap(state);
+       __fuse_process_cmd(f, cmd);
+       PyThreadState_Clear(state);
+       PyThreadState_Swap(NULL);
+       PyThreadState_Delete(state);
+       PyEval_ReleaseLock();
+}
+
+static void pyfuse_loop_mt(struct fuse *f)
+{
+       PyInterpreterState *interp;
+       PyThreadState *save;
+
+       PyEval_InitThreads();
+       interp = PyThreadState_Get()->interp;
+       save = PyEval_SaveThread();
+       __fuse_loop_mt(f, process_cmd, interp);
+       /* Not yet reached: */
+       PyEval_RestoreThread(save);
+}
+
+
 static PyObject *
 Fuse_main(PyObject *self, PyObject *args, PyObject *kw)
 {
        int flags=0;
+       int multithreaded=0;
+       static struct fuse *fuse=NULL;
 
        struct fuse_operations op;
 
@@ -228,13 +278,13 @@ Fuse_main(PyObject *self, PyObject *args, PyObject *kw)
                "getattr", "readlink", "getdir", "mknod",
                "mkdir", "unlink", "rmdir", "symlink", "rename",
                "link", "chmod", "chown", "truncate", "utime",
-               "open", "read", "write", "flags", NULL};
+               "open", "read", "write", "flags", "multithreaded", NULL};
                                
-       if (!PyArg_ParseTupleAndKeywords(args, kw, "|OOOOOOOOOOOOOOOOOi", 
+       if (!PyArg_ParseTupleAndKeywords(args, kw, "|OOOOOOOOOOOOOOOOOii", 
                kwlist, &getattr_cb, &readlink_cb, &getdir_cb, &mknod_cb,
                &mkdir_cb, &unlink_cb, &rmdir_cb, &symlink_cb, &rename_cb,
                &link_cb, &chmod_cb, &chown_cb, &truncate_cb, &utime_cb,
-               &open_cb, &read_cb, &write_cb, &flags))
+               &open_cb, &read_cb, &write_cb, &flags, &multithreaded))
                return NULL;
        
 #define DO_ONE_ATTR(name) if(name ## _cb) { Py_INCREF(name ## _cb); op.name = name ## _func; } else { op.name = NULL; }
@@ -259,7 +309,10 @@ Fuse_main(PyObject *self, PyObject *args, PyObject *kw)
 
        fuse = fuse_new(0, flags);
        fuse_set_operations(fuse, &op); 
-       fuse_loop(fuse);
+       if(multithreaded)
+               pyfuse_loop_mt(fuse);
+       else
+               fuse_loop(fuse);
 
        Py_INCREF(Py_None);
        return Py_None;
@@ -279,7 +332,8 @@ DL_EXPORT(void)
 init_fuse(void)
 {
        PyObject *m, *d;
-
+       static PyObject *ErrorObject;
        /* Create the module and add the functions */
        m = Py_InitModule("_fuse", Fuse_methods);
 
@@ -289,3 +343,11 @@ init_fuse(void)
        PyDict_SetItemString(d, "error", ErrorObject);
        PyDict_SetItemString(d, "DEBUG", PyInt_FromLong(FUSE_DEBUG));
 }
+
+
+/* 
+ * Local Variables:
+ * indent-tabs-mode: t
+ * c-basic-offset: 8
+ * End:
+ */
index ec40262494901851587684d3e9d72d76da2888ed..dff6f470cc4cfc14d15dc2a7592bd052a1b0a15e 100644 (file)
@@ -1,4 +1,10 @@
 #!/usr/bin/env python
+#
+#    Copyright (C) 2001  Jeff Epler  <jepler@unpythonic.dhs.org>
+#
+#    This program can be distributed under the terms of the GNU GPL.
+#    See the file COPYING.
+#
 
 from _fuse import main, DEBUG
 import os
@@ -23,8 +29,10 @@ class Fuse:
                  'chown', 'truncate', 'utime', 'open', 'read', 'write']
 
        flags = 0
+       multithreaded = 0
        def main(self):
                d = {'flags': self.flags}
+               d['multithreaded'] = self.multithreaded
                for a in self._attrs:
                        if hasattr(self,a):
                                d[a] = ErrnoWrapper(getattr(self, a))
@@ -98,4 +106,5 @@ class Xmp(Fuse):
 if __name__ == '__main__':
        server = Xmp()
        server.flags = DEBUG
+       server.multithreaded = 1;
        server.main()