From f830a7f84dda931307785dd9653c3627c9cc6386 Mon Sep 17 00:00:00 2001 From: Miklos Szeredi Date: Fri, 16 Nov 2001 17:46:45 +0000 Subject: [PATCH] multithreading works in Python --- example/fusexmp.c | 8 +++ include/fuse.h | 5 +- lib/fuse_mt.c | 92 ++++++++----------------------- python/_fusemodule.c | 126 ++++++++++++++++++++++++++++++++----------- python/fuse.py | 9 ++++ 5 files changed, 137 insertions(+), 103 deletions(-) diff --git a/example/fusexmp.c b/example/fusexmp.c index 2bcdb37..1f1f2a3 100644 --- a/example/fusexmp.c +++ b/example/fusexmp.c @@ -1,3 +1,11 @@ +/* + FUSE: Filesystem in Userspace + Copyright (C) 2001 Miklos Szeredi (mszeredi@inf.bme.hu) + + This program can be distributed under the terms of the GNU GPL. + See the file COPYING. +*/ + #ifdef linux /* For pread()/pwrite() */ #define _XOPEN_SOURCE 500 diff --git a/include/fuse.h b/include/fuse.h index 0bf09e1..abdb45b 100644 --- a/include/fuse.h +++ b/include/fuse.h @@ -140,7 +140,8 @@ void fuse_destroy(struct fuse *f); * ----------------------------------------------------------- */ struct fuse_cmd; - +typedef void (*fuse_processor_t)(struct fuse *, struct fuse_cmd *, void *); struct fuse_cmd *__fuse_read_cmd(struct fuse *f); - void __fuse_process_cmd(struct fuse *f, struct fuse_cmd *cmd); +void __fuse_loop_mt(struct fuse *f, fuse_processor_t proc, void *data); + diff --git a/lib/fuse_mt.c b/lib/fuse_mt.c index 6afc3bc..19cc33c 100644 --- a/lib/fuse_mt.c +++ b/lib/fuse_mt.c @@ -11,108 +11,62 @@ #include #include #include -#include #include -#include #include -#include -struct thread_common { +struct fuse_thr_data { struct fuse *f; + void *data; + fuse_processor_t proc; struct fuse_cmd *cmd; - pthread_mutex_t lock; - pthread_cond_t cond; - sem_t started; - int avail; }; static void *do_work(void *data) { - struct thread_common *c = (struct thread_common *) data; - - pthread_mutex_lock(&c->lock); - sem_post(&c->started); - c->avail ++; - - while(1) { - int res; - struct timespec timeout; - struct timeval now; - struct fuse_cmd *cmd; - - gettimeofday(&now, NULL); - timeout.tv_sec = now.tv_sec + 1; - timeout.tv_nsec = now.tv_usec * 1000; - - res = 0; - while(c->cmd == NULL && res != ETIMEDOUT) - res = pthread_cond_timedwait(&c->cond, &c->lock, &timeout); - if(res == ETIMEDOUT) - break; - - cmd = c->cmd; - c->cmd = NULL; - c->avail --; - pthread_mutex_unlock(&c->lock); - __fuse_process_cmd(c->f, cmd); - pthread_mutex_lock(&c->lock); - c->avail ++; - } - - c->avail --; - pthread_mutex_unlock(&c->lock); + struct fuse_thr_data *d = (struct fuse_thr_data *) data; + d->proc(d->f, d->cmd, d->data); + free(d); return NULL; } -static void start_thread(struct thread_common *c) +static void start_thread(struct fuse_thr_data *d) { - pthread_attr_t attr; pthread_t thrid; sigset_t oldset; sigset_t newset; int res; - pthread_mutex_unlock(&c->lock); - - pthread_attr_init(&attr); - pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); - /* Disallow signal reception in worker threads */ sigfillset(&newset); pthread_sigmask(SIG_SETMASK, &newset, &oldset); - res = pthread_create(&thrid, &attr, do_work, c); + res = pthread_create(&thrid, NULL, do_work, d); pthread_sigmask(SIG_SETMASK, &oldset, NULL); if(res != 0) { fprintf(stderr, "Error creating thread: %s\n", strerror(res)); exit(1); } - - sem_wait(&c->started); - pthread_mutex_lock(&c->lock); + pthread_detach(thrid); } -void fuse_loop_mt(struct fuse *f) +void __fuse_loop_mt(struct fuse *f, fuse_processor_t proc, void *data) { - struct thread_common *c; - - c = (struct thread_common *) malloc(sizeof(struct thread_common)); - c->f = f; - c->cmd = NULL; - pthread_cond_init(&c->cond, NULL); - pthread_mutex_init(&c->lock, NULL); - sem_init(&c->started, 0, 0); - c->avail = 0; - while(1) { + struct fuse_thr_data *d; struct fuse_cmd *cmd = __fuse_read_cmd(f); if(cmd == NULL) exit(1); - pthread_mutex_lock(&c->lock); - while(c->avail == 0) - start_thread(c); - c->cmd = cmd; - pthread_cond_signal(&c->cond); - pthread_mutex_unlock(&c->lock); + d = malloc(sizeof(struct fuse_thr_data)); + d->proc = proc; + d->f = f; + d->cmd = cmd; + d->data = data; + + start_thread(d); } } + +void fuse_loop_mt(struct fuse *f) +{ + __fuse_loop_mt(f, (fuse_processor_t) __fuse_process_cmd, NULL); +} diff --git a/python/_fusemodule.c b/python/_fusemodule.c index 2f5e79c..d4dd0ff 100644 --- a/python/_fusemodule.c +++ b/python/_fusemodule.c @@ -1,16 +1,19 @@ -#include "Python.h" -#include "fuse.h" -#include +/* + Copyright (C) 2001 Jeff Epler -static PyObject *ErrorObject; + This program can be distributed under the terms of the GNU GPL. + See the file COPYING. +*/ -PyObject *getattr_cb=NULL, *readlink_cb=NULL, *getdir_cb=NULL, - *mknod_cb=NULL, *mkdir_cb=NULL, *unlink_cb=NULL, *rmdir_cb=NULL, - *symlink_cb=NULL, *rename_cb=NULL, *link_cb=NULL, *chmod_cb=NULL, - *chown_cb=NULL, *truncate_cb=NULL, *utime_cb=NULL, - *open_cb=NULL, *read_cb=NULL, *write_cb=NULL; -struct fuse *fuse=NULL; +#include +#include +#include +static PyObject *getattr_cb=NULL, *readlink_cb=NULL, *getdir_cb=NULL, + *mknod_cb=NULL, *mkdir_cb=NULL, *unlink_cb=NULL, *rmdir_cb=NULL, + *symlink_cb=NULL, *rename_cb=NULL, *link_cb=NULL, *chmod_cb=NULL, + *chown_cb=NULL, *truncate_cb=NULL, *utime_cb=NULL, + *open_cb=NULL, *read_cb=NULL, *write_cb=NULL; #define PROLOGUE \ int ret = -EINVAL; \ @@ -23,7 +26,8 @@ struct fuse *fuse=NULL; Py_DECREF(v); \ OUT: \ return ret; -static int getattr_func(const char *path, struct stat *st) { +static int getattr_func(const char *path, struct stat *st) +{ int i; PyObject *v = PyObject_CallFunction(getattr_cb, "s", path); PROLOGUE @@ -54,7 +58,8 @@ static int getattr_func(const char *path, struct stat *st) { EPILOGUE } -static int readlink_func(const char *path, char *link, size_t size) { +static int readlink_func(const char *path, char *link, size_t size) +{ PyObject *v = PyObject_CallFunction(readlink_cb, "s", path); char *s; PROLOGUE @@ -104,7 +109,8 @@ out: return ret; } -static int getdir_func(const char *path, fuse_dirh_t dh, fuse_dirfil_t df) { +static int getdir_func(const char *path, fuse_dirh_t dh, fuse_dirfil_t df) +{ PyObject *v = PyObject_CallFunction(getdir_cb, "s", path); int i; PROLOGUE @@ -125,67 +131,77 @@ static int getdir_func(const char *path, fuse_dirh_t dh, fuse_dirfil_t df) { EPILOGUE } -int mknod_func(const char *path, mode_t m, dev_t d) { +static int mknod_func(const char *path, mode_t m, dev_t d) +{ PyObject *v = PyObject_CallFunction(mknod_cb, "sii", path, m, d); PROLOGUE EPILOGUE } -int mkdir_func(const char *path, mode_t m) { +static int mkdir_func(const char *path, mode_t m) +{ PyObject *v = PyObject_CallFunction(mkdir_cb, "si", path, m); PROLOGUE EPILOGUE } -int unlink_func(const char *path) { +static int unlink_func(const char *path) +{ PyObject *v = PyObject_CallFunction(unlink_cb, "s", path); PROLOGUE EPILOGUE } -int rmdir_func(const char *path) { +static int rmdir_func(const char *path) +{ PyObject *v = PyObject_CallFunction(rmdir_cb, "s", path); PROLOGUE EPILOGUE } -int symlink_func(const char *path, const char *path1) { +static int symlink_func(const char *path, const char *path1) +{ PyObject *v = PyObject_CallFunction(symlink_cb, "ss", path, path1); PROLOGUE EPILOGUE } -int rename_func(const char *path, const char *path1) { +static int rename_func(const char *path, const char *path1) +{ PyObject *v = PyObject_CallFunction(rename_cb, "ss", path, path1); PROLOGUE EPILOGUE } -int link_func(const char *path, const char *path1) { +static int link_func(const char *path, const char *path1) +{ PyObject *v = PyObject_CallFunction(link_cb, "ss", path, path1); PROLOGUE EPILOGUE } -int chmod_func(const char *path, mode_t m) { +static int chmod_func(const char *path, mode_t m) +{ PyObject *v = PyObject_CallFunction(chmod_cb, "si", path, m); PROLOGUE EPILOGUE } -int chown_func(const char *path, uid_t u, gid_t g) { +static int chown_func(const char *path, uid_t u, gid_t g) +{ PyObject *v = PyObject_CallFunction(chown_cb, "sii", path, u, g); PROLOGUE EPILOGUE } -int truncate_func(const char *path, off_t o) { +static int truncate_func(const char *path, off_t o) +{ PyObject *v = PyObject_CallFunction(truncate_cb, "si", path, o); PROLOGUE EPILOGUE } -int utime_func(const char *path, struct utimbuf *u) { +static int utime_func(const char *path, struct utimbuf *u) { int actime = u ? u->actime : time(NULL); int modtime = u ? u->modtime : actime; PyObject *v = PyObject_CallFunction(utime_cb, "s(ii)", @@ -194,7 +210,8 @@ int utime_func(const char *path, struct utimbuf *u) { EPILOGUE } -int read_func(const char *path, char *buf, size_t s, off_t off) { +static int read_func(const char *path, char *buf, size_t s, off_t off) +{ PyObject *v = PyObject_CallFunction(read_cb, "sii", path, s, off); PROLOGUE if(PyString_Check(v)) { @@ -205,22 +222,55 @@ int read_func(const char *path, char *buf, size_t s, off_t off) { EPILOGUE } -int write_func(const char *path, const char *buf, size_t t, off_t off) { +static int write_func(const char *path, const char *buf, size_t t, off_t off) +{ PyObject *v = PyObject_CallFunction(write_cb,"ss#i", path, buf, t, off); PROLOGUE EPILOGUE } -int open_func(const char *path, int mode) { +static int open_func(const char *path, int mode) +{ PyObject *v = PyObject_CallFunction(open_cb, "si", path, mode); PROLOGUE EPILOGUE } +static void process_cmd(struct fuse *f, struct fuse_cmd *cmd, void *data) +{ + PyInterpreterState *interp = (PyInterpreterState *) data; + PyThreadState *state; + + PyEval_AcquireLock(); + state = PyThreadState_New(interp); + PyThreadState_Swap(state); + __fuse_process_cmd(f, cmd); + PyThreadState_Clear(state); + PyThreadState_Swap(NULL); + PyThreadState_Delete(state); + PyEval_ReleaseLock(); +} + +static void pyfuse_loop_mt(struct fuse *f) +{ + PyInterpreterState *interp; + PyThreadState *save; + + PyEval_InitThreads(); + interp = PyThreadState_Get()->interp; + save = PyEval_SaveThread(); + __fuse_loop_mt(f, process_cmd, interp); + /* Not yet reached: */ + PyEval_RestoreThread(save); +} + + static PyObject * Fuse_main(PyObject *self, PyObject *args, PyObject *kw) { int flags=0; + int multithreaded=0; + static struct fuse *fuse=NULL; struct fuse_operations op; @@ -228,13 +278,13 @@ Fuse_main(PyObject *self, PyObject *args, PyObject *kw) "getattr", "readlink", "getdir", "mknod", "mkdir", "unlink", "rmdir", "symlink", "rename", "link", "chmod", "chown", "truncate", "utime", - "open", "read", "write", "flags", NULL}; + "open", "read", "write", "flags", "multithreaded", NULL}; - if (!PyArg_ParseTupleAndKeywords(args, kw, "|OOOOOOOOOOOOOOOOOi", + if (!PyArg_ParseTupleAndKeywords(args, kw, "|OOOOOOOOOOOOOOOOOii", kwlist, &getattr_cb, &readlink_cb, &getdir_cb, &mknod_cb, &mkdir_cb, &unlink_cb, &rmdir_cb, &symlink_cb, &rename_cb, &link_cb, &chmod_cb, &chown_cb, &truncate_cb, &utime_cb, - &open_cb, &read_cb, &write_cb, &flags)) + &open_cb, &read_cb, &write_cb, &flags, &multithreaded)) return NULL; #define DO_ONE_ATTR(name) if(name ## _cb) { Py_INCREF(name ## _cb); op.name = name ## _func; } else { op.name = NULL; } @@ -259,7 +309,10 @@ Fuse_main(PyObject *self, PyObject *args, PyObject *kw) fuse = fuse_new(0, flags); fuse_set_operations(fuse, &op); - fuse_loop(fuse); + if(multithreaded) + pyfuse_loop_mt(fuse); + else + fuse_loop(fuse); Py_INCREF(Py_None); return Py_None; @@ -279,7 +332,8 @@ DL_EXPORT(void) init_fuse(void) { PyObject *m, *d; - + static PyObject *ErrorObject; + /* Create the module and add the functions */ m = Py_InitModule("_fuse", Fuse_methods); @@ -289,3 +343,11 @@ init_fuse(void) PyDict_SetItemString(d, "error", ErrorObject); PyDict_SetItemString(d, "DEBUG", PyInt_FromLong(FUSE_DEBUG)); } + + +/* + * Local Variables: + * indent-tabs-mode: t + * c-basic-offset: 8 + * End: + */ diff --git a/python/fuse.py b/python/fuse.py index ec40262..dff6f47 100644 --- a/python/fuse.py +++ b/python/fuse.py @@ -1,4 +1,10 @@ #!/usr/bin/env python +# +# Copyright (C) 2001 Jeff Epler +# +# This program can be distributed under the terms of the GNU GPL. +# See the file COPYING. +# from _fuse import main, DEBUG import os @@ -23,8 +29,10 @@ class Fuse: 'chown', 'truncate', 'utime', 'open', 'read', 'write'] flags = 0 + multithreaded = 0 def main(self): d = {'flags': self.flags} + d['multithreaded'] = self.multithreaded for a in self._attrs: if hasattr(self,a): d[a] = ErrnoWrapper(getattr(self, a)) @@ -98,4 +106,5 @@ class Xmp(Fuse): if __name__ == '__main__': server = Xmp() server.flags = DEBUG + server.multithreaded = 1; server.main() -- 2.30.2