From: Miklos Szeredi Date: Sun, 3 Sep 2006 18:28:52 +0000 (+0000) Subject: fix X-Git-Tag: fuse_2_6_0_rc1~4 X-Git-Url: http://git.maquefel.me/?a=commitdiff_plain;h=38f152c72decfc8a995c8a9fa7f932f348d0e1e2;p=qemu-gpiodev%2Flibfuse.git fix --- diff --git a/ChangeLog b/ChangeLog index acee040..d35312f 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,10 @@ +2006-09-03 Miklos Szeredi + + * lib: Multithreaded loop now allows unlimited number of threads. + This is needed for locking operations which may block + indefinitely. Also the kernel now doesn't limit the number of + outstanding requests so the library shouldn't do so either. + 2006-09-01 Miklos Szeredi * Fix recursive lock bug in interrupt handling diff --git a/lib/Makefile.am b/lib/Makefile.am index 049589c..5206493 100644 --- a/lib/Makefile.am +++ b/lib/Makefile.am @@ -16,6 +16,7 @@ libfuse_la_SOURCES = \ fuse_loop.c \ fuse_loop_mt.c \ fuse_lowlevel.c \ + fuse_misc.h \ fuse_mt.c \ fuse_opt.c \ fuse_session.c \ diff --git a/lib/fuse.c b/lib/fuse.c index a072f0c..cb492d3 100644 --- a/lib/fuse.c +++ b/lib/fuse.c @@ -10,10 +10,10 @@ /* For pthread_rwlock_t */ #define _GNU_SOURCE -#include "config.h" #include "fuse_i.h" #include "fuse_lowlevel.h" #include "fuse_opt.h" +#include "fuse_misc.h" #include #include @@ -25,7 +25,6 @@ #include #include #include -#include #include #include @@ -110,19 +109,6 @@ static void fuse_do_release(struct fuse *, char *, struct fuse_file_info *); static int fuse_do_opendir(struct fuse *, char *, struct fuse_file_info *); static int fuse_do_statfs(struct fuse *, struct statvfs *); -#ifndef USE_UCLIBC -#define mutex_init(mut) pthread_mutex_init(mut, NULL) -#else -static void mutex_init(pthread_mutex_t *mut) -{ - pthread_mutexattr_t attr; - pthread_mutexattr_init(&attr); - pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ADAPTIVE_NP); - pthread_mutex_init(mut, &attr); - pthread_mutexattr_destroy(&attr); -} -#endif - static struct node *get_node_nocheck(struct fuse *f, fuse_ino_t nodeid) { size_t hash = nodeid % f->id_table_size; @@ -1432,7 +1418,7 @@ static void fuse_opendir(fuse_req_t req, fuse_ino_t ino, dh->len = 0; dh->filled = 0; dh->nodeid = ino; - mutex_init(&dh->lock); + fuse_mutex_init(&dh->lock); llfi->fh = (uintptr_t) dh; @@ -2146,7 +2132,7 @@ struct fuse *fuse_new_common(struct fuse_chan *ch, struct fuse_args *args, goto out_free_name_table; } - mutex_init(&f->lock); + fuse_mutex_init(&f->lock); pthread_rwlock_init(&f->tree_lock, NULL); f->compat = compat; diff --git a/lib/fuse_loop_mt.c b/lib/fuse_loop_mt.c index 8327f12..47fca46 100644 --- a/lib/fuse_loop_mt.c +++ b/lib/fuse_loop_mt.c @@ -6,145 +6,131 @@ See the file COPYING.LIB. */ -#include "config.h" #include "fuse_lowlevel.h" +#include "fuse_misc.h" #include #include #include -#include #include #include #include #include -#define FUSE_MAX_WORKERS 10 - struct fuse_worker { + struct fuse_worker *prev; + struct fuse_worker *next; + pthread_t thread_id; + size_t bufsize; + char *buf; + struct fuse_mt *mt; +}; + +struct fuse_mt { pthread_mutex_t lock; int numworker; int numavail; struct fuse_session *se; struct fuse_chan *prevch; - pthread_t threads[FUSE_MAX_WORKERS]; - pthread_t main_thread; + struct fuse_worker main; int exit; int error; }; -struct fuse_wchan { - struct fuse_worker *w; - struct fuse_chan *prevch; -}; - -#ifndef USE_UCLIBC -#define mutex_init(mut) pthread_mutex_init(mut, NULL) -#else -static void mutex_init(pthread_mutex_t *mut) +static void list_add_worker(struct fuse_worker *w, struct fuse_worker *next) { - pthread_mutexattr_t attr; - pthread_mutexattr_init(&attr); - pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ADAPTIVE_NP); - pthread_mutex_init(mut, &attr); - pthread_mutexattr_destroy(&attr); + struct fuse_worker *prev = next->prev; + w->next = next; + w->prev = prev; + prev->next = w; + next->prev = w; } -#endif -static int fuse_loop_mt_send(struct fuse_chan *ch, const struct iovec iov[], - size_t count) +static void list_del_worker(struct fuse_worker *w) { - int res; - struct fuse_wchan *wchan_data = (struct fuse_wchan *) fuse_chan_data(ch); - pthread_mutex_lock(&wchan_data->w->lock); - wchan_data->w->numavail ++; - pthread_mutex_unlock(&wchan_data->w->lock); - res = fuse_chan_send(wchan_data->prevch, iov, count); - fuse_chan_destroy(ch); - free(wchan_data); - return res; + struct fuse_worker *prev = w->prev; + struct fuse_worker *next = w->next; + prev->next = next; + next->prev = prev; } -static int start_thread(struct fuse_worker *w, pthread_t *thread_id); +static int fuse_start_thread(struct fuse_mt *mt); -static void *do_work(void *data) +static void *fuse_do_work(void *data) { struct fuse_worker *w = (struct fuse_worker *) data; - size_t bufsize = fuse_chan_bufsize(w->prevch); - char *buf = (char *) malloc(bufsize); - if (!buf) { - fprintf(stderr, "fuse: failed to allocate read buffer\n"); - fuse_session_exit(w->se); - w->error = -1; - return NULL; - } + struct fuse_mt *mt = w->mt; - pthread_cleanup_push(free, buf); - pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); - pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL); - - while (!fuse_session_exited(w->se)) { - struct fuse_chan *ch = w->prevch; - struct fuse_chan *wchan; - struct fuse_wchan *wchan_data; - struct fuse_chan_ops cop = { .send = fuse_loop_mt_send }; - int res = fuse_chan_recv(&ch, buf, bufsize); + while (!fuse_session_exited(mt->se)) { + struct fuse_chan *ch = mt->prevch; + int res = fuse_chan_recv(&ch, w->buf, w->bufsize); if (res == -EINTR) continue; if (res <= 0) { if (res < 0) { - fuse_session_exit(w->se); - w->error = -1; + fuse_session_exit(mt->se); + mt->error = -1; } break; } - pthread_mutex_lock(&w->lock); - if (w->exit) { - pthread_mutex_unlock(&w->lock); - break; + pthread_mutex_lock(&mt->lock); + if (mt->exit) { + pthread_mutex_unlock(&mt->lock); + return NULL; } - w->numavail--; - if (w->numavail == 0 && w->numworker < FUSE_MAX_WORKERS) { - if (w->numworker < FUSE_MAX_WORKERS) { - /* FIXME: threads should be stored in a list instead - of an array */ - int start_res; - pthread_t *thread_id = &w->threads[w->numworker]; - w->numavail ++; - w->numworker ++; - start_res = start_thread(w, thread_id); - if (start_res == -1) - w->numavail --; + mt->numavail--; + if (mt->numavail == 0) + fuse_start_thread(mt); + pthread_mutex_unlock(&mt->lock); + + fuse_session_process(mt->se, w->buf, res, ch); + + pthread_mutex_lock(&mt->lock); + mt->numavail ++; + if (mt->numavail > 10) { + if (mt->exit) { + pthread_mutex_unlock(&mt->lock); + return NULL; } + list_del_worker(w); + mt->numavail--; + mt->numworker--; + pthread_mutex_unlock(&mt->lock); + + pthread_detach(w->thread_id); + free(w->buf); + free(w); + return NULL; } - pthread_mutex_unlock(&w->lock); - wchan_data = malloc(sizeof(struct fuse_wchan)); - wchan = fuse_chan_new(&cop, -1, fuse_chan_bufsize(ch), wchan_data); - if (!wchan_data || !wchan) { - free(wchan_data); - fuse_session_exit(w->se); - w->error = -1; - break; - } - wchan_data->w = w; - wchan_data->prevch = ch; - fuse_session_process(w->se, buf, res, wchan); + pthread_mutex_unlock(&mt->lock); } - pthread_cleanup_pop(1); - if (pthread_self() != w->main_thread) { - pthread_kill(w->main_thread, SIGTERM); - pause(); - } + pthread_kill(mt->main.thread_id, SIGHUP); + pause(); return NULL; } -static int start_thread(struct fuse_worker *w, pthread_t *thread_id) +static int fuse_start_thread(struct fuse_mt *mt) { sigset_t oldset; sigset_t newset; int res; + struct fuse_worker *w = malloc(sizeof(struct fuse_worker)); + if (!w) { + fprintf(stderr, "fuse: failed to allocate worker structure\n"); + return -1; + } + memset(w, 0, sizeof(struct fuse_worker)); + w->bufsize = fuse_chan_bufsize(mt->prevch); + w->buf = malloc(w->bufsize); + w->mt = mt; + if (!w->buf) { + fprintf(stderr, "fuse: failed to allocate read buffer\n"); + free(w); + return -1; + } /* Disallow signal reception in worker threads */ sigemptyset(&newset); @@ -153,47 +139,61 @@ static int start_thread(struct fuse_worker *w, pthread_t *thread_id) sigaddset(&newset, SIGHUP); sigaddset(&newset, SIGQUIT); pthread_sigmask(SIG_BLOCK, &newset, &oldset); - res = pthread_create(thread_id, NULL, do_work, w); + res = pthread_create(&w->thread_id, NULL, fuse_do_work, w); pthread_sigmask(SIG_SETMASK, &oldset, NULL); if (res != 0) { fprintf(stderr, "fuse: error creating thread: %s\n", strerror(res)); return -1; } + list_add_worker(w, &mt->main); + mt->numavail ++; + mt->numworker ++; return 0; } +static void fuse_join_worker(struct fuse_mt *mt, struct fuse_worker *w) +{ + pthread_join(w->thread_id, NULL); + pthread_mutex_lock(&mt->lock); + list_del_worker(w); + pthread_mutex_unlock(&mt->lock); + free(w->buf); + free(w); +} + int fuse_session_loop_mt(struct fuse_session *se) { - int i; int err; + struct fuse_mt mt; struct fuse_worker *w; - w = (struct fuse_worker *) malloc(sizeof(struct fuse_worker)); - if (w == NULL) { - fprintf(stderr, "fuse: failed to allocate worker structure\n"); - return -1; - } - memset(w, 0, sizeof(struct fuse_worker)); - w->se = se; - w->prevch = fuse_session_next_chan(se, NULL); - w->error = 0; - w->numworker = 1; - w->numavail = 1; - w->main_thread = pthread_self(); - mutex_init(&w->lock); - - do_work(w); - - pthread_mutex_lock(&w->lock); - for (i = 1; i < w->numworker; i++) - pthread_cancel(w->threads[i]); - w->exit = 1; - pthread_mutex_unlock(&w->lock); - for (i = 1; i < w->numworker; i++) - pthread_join(w->threads[i], NULL); - pthread_mutex_destroy(&w->lock); - err = w->error; - free(w); + + memset(&mt, 0, sizeof(struct fuse_mt)); + mt.se = se; + mt.prevch = fuse_session_next_chan(se, NULL); + mt.error = 0; + mt.numworker = 0; + mt.numavail = 0; + mt.main.thread_id = pthread_self(); + mt.main.prev = mt.main.next = &mt.main; + fuse_mutex_init(&mt.lock); + + pthread_mutex_lock(&mt.lock); + fuse_start_thread(&mt); + pthread_mutex_unlock(&mt.lock); + while (!fuse_session_exited(se)) + pause(); + + for (w = mt.main.next; w != &mt.main; w = w->next) + pthread_cancel(w->thread_id); + mt.exit = 1; + pthread_mutex_unlock(&mt.lock); + + while (mt.main.next != &mt.main) + fuse_join_worker(&mt, mt.main.next); + + pthread_mutex_destroy(&mt.lock); + err = mt.error; fuse_session_reset(se); return err; } diff --git a/lib/fuse_lowlevel.c b/lib/fuse_lowlevel.c index cd1e571..8550165 100644 --- a/lib/fuse_lowlevel.c +++ b/lib/fuse_lowlevel.c @@ -6,11 +6,11 @@ See the file COPYING.LIB */ -#include "config.h" #include "fuse_lowlevel.h" #include "fuse_kernel.h" #include "fuse_opt.h" #include "fuse_i.h" +#include "fuse_misc.h" #include #include @@ -19,7 +19,6 @@ #include #include #include -#include #define PARAM(inarg) (((char *)(inarg)) + sizeof(*(inarg))) #define OFFSET_MAX 0x7fffffffffffffffLL @@ -58,20 +57,6 @@ struct fuse_ll { pthread_mutex_t lock; }; -#ifndef USE_UCLIBC -#define mutex_init(mut) pthread_mutex_init(mut, NULL) -#else -static void mutex_init(pthread_mutex_t *mut) -{ - pthread_mutexattr_t attr; - pthread_mutexattr_init(&attr); - pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ADAPTIVE_NP); - pthread_mutex_init(mut, &attr); - pthread_mutexattr_destroy(&attr); -} -#endif - - static void convert_stat(const struct stat *stbuf, struct fuse_attr *attr) { attr->ino = stbuf->st_ino; @@ -1183,7 +1168,7 @@ struct fuse_session *fuse_lowlevel_new_common(struct fuse_args *args, f->conn.max_readahead = UINT_MAX; list_init_req(&f->list); list_init_req(&f->interrupts); - mutex_init(&f->lock); + fuse_mutex_init(&f->lock); if (fuse_opt_parse(args, f, fuse_ll_opts, fuse_ll_opt_proc) == -1) goto out_free; diff --git a/lib/fuse_misc.h b/lib/fuse_misc.h new file mode 100644 index 0000000..3a02655 --- /dev/null +++ b/lib/fuse_misc.h @@ -0,0 +1,27 @@ +/* + FUSE: Filesystem in Userspace + Copyright (C) 2001-2006 Miklos Szeredi + + This program can be distributed under the terms of the GNU LGPL. + See the file COPYING.LIB +*/ + +#include "config.h" +#include + +#ifndef USE_UCLIBC +#define fuse_mutex_init(mut) pthread_mutex_init(mut, NULL) +#else +static inline void fuse_mutex_init(pthread_mutex_t *mut) +{ + pthread_mutexattr_t attr; + pthread_mutexattr_init(&attr); + pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ADAPTIVE_NP); + pthread_mutex_init(mut, &attr); + pthread_mutexattr_destroy(&attr); +} +#endif + + + +