From 33232032423dcc06716537204f1995afa5a73940 Mon Sep 17 00:00:00 2001
From: Miklos Szeredi <miklos@szeredi.hu>
Date: Mon, 19 Nov 2001 17:55:51 +0000
Subject: [PATCH] multithreading improvements

---
 BUGS               |  3 ++
 kernel/Makefile.am |  9 ++++-
 kernel/dev.c       | 18 +++++-----
 lib/fuse.c         | 12 ++++++-
 lib/fuse_i.h       |  2 ++
 lib/fuse_mt.c      | 25 +++++++++-----
 python/Makefile    |  7 +---
 python/fuse.py     | 72 ----------------------------------------
 python/xmp.py      | 83 ++++++++++++++++++++++++++++++++++++++++++++++
 9 files changed, 134 insertions(+), 97 deletions(-)
 create mode 100755 python/xmp.py

diff --git a/BUGS b/BUGS
index df5f428..cd56648 100644
--- a/BUGS
+++ b/BUGS
@@ -2,3 +2,6 @@
 
 - When a non-directory is mounted the root inode is not filled in, only at
   the first getattr
+
+- I want really low priority for my cached pages.  Can they start out
+  'old' so they will be thrown out on the first oportunity?
diff --git a/kernel/Makefile.am b/kernel/Makefile.am
index 2ea0e3b..033c8cb 100644
--- a/kernel/Makefile.am
+++ b/kernel/Makefile.am
@@ -25,7 +25,6 @@ uninstall-local:
 clean-local:
 	rm -f *.o *.s
 
-
 .c.o:
 	$(CC) $(CFLAGS) $(CPPFAGS) -c $<
 
@@ -33,3 +32,11 @@ fuse_objs = dev.o dir.o file.o inode.o util.o
 
 fuse.o: $(fuse_objs)
 	ld -r -o fuse.o $(fuse_objs)
