multithreading improvements
authorMiklos Szeredi <miklos@szeredi.hu>
Mon, 19 Nov 2001 17:55:51 +0000 (17:55 +0000)
committerMiklos Szeredi <miklos@szeredi.hu>
Mon, 19 Nov 2001 17:55:51 +0000 (17:55 +0000)
BUGS
kernel/Makefile.am
kernel/dev.c
lib/fuse.c
lib/fuse_i.h
lib/fuse_mt.c
python/Makefile
python/fuse.py
python/xmp.py [new file with mode: 0755]

diff --git a/BUGS b/BUGS
index df5f428fa597552cde4fbaab508903a9eb677187..cd5664836396917492d7839d48d88064326c96b2 100644 (file)
--- a/BUGS
+++ b/BUGS
@@ -2,3 +2,6 @@
 
 - When a non-directory is mounted the root inode is not filled in, only at
   the first getattr
+
+- I want really low priority for my cached pages.  Can they start out
+  'old' so they will be thrown out on the first oportunity?
index 2ea0e3ba8504b3e41d6bd2d51a37643c4c7858d6..033c8cb3352e8af1e13874a6c735dd0c196d1c51 100644 (file)
@@ -25,7 +25,6 @@ uninstall-local:
 clean-local:
        rm -f *.o *.s
 
-
 .c.o:
        $(CC) $(CFLAGS) $(CPPFAGS) -c $<
 
@@ -33,3 +32,11 @@ fuse_objs = dev.o dir.o file.o inode.o util.o
 
 fuse.o: $(fuse_objs)
        ld -r -o fuse.o $(fuse_objs)
