Add fuse_reply_fd() reply function to the low level interface
authorMiklos Szeredi <miklos@szeredi.hu>
Thu, 17 Jun 2010 11:54:26 +0000 (11:54 +0000)
committerMiklos Szeredi <miklos@szeredi.hu>
Thu, 17 Jun 2010 11:54:26 +0000 (11:54 +0000)
ChangeLog
include/fuse_common.h
include/fuse_lowlevel.h
lib/fuse_i.h
lib/fuse_lowlevel.c
lib/fuse_versionscript

index 30e228e22d71c0cbf7920a860b42cbea7d25b1f8..f7f9db2b76f3ca5aac7e8fe98e016332be2e7664 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,15 @@
+2010-06-17  Miklos Szeredi <miklos@szeredi.hu>
+
+       * Add fuse_reply_fd() reply function to the low level interface.
+       On linux version 2.6.35 or greater this will use splice() to move
+       data directly from a file descriptor to the fuse device without
+       needing to go though a userspace buffer.  With the
+       FUSE_REPLY_FD_MOVE flag the kernel will attempt to move the data
+       directly into the filesystem's cache.  On earlier kernels it will
+       fall back to an intermediate buffer.  The options
+       "no_splice_write" and "no_splice_move" can be used to disable
+       splicing and moving respectively.
 2010-06-15  Miklos Szeredi <miklos@szeredi.hu>
 
        * Fix out-of-source build.  Patch by Jörg Faschingbauer