+
+fuse_headers = fuse_i.h ../include/linux/fuse.h
+
+dev.o: $(fuse_headers)
+dir.o: $(fuse_headers)
+file.o: $(fuse_headers)
+inode.o: $(fuse_headers)
+util.o: $(fuse_headers)
diff --git a/kernel/dev.c b/kernel/dev.c
index 4395736..1bff1eb 100644
--- a/kernel/dev.c
+++ b/kernel/dev.c
@@ -246,15 +246,17 @@ static ssize_t fuse_dev_read(struct file *file, char *buf, size_t nbytes,
 
 	ret = copy_in_args(req->in, buf, nbytes);
 	spin_lock(&fuse_lock);
-	if(req->issync || ret < 0) {
-		if(ret < 0) 
-			list_add_tail(&req->list, &fc->pending);
+	if(req->issync) {
+		if(ret < 0) {
+			req->out->h.error = -EPROTO;
+			req->finished = 1;
+		}
 		else {
 			list_add_tail(&req->list, &fc->processing);
 			req->sent = 1;
 		}
 		req->locked = 0;
-		if(req->interrupted)
+		if(ret < 0 || req->interrupted)
 			wake_up(&req->waitq);
 		
 		req = NULL;
@@ -395,16 +397,15 @@ static ssize_t fuse_dev_write(struct file *file, const char *buf,
 
 	spin_lock(&fuse_lock);
 	if(err)
-		list_add_tail(&fc->processing, &req->list);
+		req->out->h.error = -EPROTO;
 	else {
 		/* fget() needs to be done in this context */
 		if(req->in->h.opcode == FUSE_GETDIR && !oh.error)
 			process_getdir(req);
-		req->finished = 1;
 	}	
+	req->finished = 1;
 	req->locked = 0;
-	if(!err || req->interrupted)
-		wake_up(&req->waitq);
+	wake_up(&req->waitq);
 	spin_unlock(&fuse_lock);
 
 	if(!err)
@@ -473,6 +474,7 @@ static void end_requests(struct fuse_conn *fc, struct list_head *head)
 		list_del_init(&req->list);
 		if(req->issync) {
 			req->out->h.error = -ECONNABORTED;
+			req->finished = 1;
 			wake_up(&req->waitq);
 		}
 		else
diff --git a/lib/fuse.c b/lib/fuse.c
index 2ed5169..b1b0749 100644
--- a/lib/fuse.c
+++ b/lib/fuse.c
@@ -319,7 +319,11 @@ static void send_reply_raw(struct fuse *f, char *outbuf, size_t outsize)
                out->error, strerror(-out->error), outsize);
         fflush(stdout);
     }
-                
+          
+    pthread_mutex_lock(&f->lock);
+    f->numavail ++;
+    pthread_mutex_unlock(&f->lock);
+      
     res = write(f->fd, outbuf, outsize);
     if(res == -1) {
         /* ENOENT means the operation was interrupted */
@@ -762,6 +766,10 @@ void __fuse_process_cmd(struct fuse *f, struct fuse_cmd *cmd)
     void *inarg = cmd->buf + sizeof(struct fuse_in_header);
     size_t argsize;
 
+    pthread_mutex_lock(&f->lock);
+    f->numavail --;
+    pthread_mutex_unlock(&f->lock);
+
     if((f->flags & FUSE_DEBUG)) {
         printf("unique: %i, opcode: %i, ino: %li, insize: %i\n", in->unique,
                in->opcode, in->ino, cmd->buflen);
@@ -904,6 +912,8 @@ struct fuse *fuse_new(int fd, int flags)
     f->ino_table = (struct node **)
         calloc(1, sizeof(struct node *) * f->ino_table_size);
     pthread_mutex_init(&f->lock, NULL);
+    f->numworker = 0;
+    f->numavail = 0;
 
     root = (struct node *) calloc(1, sizeof(struct node));
     root->mode = 0;
diff --git a/lib/fuse_i.h b/lib/fuse_i.h
index 6740608..6e1453e 100644
--- a/lib/fuse_i.h
+++ b/lib/fuse_i.h
@@ -33,6 +33,8 @@ struct fuse {
     size_t ino_table_size;
     fino_t ctr;
     pthread_mutex_t lock;
+    int numworker;
+    int numavail;
 };
 
 struct fuse_dirhandle {
diff --git a/lib/fuse_mt.c b/lib/fuse_mt.c
index 1dc4dcd..d33682c 100644
--- a/lib/fuse_mt.c
+++ b/lib/fuse_mt.c
@@ -6,7 +6,7 @@
     See the file COPYING.
 */
 
-#include "fuse.h"
+#include "fuse_i.h"
 
 #include <stdio.h>
 #include <stdlib.h>
@@ -16,7 +16,7 @@
 #include <errno.h>
 #include <sys/time.h>
 
-#define FUSE_NUM_WORKERS 5
+#define FUSE_MAX_WORKERS 10
 
 struct fuse_worker {
     struct fuse *f;
@@ -24,17 +24,27 @@ struct fuse_worker {
     fuse_processor_t proc;
 };
 
+static void start_thread(struct fuse_worker *w);
+
 static void *do_work(void *data)
 {
     struct fuse_worker *w = (struct fuse_worker *) data;
-    
+    struct fuse *f = w->f;
+
     while(1) {
         struct fuse_cmd *cmd = __fuse_read_cmd(w->f);
         if(cmd == NULL)
             exit(1);
 
-        w->proc(w->f, cmd, w->data);
+        if(f->numavail == 0 && f->numworker < FUSE_MAX_WORKERS) {
+            pthread_mutex_lock(&f->lock);
+            f->numavail ++;
+            f->numworker ++;
+            pthread_mutex_unlock(&f->lock);
+            start_thread(w);
+        }
 
+        w->proc(w->f, cmd, w->data);
     }
 
     return NULL;
@@ -46,7 +56,7 @@ static void start_thread(struct fuse_worker *w)
     sigset_t oldset;
     sigset_t newset;
     int res;
-    
+
     /* Disallow signal reception in worker threads */
     sigfillset(&newset);
     pthread_sigmask(SIG_SETMASK, &newset, &oldset);
@@ -62,16 +72,13 @@ static void start_thread(struct fuse_worker *w)
 void __fuse_loop_mt(struct fuse *f, fuse_processor_t proc, void *data)
 {
     struct fuse_worker *w;
-    int i;
 
     w = malloc(sizeof(struct fuse_worker));    
     w->f = f;
     w->data = data;
     w->proc = proc;
 
-    for(i = 1; i < FUSE_NUM_WORKERS; i++)
-        start_thread(w);
-
+    f->numworker = 1;
     do_work(w);
 }
 
diff --git a/python/Makefile b/python/Makefile
index dfaf492..8b7ea80 100644
--- a/python/Makefile
+++ b/python/Makefile
@@ -1,10 +1,5 @@
 _fusemodule.so: _fusemodule.c
 	gcc -g3 -I/usr/include/python1.5 _fusemodule.c -Wl,-shared -o _fusemodule.so -Wimplicit -lfuse && python -c 'import _fuse'
 
-demo: _fusemodule.so
-	-sudo umount tmp
-	fusermount tmp ./fuse.py
-
-
 clean:
-	rm _fusemodule.so *.pyc *.pyo
+	rm -f _fusemodule.so *.pyc *.pyo
diff --git a/python/fuse.py b/python/fuse.py
index 74b2380..ec1e633 100644
--- a/python/fuse.py
+++ b/python/fuse.py
@@ -1,4 +1,3 @@
-#!/usr/bin/env python
 #
 #    Copyright (C) 2001  Jeff Epler  <jepler@unpythonic.dhs.org>
 #
@@ -8,7 +7,6 @@
 
 from _fuse import main, DEBUG
 import os
-from stat import *
 from errno import *
 
 class ErrnoWrapper:
@@ -38,73 +36,3 @@ class Fuse:
 				d[a] = ErrnoWrapper(getattr(self, a))
 		apply(main, (), d)
 
-class Xmp(Fuse):
-	flags = 1
-
-	def getattr(self, path):
-		return os.lstat(path)
-
-	def readlink(self, path):
-		return os.readlink(path)
-
-	def getdir(self, path):
-		return map(lambda x: (x,0), os.listdir(path))
-
-	def unlink(self, path):
-		return os.unlink(path)
-
-	def rmdir(self, path):
-		return os.rmdir(path)
-
-	def symlink(self, path, path1):
-		return os.symlink(path, path1)
-
-	def rename(self, path, path1):
-		return os.rename(path, path1)
-
-	def link(self, path, path1):
-		return os.link(path, path1)
-
-	def chmod(self, path, mode):
-		return os.chmod(path, mode)
-
-	def chown(self, path, user, group):
-		return os.chown(path, user, group)
-
-	def truncate(self, path, size):
-		f = open(path, "w+")
-		return f.truncate(size)
-
-	def mknod(self, path, mode, dev):
-		""" Python has no os.mknod, so we can only do some things """
-		if S_ISREG(mode):
-			open(path, "w")
-		else:
-			return -EINVAL
-
-	def mkdir(self, path, mode):
-		return os.mkdir(path, mode)
-
-	def utime(self, path, times):
-		return os.utime(path, times)
-
-	def open(self, path, flags):
-		os.close(os.open(path, flags))
-		return 0
-
-	def read(self, path, len, offset):
-		f = open(path, "r")
-		f.seek(offset)
-		return f.read(len)
-
-	def write(self, path, buf, off):
-		f = open(path, "r+")
-		f.seek(off)
-		f.write(buf)
-		return len(buf)
-
-if __name__ == '__main__':
-	server = Xmp()
-	server.flags = 0
-	server.multithreaded = 1;
-	server.main()
diff --git a/python/xmp.py b/python/xmp.py
new file mode 100755
index 0000000..271e269
--- /dev/null
+++ b/python/xmp.py
@@ -0,0 +1,83 @@
+#!/usr/bin/env python
+#
+#    Copyright (C) 2001  Jeff Epler  <jepler@unpythonic.dhs.org>
+#
+#    This program can be distributed under the terms of the GNU GPL.
+#    See the file COPYING.
+#
+
+from fuse import Fuse
+import os
+from errno import *
+from stat import *
+
+class Xmp(Fuse):
+	flags = 1
+
+	def getattr(self, path):
+		return os.lstat(path)
+
+	def readlink(self, path):
+		return os.readlink(path)
+
+	def getdir(self, path):
+		return map(lambda x: (x,0), os.listdir(path))
+
+	def unlink(self, path):
+		return os.unlink(path)
+
+	def rmdir(self, path):
+		return os.rmdir(path)
+
+	def symlink(self, path, path1):
+		return os.symlink(path, path1)
+
+	def rename(self, path, path1):
+		return os.rename(path, path1)
+
+	def link(self, path, path1):
+		return os.link(path, path1)
+
+	def chmod(self, path, mode):
+		return os.chmod(path, mode)
+
+	def chown(self, path, user, group):
+		return os.chown(path, user, group)
+
+	def truncate(self, path, size):
+		f = open(path, "w+")
+		return f.truncate(size)
+
+	def mknod(self, path, mode, dev):
+		""" Python has no os.mknod, so we can only do some things """
+		if S_ISREG(mode):
+			open(path, "w")
+		else:
+			return -EINVAL
+
+	def mkdir(self, path, mode):
+		return os.mkdir(path, mode)
+
+	def utime(self, path, times):
+		return os.utime(path, times)
+
+	def open(self, path, flags):
+		os.close(os.open(path, flags))
+		return 0
+
+	def read(self, path, len, offset):
+		f = open(path, "r")
+		f.seek(offset)
+		return f.read(len)
+
+	def write(self, path, buf, off):
+		f = open(path, "r+")
+		f.seek(off)
+		f.write(buf)
+		return len(buf)
+
+if __name__ == '__main__':
+	server = Xmp()
+	server.flags = 0
+	server.multithreaded = 1;
+	server.main()
-- 
2.30.2