+
+fuse_headers = fuse_i.h ../include/linux/fuse.h
+
+dev.o: $(fuse_headers)
+dir.o: $(fuse_headers)
+file.o: $(fuse_headers)
+inode.o: $(fuse_headers)
+util.o: $(fuse_headers)
index 4395736a0d58b293c7129b712e26187d918f95e9..1bff1eb6c0ff8322214976d9fb611d30c1ba026f 100644 (file)
@@ -246,15 +246,17 @@ static ssize_t fuse_dev_read(struct file *file, char *buf, size_t nbytes,
 
        ret = copy_in_args(req->in, buf, nbytes);
        spin_lock(&fuse_lock);
-       if(req->issync || ret < 0) {
-               if(ret < 0) 
-                       list_add_tail(&req->list, &fc->pending);
+       if(req->issync) {
+               if(ret < 0) {
+                       req->out->h.error = -EPROTO;
+                       req->finished = 1;
+               }
                else {
                        list_add_tail(&req->list, &fc->processing);
                        req->sent = 1;
                }
                req->locked = 0;
-               if(req->interrupted)
+               if(ret < 0 || req->interrupted)
                        wake_up(&req->waitq);
                
                req = NULL;
@@ -395,16 +397,15 @@ static ssize_t fuse_dev_write(struct file *file, const char *buf,
 
        spin_lock(&fuse_lock);
        if(err)
-               list_add_tail(&fc->processing, &req->list);
+               req->out->h.error = -EPROTO;
        else {
                /* fget() needs to be done in this context */
                if(req->in->h.opcode == FUSE_GETDIR && !oh.error)
                        process_getdir(req);
-               req->finished = 1;
        }       
+       req->finished = 1;
        req->locked = 0;
-       if(!err || req->interrupted)
-               wake_up(&req->waitq);
+       wake_up(&req->waitq);
        spin_unlock(&fuse_lock);
 
        if(!err)
@@ -473,6 +474,7 @@ static void end_requests(struct fuse_conn *fc, struct list_head *head)
                list_del_init(&req->list);
                if(req->issync) {
                        req->out->h.error = -ECONNABORTED;
+                       req->finished = 1;
                        wake_up(&req->waitq);
                }
                else
index 2ed516906caa87e9919023d73731e1cb1ab5225b..b1b074903c18f903f065aa53793b26c98d2a633e 100644 (file)
@@ -319,7 +319,11 @@ static void send_reply_raw(struct fuse *f, char *outbuf, size_t outsize)
                out->error, strerror(-out->error), outsize);
         fflush(stdout);
     }
-                
+          
+    pthread_mutex_lock(&f->lock);
+    f->numavail ++;
+    pthread_mutex_unlock(&f->lock);
+      
     res = write(f->fd, outbuf, outsize);
     if(res == -1) {
         /* ENOENT means the operation was interrupted */
@@ -762,6 +766,10 @@ void __fuse_process_cmd(struct fuse *f, struct fuse_cmd *cmd)
     void *inarg = cmd->buf + sizeof(struct fuse_in_header);
     size_t argsize;
 
+    pthread_mutex_lock(&f->lock);
+    f->numavail --;
+    pthread_mutex_unlock(&f->lock);
+
     if((f->flags & FUSE_DEBUG)) {
         printf("unique: %i, opcode: %i, ino: %li, insize: %i\n", in->unique,
                in->opcode, in->ino, cmd->buflen);
@@ -904,6 +912,8 @@ struct fuse *fuse_new(int fd, int flags)
     f->ino_table = (struct node **)
         calloc(1, sizeof(struct node *) * f->ino_table_size);
     pthread_mutex_init(&f->lock, NULL);
+    f->numworker = 0;
+    f->numavail = 0;
 
     root = (struct node *) calloc(1, sizeof(struct node));
     root->mode = 0;
index 67406081cff80f9c2ebd1feac05cbda0fc76c458..6e1453e72190a0e7e99134c9e1248af2fb90527b 100644 (file)
@@ -33,6 +33,8 @@ struct fuse {
     size_t ino_table_size;
     fino_t ctr;
     pthread_mutex_t lock;
+    int numworker;
+    int numavail;
 };
 
 struct fuse_dirhandle {
index 1dc4dcd0ecdb04bde40e5183b1a29e3a7946b389..d33682c9494332755972e029d14425775f418ff5 100644 (file)
@@ -6,7 +6,7 @@
     See the file COPYING.
 */
 
-#include "fuse.h"
+#include "fuse_i.h"
 
 #include <stdio.h>
 #include <stdlib.h>
@@ -16,7 +16,7 @@
 #include <errno.h>
 #include <sys/time.h>
 
-#define FUSE_NUM_WORKERS 5
+#define FUSE_MAX_WORKERS 10
 
 struct fuse_worker {
     struct fuse *f;
@@ -24,17 +24,27 @@ struct fuse_worker {
     fuse_processor_t proc;
 };
 
+static void start_thread(struct fuse_worker *w);
+
 static void *do_work(void *data)
 {
     struct fuse_worker *w = (struct fuse_worker *) data;
-    
+    struct fuse *f = w->f;
+
     while(1) {
         struct fuse_cmd *cmd = __fuse_read_cmd(w->f);
         if(cmd == NULL)
             exit(1);
 
-        w->proc(w->f, cmd, w->data);
+        if(f->numavail == 0 && f->numworker < FUSE_MAX_WORKERS) {
+            pthread_mutex_lock(&f->lock);
+            f->numavail ++;
+            f->numworker ++;
+            pthread_mutex_unlock(&f->lock);
+            start_thread(w);
+        }
 
+        w->proc(w->f, cmd, w->data);
     }
 
     return NULL;
@@ -46,7 +56,7 @@ static void start_thread(struct fuse_worker *w)
     sigset_t oldset;
     sigset_t newset;
     int res;
-    
+
     /* Disallow signal reception in worker threads */
     sigfillset(&newset);
     pthread_sigmask(SIG_SETMASK, &newset, &oldset);
@@ -62,16 +72,13 @@ static void start_thread(struct fuse_worker *w)
 void __fuse_loop_mt(struct fuse *f, fuse_processor_t proc, void *data)
 {
     struct fuse_worker *w;
-    int i;
 
     w = malloc(sizeof(struct fuse_worker));    
     w->f = f;
     w->data = data;
     w->proc = proc;
 
-    for(i = 1; i < FUSE_NUM_WORKERS; i++)
-        start_thread(w);
-
+    f->numworker = 1;
     do_work(w);
 }
 
index dfaf49248f3f146be091dcf6eaceb6c4790032e9..8b7ea807f371bea701084591456cb679de68fb68 100644 (file)
@@ -1,10 +1,5 @@
 _fusemodule.so: _fusemodule.c
        gcc -g3 -I/usr/include/python1.5 _fusemodule.c -Wl,-shared -o _fusemodule.so -Wimplicit -lfuse && python -c 'import _fuse'
 
-demo: _fusemodule.so
-       -sudo umount tmp
-       fusermount tmp ./fuse.py
-
-
 clean:
-       rm _fusemodule.so *.pyc *.pyo
+       rm -f _fusemodule.so *.pyc *.pyo
index 74b238001ffd353c14d85a90810e068371f682d3..ec1e6330efd25ec4000cb2aca7634f59a6680dbf 100644 (file)
@@ -1,4 +1,3 @@
-#!/usr/bin/env python
 #
 #    Copyright (C) 2001  Jeff Epler  <jepler@unpythonic.dhs.org>
 #
@@ -8,7 +7,6 @@
 
 from _fuse import main, DEBUG
 import os
-from stat import *
 from errno import *
 
 class ErrnoWrapper:
@@ -38,73 +36,3 @@ class Fuse:
                                d[a] = ErrnoWrapper(getattr(self, a))
                apply(main, (), d)
 
-class Xmp(Fuse):
-       flags = 1
-
-       def getattr(self, path):
-               return os.lstat(path)
-
-       def readlink(self, path):
-               return os.readlink(path)
-
-       def getdir(self, path):
-               return map(lambda x: (x,0), os.listdir(path))
-
-       def unlink(self, path):
-               return os.unlink(path)
-
-       def rmdir(self, path):
-               return os.rmdir(path)
-
-       def symlink(self, path, path1):
-               return os.symlink(path, path1)
-
-       def rename(self, path, path1):
-               return os.rename(path, path1)
-
-       def link(self, path, path1):
-               return os.link(path, path1)
-
-       def chmod(self, path, mode):
-               return os.chmod(path, mode)
-
-       def chown(self, path, user, group):
-               return os.chown(path, user, group)
-
-       def truncate(self, path, size):
-               f = open(path, "w+")
-               return f.truncate(size)
-
-       def mknod(self, path, mode, dev):
-               """ Python has no os.mknod, so we can only do some things """
-               if S_ISREG(mode):
-                       open(path, "w")
-               else:
-                       return -EINVAL
-
-       def mkdir(self, path, mode):
-               return os.mkdir(path, mode)
-
-       def utime(self, path, times):
-               return os.utime(path, times)
-
-       def open(self, path, flags):
-               os.close(os.open(path, flags))
-               return 0
-
-       def read(self, path, len, offset):
-               f = open(path, "r")
-               f.seek(offset)
-               return f.read(len)
-
-       def write(self, path, buf, off):
-               f = open(path, "r+")
-               f.seek(off)
-               f.write(buf)
-               return len(buf)
-
-if __name__ == '__main__':
-       server = Xmp()
-       server.flags = 0
-       server.multithreaded = 1;
-       server.main()
diff --git a/python/xmp.py b/python/xmp.py
new file mode 100755 (executable)
index 0000000..271e269
--- /dev/null
@@ -0,0 +1,83 @@
+#!/usr/bin/env python
+#
+#    Copyright (C) 2001  Jeff Epler  <jepler@unpythonic.dhs.org>
+#
+#    This program can be distributed under the terms of the GNU GPL.
+#    See the file COPYING.
+#
+
+from fuse import Fuse
+import os
+from errno import *
+from stat import *
+
+class Xmp(Fuse):
+       flags = 1
+
+       def getattr(self, path):
+               return os.lstat(path)
+
+       def readlink(self, path):
+               return os.readlink(path)
+
+       def getdir(self, path):
+               return map(lambda x: (x,0), os.listdir(path))
+
+       def unlink(self, path):
+               return os.unlink(path)
+
+       def rmdir(self, path):
+               return os.rmdir(path)
+
+       def symlink(self, path, path1):
+               return os.symlink(path, path1)
+
+       def rename(self, path, path1):
+               return os.rename(path, path1)
+
+       def link(self, path, path1):
+               return os.link(path, path1)
+
+       def chmod(self, path, mode):
+               return os.chmod(path, mode)
+
+       def chown(self, path, user, group):
+               return os.chown(path, user, group)
+
+       def truncate(self, path, size):
+               f = open(path, "w+")
+               return f.truncate(size)
+
+       def mknod(self, path, mode, dev):
+               """ Python has no os.mknod, so we can only do some things """
+               if S_ISREG(mode):
+                       open(path, "w")
+               else:
+                       return -EINVAL
+
+       def mkdir(self, path, mode):
+               return os.mkdir(path, mode)
+
+       def utime(self, path, times):
+               return os.utime(path, times)
+
+       def open(self, path, flags):
+               os.close(os.open(path, flags))
+               return 0
+
+       def read(self, path, len, offset):
+               f = open(path, "r")
+               f.seek(offset)
+               return f.read(len)
+
+       def write(self, path, buf, off):
+               f = open(path, "r+")
+               f.seek(off)
+               f.write(buf)
+               return len(buf)
+
+if __name__ == '__main__':
+       server = Xmp()
+       server.flags = 0
+       server.multithreaded = 1;
+       server.main()