index c263f6f1fba1fda933ff83f330a8e19df33c1702..c547ac8ee0088b18e057761de95664737a0a2c70 100644 (file)
@@ -89,6 +89,8 @@ struct fuse_file_info {
  * FUSE_CAP_EXPORT_SUPPORT: filesystem handles lookups of "." and ".."
  * FUSE_CAP_BIG_WRITES: filesystem can handle write size larger than 4kB
  * FUSE_CAP_DONT_MASK: don't apply umask to file mode on create operations
+ * FUSE_CAP_SPLICE_WRITE: ability to use splice() to write to the fuse device
+ * FUSE_CAP_SPLICE_MOVE: ability to move data to the fuse device with splice()
  */
 #define FUSE_CAP_ASYNC_READ    (1 << 0)
 #define FUSE_CAP_POSIX_LOCKS   (1 << 1)
@@ -96,6 +98,8 @@ struct fuse_file_info {
 #define FUSE_CAP_EXPORT_SUPPORT        (1 << 4)
 #define FUSE_CAP_BIG_WRITES    (1 << 5)
 #define FUSE_CAP_DONT_MASK     (1 << 6)
+#define FUSE_CAP_SPLICE_WRITE  (1 << 7)
+#define FUSE_CAP_SPLICE_MOVE   (1 << 8)
 
 /**
  * Ioctl flags
index 2855e51d868ab3acf42ad3e1c026845174b50b44..ad17b07eb4926f9b18e7e8c83d343e8944cd2508 100644 (file)
@@ -124,6 +124,14 @@ struct fuse_ctx {
 #define FUSE_SET_ATTR_ATIME_NOW        (1 << 7)
 #define FUSE_SET_ATTR_MTIME_NOW        (1 << 8)
 
+/**
+ * flags for fuse_reply_fd()
+ *
+ * FUSE_REPLY_FD_MOVE: attempt to move the data instead of copying
+ *                     (see SPLICE_F_MOVE flag for splice(2)
+ */
+#define FUSE_REPLY_FD_MOVE     (1 << 0)
+
 /* ----------------------------------------------------------- *
  * Request methods and replies                                *
  * ----------------------------------------------------------- */
@@ -412,6 +420,7 @@ struct fuse_lowlevel_ops {
         * Valid replies:
         *   fuse_reply_buf
         *   fuse_reply_iov
+        *   fuse_reply_fd
         *   fuse_reply_err
         *
         * @param req request handle
@@ -561,6 +570,7 @@ struct fuse_lowlevel_ops {
         *
         * Valid replies:
         *   fuse_reply_buf
+        *   fuse_reply_fd
         *   fuse_reply_err
         *
         * @param req request handle
@@ -646,6 +656,7 @@ struct fuse_lowlevel_ops {
         *
         * Valid replies:
         *   fuse_reply_buf
+        *   fuse_reply_fd
         *   fuse_reply_xattr
         *   fuse_reply_err
         *
@@ -672,6 +683,7 @@ struct fuse_lowlevel_ops {
         *
         * Valid replies:
         *   fuse_reply_buf
+        *   fuse_reply_fd
         *   fuse_reply_xattr
         *   fuse_reply_err
         *
@@ -995,6 +1007,22 @@ int fuse_reply_write(fuse_req_t req, size_t count);
  */
 int fuse_reply_buf(fuse_req_t req, const char *buf, size_t size);
 
+/**
+ * Reply with data copied/moved from a file descriptor
+ *
+ * Possible requests:
+ *   read, readdir, getxattr, listxattr
+ *
+ * @param req request handle
+ * @param fd file descriptor
+ * @param off offset pointer, may be NULL
+ * @param len length of data in bytes
+ * @param flags FUSE_REPLY_FD_* flags
+ * @return zero for success, -errno for failure to send reply
+ */
+int fuse_reply_fd(fuse_req_t req, int fd, loff_t *off, size_t len,
+                 unsigned int flags);
+
 /**
  * Reply with data vector
  *
index 7b991255fe5bae48b7009c8ed0d257f0592f0af9..edd66f8a3188f10afbf4b6d76c279191e8c82330 100644 (file)
@@ -49,6 +49,8 @@ struct fuse_ll {
        int atomic_o_trunc;
        int no_remote_lock;
        int big_writes;
+       int no_splice_write;
+       int no_splice_move;
        struct fuse_lowlevel_ops op;
        int got_init;
        struct cuse_data *cuse_data;
@@ -59,6 +61,7 @@ struct fuse_ll {
        struct fuse_req interrupts;
        pthread_mutex_t lock;
        int got_destroy;
+       pthread_key_t pipe_key;
 };
 
 struct fuse_cmd {
index c519bfbbb73bc2aa8335792247775b6a148eb281..76eaa3f2dc34baec8e1676d69443d932025e5b69 100644 (file)
@@ -6,6 +6,8 @@
   See the file COPYING.LIB
 */
 
+#define _GNU_SOURCE
+
 #include "fuse_i.h"
 #include "fuse_kernel.h"
 #include "fuse_opt.h"
 #include <limits.h>
 #include <errno.h>
 
+#ifndef F_LINUX_SPECIFIC_BASE
+#define F_LINUX_SPECIFIC_BASE       1024
+#endif
+#ifndef F_SETPIPE_SZ
+#define F_SETPIPE_SZ   (F_LINUX_SPECIFIC_BASE + 7)
+#endif
+
+
 #define PARAM(inarg) (((char *)(inarg)) + sizeof(*(inarg)))
 #define OFFSET_MAX 0x7fffffffffffffffLL
 
@@ -177,7 +187,7 @@ int fuse_reply_iov(fuse_req_t req, const struct iovec *iov, int count)
 
        padded_iov = malloc((count + 1) * sizeof(struct iovec));
        if (padded_iov == NULL)
-               return fuse_reply_err(req, -ENOMEM);
+               return fuse_reply_err(req, ENOMEM);
 
        memcpy(padded_iov + 1, iov, count * sizeof(struct iovec));
        count++;
@@ -375,6 +385,168 @@ int fuse_reply_buf(fuse_req_t req, const char *buf, size_t size)
        return send_reply_ok(req, buf, size);
 }
 
+struct fuse_ll_pipe {
+       size_t size;
+       int can_grow;
+       int pipe[2];
+};
+
+static void fuse_ll_pipe_free(struct fuse_ll_pipe *llp)
+{
+       close(llp->pipe[0]);
+       close(llp->pipe[1]);
+       free(llp);
+}
+
+static struct fuse_ll_pipe *fuse_ll_get_pipe(struct fuse_ll *f)
+{
+       struct fuse_ll_pipe *llp = pthread_getspecific(f->pipe_key);
+       if (llp == NULL) {
+               int res;
+
+               llp = malloc(sizeof(struct fuse_ll_pipe));
+               if (llp == NULL)
+                       return NULL;
+
+               res = pipe(llp->pipe);
+               if (res == -1) {
+                       free(llp);
+                       return NULL;
+               }
+               /* the default size is 16 pages on linux
+                */
+               llp->size = getpagesize() * 16;
+               llp->can_grow = 1;
+
+               pthread_setspecific(f->pipe_key, llp);
+       }
+
+       return llp;
+}
+
+int fuse_reply_fd(fuse_req_t req, int fd, loff_t *off, size_t len,
+                 unsigned int flags)
+{
+       int res;
+       void *buf;
+       struct fuse_out_header out;
+       struct iovec iov;
+       struct fuse_ll_pipe *llp;
+       int splice_flags;
+       size_t pipesize;
+
+       static size_t pagesize = 0;
+       if (!pagesize)
+               pagesize = getpagesize();
+
+       if (req->f->conn.proto_minor < 14 ||
+           !(req->f->conn.want & FUSE_CAP_SPLICE_WRITE))
+               goto fallback;
+
+       llp = fuse_ll_get_pipe(req->f);
+       if (llp == NULL)
+               goto fallback;
+
+
+       /*
+        * Heuristic for the required pipe size, does not work if the
+        * source contains less than page size fragments
+        */
+       pipesize = pagesize * 2 + len;
+
+       if (llp->size < pipesize) {
+               if (llp->can_grow) {
+                       res = fcntl(llp->pipe[0], F_SETPIPE_SZ, pipesize);
+                       if (res == -1) {
+                               llp->can_grow = 0;
+                               goto fallback;
+                       }
+                       llp->size = res;
+               }
+               if (llp->size < pipesize)
+                       goto fallback;
+       }
+
+       out.unique = req->unique;
+       out.error = 0;
+       out.len = len + sizeof(struct fuse_out_header);
+
+       iov.iov_base = &out;
+       iov.iov_len = sizeof(struct fuse_out_header);
+
+       res = vmsplice(llp->pipe[1], &iov, 1, 0);
+       if (res == -1) {
+               res = -errno;
+               perror("fuse: vmsplice to pipe");
+               return res;
+       }
+       if (res != sizeof(struct fuse_out_header)) {
+               res = -EIO;
+               fprintf(stderr, "fuse: short vmsplice to pipe: %u/%zu\n", res,
+                       sizeof(struct fuse_out_header));
+               goto clear_pipe;
+       }
+
+       res = splice(fd, off, llp->pipe[1], NULL, len, 0);
+       if (res == -1) {
+               res = fuse_reply_err(req, errno);
+               goto clear_pipe;
+       }
+       len = res;
+       out.len = len + sizeof(struct fuse_out_header);
+
+       if (req->f->debug) {
+               fprintf(stderr,
+                       "   unique: %llu, success, outsize: %i (splice)\n",
+                       (unsigned long long) out.unique, out.len);
+       }
+
+       splice_flags = 0;
+       if ((flags & FUSE_REPLY_FD_MOVE) &&
+           (req->f->conn.want & FUSE_CAP_SPLICE_MOVE))
+               splice_flags |= SPLICE_F_MOVE;
+
+       res = splice(llp->pipe[0], NULL,
+                    fuse_chan_fd(req->ch), NULL, out.len, flags);
+       if (res == -1) {
+               res = -errno;
+               perror("fuse: splice from pipe");
+               goto clear_pipe;
+       }
+       if (res != out.len) {
+               res = -EIO;
+               fprintf(stderr, "fuse: short splice from pipe: %u/%u\n",
+                       res, out.len);
+               goto clear_pipe;
+       }
+       return 0;
+
+clear_pipe:
+       pthread_setspecific(req->f->pipe_key, NULL);
+       fuse_ll_pipe_free(llp);
+       return res;
+
+fallback:
+       res = posix_memalign(&buf, pagesize, len);
+       if (res != 0)
+               return fuse_reply_err(req, res);
+
+       if (off != NULL) {
+               res = pread(fd, buf, len, *off);
+               if (res > 0)
+                       *off += res;
+       } else {
+               res = read(fd, buf, len);
+       }
+       if (res == -1)
+               res = fuse_reply_err(req, errno);
+       else
+               res = fuse_reply_buf(req, buf, res);
+       free(buf);
+
+       return res;
+}
+
 int fuse_reply_statfs(fuse_req_t req, const struct statvfs *stbuf)
 {
        struct fuse_statfs_out arg;
@@ -485,7 +657,7 @@ int fuse_reply_ioctl_iov(fuse_req_t req, int result, const struct iovec *iov,
 
        padded_iov = malloc((count + 2) * sizeof(struct iovec));
        if (padded_iov == NULL)
-               return fuse_reply_err(req, -ENOMEM);
+               return fuse_reply_err(req, ENOMEM);
 
        memset(&arg, 0, sizeof(arg));
        arg.result = result;
@@ -1201,6 +1373,14 @@ static void do_init(fuse_req_t req, fuse_ino_t nodeid, const void *inarg)
                f->conn.max_readahead = 0;
        }
 
+       if (req->f->conn.proto_minor >= 14) {
+               f->conn.capable |= FUSE_CAP_SPLICE_WRITE | FUSE_CAP_SPLICE_MOVE;
+               if (!f->no_splice_write)
+                       f->conn.want |= FUSE_CAP_SPLICE_WRITE;
+               if (!f->no_splice_move)
+                       f->conn.want |= FUSE_CAP_SPLICE_MOVE;
+       }
+
        if (f->atomic_o_trunc)
                f->conn.want |= FUSE_CAP_ATOMIC_O_TRUNC;
        if (f->op.getlk && f->op.setlk && !f->no_remote_lock)
@@ -1534,6 +1714,8 @@ static struct fuse_opt fuse_ll_opts[] = {
        { "atomic_o_trunc", offsetof(struct fuse_ll, atomic_o_trunc), 1},
        { "no_remote_lock", offsetof(struct fuse_ll, no_remote_lock), 1},
        { "big_writes", offsetof(struct fuse_ll, big_writes), 1},
+       { "no_splice_write", offsetof(struct fuse_ll, no_splice_write), 1},
+       { "no_splice_move", offsetof(struct fuse_ll, no_splice_move), 1},
        FUSE_OPT_KEY("max_read=", FUSE_OPT_KEY_DISCARD),
        FUSE_OPT_KEY("-h", KEY_HELP),
        FUSE_OPT_KEY("--help", KEY_HELP),
@@ -1557,7 +1739,10 @@ static void fuse_ll_help(void)
 "    -o sync_read           perform reads synchronously\n"
 "    -o atomic_o_trunc      enable atomic open+truncate support\n"
 "    -o big_writes          enable larger than 4kB writes\n"
-"    -o no_remote_lock      disable remote file locking\n");
+"    -o no_remote_lock      disable remote file locking\n"
+"    -o no_splice_write     don't use splice to write to the fuse device\n"
+"    -o no_splice_move      don't move data while splicing to the fuse device\n"
+);
 }
 
 static int fuse_ll_opt_proc(void *data, const char *arg, int key,
@@ -1589,17 +1774,27 @@ int fuse_lowlevel_is_lib_option(const char *opt)
 static void fuse_ll_destroy(void *data)
 {
        struct fuse_ll *f = (struct fuse_ll *) data;
+       struct fuse_ll_pipe *llp;
 
        if (f->got_init && !f->got_destroy) {
                if (f->op.destroy)
                        f->op.destroy(f->userdata);
        }
-
+       llp = pthread_getspecific(f->pipe_key);
+       if (llp != NULL)
+               fuse_ll_pipe_free(llp);
+       pthread_key_delete(f->pipe_key);
        pthread_mutex_destroy(&f->lock);
        free(f->cuse_data);
        free(f);
 }
 
+static void fuse_ll_pipe_destructor(void *data)
+{
+       struct fuse_ll_pipe *llp = data;
+       fuse_ll_pipe_free(llp);
+}
+
 /*
  * always call fuse_lowlevel_new_common() internally, to work around a
  * misfeature in the FreeBSD runtime linker, which links the old
@@ -1609,6 +1804,7 @@ struct fuse_session *fuse_lowlevel_new_common(struct fuse_args *args,
                                              const struct fuse_lowlevel_ops *op,
                                              size_t op_size, void *userdata)
 {
+       int err;
        struct fuse_ll *f;
        struct fuse_session *se;
        struct fuse_session_ops sop = {
@@ -1635,8 +1831,15 @@ struct fuse_session *fuse_lowlevel_new_common(struct fuse_args *args,
        list_init_req(&f->interrupts);
        fuse_mutex_init(&f->lock);
 
-       if (fuse_opt_parse(args, f, fuse_ll_opts, fuse_ll_opt_proc) == -1)
+       err = pthread_key_create(&f->pipe_key, fuse_ll_pipe_destructor);
+       if (err) {
+               fprintf(stderr, "fuse: failed to create thread specific key: %s\n",
+                       strerror(err));
                goto out_free;
+       }
+
+       if (fuse_opt_parse(args, f, fuse_ll_opts, fuse_ll_opt_proc) == -1)
+               goto out_key_destroy;
 
        if (f->debug)
                fprintf(stderr, "FUSE library version: %s\n", PACKAGE_VERSION);
@@ -1647,11 +1850,14 @@ struct fuse_session *fuse_lowlevel_new_common(struct fuse_args *args,
 
        se = fuse_session_new(&sop, f);
        if (!se)
-               goto out_free;
+               goto out_key_destroy;
 
        return se;
 
+out_key_destroy:
+       pthread_key_delete(f->pipe_key);
 out_free:
+       pthread_mutex_destroy(&f->lock);
        free(f);
 out:
        return NULL;
index 7db299bdf3cabdfc84e76637a62489184971c782..a919870ad0c603ccbfafebd9338c6c1c667d1f66 100644 (file)
@@ -179,7 +179,12 @@ FUSE_2.8 {
                fuse_req_ctx;
                fuse_req_getgroups;
                fuse_session_data;
+} FUSE_2.7.5;
+
+FUSE_2.9 {
+       global:
+               fuse_reply_fd;
 
        local:
                *;
-} FUSE_2.7.5;
+} FUSE_2.8;