Added unit tests for fusexmp and hello
authorNikolaus Rath <Nikolaus@rath.org>
Tue, 29 Mar 2016 23:07:29 +0000 (16:07 -0700)
committerNikolaus Rath <Nikolaus@rath.org>
Tue, 29 Mar 2016 23:07:29 +0000 (16:07 -0700)
test/test_examples.py [new file with mode: 0755]

diff --git a/test/test_examples.py b/test/test_examples.py
new file mode 100755 (executable)
index 0000000..9da7bb1
--- /dev/null
@@ -0,0 +1,306 @@
+#!/usr/bin/env python3
+
+if __name__ == '__main__':
+    import pytest
+    import sys
+    sys.exit(pytest.main([__file__] + sys.argv[1:]))
+
+import subprocess
+import os
+import sys
+import pytest
+import stat
+import shutil
+import filecmp
+import errno
+from tempfile import NamedTemporaryFile
+from util import wait_for_mount, umount, cleanup
+
+basename = os.path.join(os.path.dirname(__file__), '..')
+TEST_FILE = __file__
+
+with open(TEST_FILE, 'rb') as fh:
+    TEST_DATA = fh.read()
+
+def name_generator(__ctr=[0]):
+    __ctr[0] += 1
+    return 'testfile_%d' % __ctr[0]
+
+@pytest.mark.parametrize("name", ('hello', 'hello_ll'))
+def test_hello(tmpdir, name):
+    mnt_dir = str(tmpdir)
+    cmdline = [os.path.join(basename, 'example', name),
+               '-f', mnt_dir ]
+    mount_process = subprocess.Popen(cmdline)
+    try:
+        wait_for_mount(mount_process, mnt_dir)
+        assert os.listdir(mnt_dir) == [ 'hello' ]
+        filename = os.path.join(mnt_dir, 'hello')
+        with open(filename, 'r') as fh:
+            assert fh.read() == 'Hello World!\n'
+        with pytest.raises(IOError) as exc_info:
+            open(filename, 'r+')
+        assert exc_info.value.errno == errno.EACCES
+        with pytest.raises(IOError) as exc_info:
+            open(filename + 'does-not-exist', 'r+')
+        assert exc_info.value.errno == errno.ENOENT
+    except:
+        cleanup(mnt_dir)
+        raise
+    else:
+        umount(mount_process, mnt_dir)
+
+@pytest.mark.parametrize("name", ('fusexmp', 'fusexmp_fh'))
+def test_fusexmp_fh(tmpdir, name):
+    mnt_dir = str(tmpdir.mkdir('mnt'))
+    src_dir = str(tmpdir.mkdir('src'))
+
+    cmdline = [os.path.join(basename, 'example', name),
+                '-f', '-o' , 'use_ino,readdir_ino,kernel_cache',
+                mnt_dir ]
+    mount_process = subprocess.Popen(cmdline)
+    try:
+        wait_for_mount(mount_process, mnt_dir)
+        work_dir = os.path.join(mnt_dir, src_dir)
+        tst_write(work_dir)
+        tst_mkdir(work_dir)
+        tst_symlink(work_dir)
+        tst_mknod(work_dir)
+        if os.getuid() == 0:
+            tst_chown(work_dir)
+        # Underlying fs may not have full nanosecond resolution
+        tst_utimens(work_dir, ns_tol=1000)
+        tst_link(work_dir)
+        tst_readdir(work_dir)
+        tst_statvfs(work_dir)
+        tst_truncate_path(work_dir)
+        tst_truncate_fd(work_dir)
+        tst_unlink(work_dir)
+        tst_passthrough(src_dir, work_dir)
+    except:
+        cleanup(mnt_dir)
+        raise
+    else:
+        umount(mount_process, mnt_dir)
+
+def checked_unlink(filename, path, isdir=False):
+    fullname = os.path.join(path, filename)
+    if isdir:
+        os.rmdir(fullname)
+    else:
+        os.unlink(fullname)
+    with pytest.raises(OSError) as exc_info:
+        os.stat(fullname)
+    assert exc_info.value.errno == errno.ENOENT
+    assert filename not in os.listdir(path)
+
+def tst_mkdir(mnt_dir):
+    dirname = name_generator()
+    fullname = mnt_dir + "/" + dirname
+    os.mkdir(fullname)
+    fstat = os.stat(fullname)
+    assert stat.S_ISDIR(fstat.st_mode)
+    assert os.listdir(fullname) ==  []
+    assert fstat.st_nlink in (1,2)
+    assert dirname in os.listdir(mnt_dir)
+    checked_unlink(dirname, mnt_dir, isdir=True)
+
+def tst_symlink(mnt_dir):
+    linkname = name_generator()
+    fullname = mnt_dir + "/" + linkname
+    os.symlink("/imaginary/dest", fullname)
+    fstat = os.lstat(fullname)
+    assert stat.S_ISLNK(fstat.st_mode)
+    assert os.readlink(fullname) == "/imaginary/dest"
+    assert fstat.st_nlink == 1
+    assert linkname in os.listdir(mnt_dir)
+    checked_unlink(linkname, mnt_dir)
+
+def tst_mknod(mnt_dir):
+    filename = os.path.join(mnt_dir, name_generator())
+    shutil.copyfile(TEST_FILE, filename)
+    fstat = os.lstat(filename)
+    assert stat.S_ISREG(fstat.st_mode)
+    assert fstat.st_nlink == 1
+    assert os.path.basename(filename) in os.listdir(mnt_dir)
+    assert filecmp.cmp(TEST_FILE, filename, False)
+    checked_unlink(filename, mnt_dir)
+
+def tst_chown(mnt_dir):
+    filename = os.path.join(mnt_dir, name_generator())
+    os.mkdir(filename)
+    fstat = os.lstat(filename)
+    uid = fstat.st_uid
+    gid = fstat.st_gid
+
+    uid_new = uid + 1
+    os.chown(filename, uid_new, -1)
+    fstat = os.lstat(filename)
+    assert fstat.st_uid == uid_new
+    assert fstat.st_gid == gid
+
+    gid_new = gid + 1
+    os.chown(filename, -1, gid_new)
+    fstat = os.lstat(filename)
+    assert fstat.st_uid == uid_new
+    assert fstat.st_gid == gid_new
+
+    checked_unlink(filename, mnt_dir, isdir=True)
+
+def tst_write(mnt_dir):
+    name = os.path.join(mnt_dir, name_generator())
+    shutil.copyfile(TEST_FILE, name)
+    assert filecmp.cmp(name, TEST_FILE, False)
+    checked_unlink(name, mnt_dir)
+
+def tst_unlink(mnt_dir):
+    name = os.path.join(mnt_dir, name_generator())
+    data1 = b'foo'
+    data2 = b'bar'
+
+    with open(os.path.join(mnt_dir, name), 'wb+', buffering=0) as fh:
+        fh.write(data1)
+        checked_unlink(name, mnt_dir)
+        fh.write(data2)
+        fh.seek(0)
+        assert fh.read() == data1+data2
+
+def tst_statvfs(mnt_dir):
+    os.statvfs(mnt_dir)
+
+def tst_link(mnt_dir):
+    name1 = os.path.join(mnt_dir, name_generator())
+    name2 = os.path.join(mnt_dir, name_generator())
+    shutil.copyfile(TEST_FILE, name1)
+    assert filecmp.cmp(name1, TEST_FILE, False)
+    os.link(name1, name2)
+
+    fstat1 = os.lstat(name1)
+    fstat2 = os.lstat(name2)
+
+    assert fstat1 == fstat2
+    assert fstat1.st_nlink == 2
+
+    assert os.path.basename(name2) in os.listdir(mnt_dir)
+    assert filecmp.cmp(name1, name2, False)
+    os.unlink(name2)
+    fstat1 = os.lstat(name1)
+    assert fstat1.st_nlink == 1
+    os.unlink(name1)
+
+def tst_readdir(mnt_dir):
+    dir_ = os.path.join(mnt_dir, name_generator())
+    file_ = dir_ + "/" + name_generator()
+    subdir = dir_ + "/" + name_generator()
+    subfile = subdir + "/" + name_generator()
+
+    os.mkdir(dir_)
+    shutil.copyfile(TEST_FILE, file_)
+    os.mkdir(subdir)
+    shutil.copyfile(TEST_FILE, subfile)
+
+    listdir_is = os.listdir(dir_)
+    listdir_is.sort()
+    listdir_should = [ os.path.basename(file_), os.path.basename(subdir) ]
+    listdir_should.sort()
+    assert listdir_is == listdir_should
+
+    os.unlink(file_)
+    os.unlink(subfile)
+    os.rmdir(subdir)
+    os.rmdir(dir_)
+
+def tst_truncate_path(mnt_dir):
+    assert len(TEST_DATA) > 1024
+
+    filename = os.path.join(mnt_dir, name_generator())
+    with open(filename, 'wb') as fh:
+        fh.write(TEST_DATA)
+
+    fstat = os.stat(filename)
+    size = fstat.st_size
+    assert size == len(TEST_DATA)
+
+    # Add zeros at the end
+    os.truncate(filename, size + 1024)
+    assert os.stat(filename).st_size == size + 1024
+    with open(filename, 'rb') as fh:
+        assert fh.read(size) == TEST_DATA
+        assert fh.read(1025) == b'\0' * 1024
+
+    # Truncate data
+    os.truncate(filename, size - 1024)
+    assert os.stat(filename).st_size == size - 1024
+    with open(filename, 'rb') as fh:
+        assert fh.read(size) == TEST_DATA[:size-1024]
+
+    os.unlink(filename)
+
+def tst_truncate_fd(mnt_dir):
+    assert len(TEST_DATA) > 1024
+    with NamedTemporaryFile('w+b', 0, dir=mnt_dir) as fh:
+        fd = fh.fileno()
+        fh.write(TEST_DATA)
+        fstat = os.fstat(fd)
+        size = fstat.st_size
+        assert size == len(TEST_DATA)
+
+        # Add zeros at the end
+        os.ftruncate(fd, size + 1024)
+        assert os.fstat(fd).st_size == size + 1024
+        fh.seek(0)
+        assert fh.read(size) == TEST_DATA
+        assert fh.read(1025) == b'\0' * 1024
+
+        # Truncate data
+        os.ftruncate(fd, size - 1024)
+        assert os.fstat(fd).st_size == size - 1024
+        fh.seek(0)
+        assert fh.read(size) == TEST_DATA[:size-1024]
+
+def tst_utimens(mnt_dir, ns_tol=0):
+    filename = os.path.join(mnt_dir, name_generator())
+    os.mkdir(filename)
+    fstat = os.lstat(filename)
+
+    atime = fstat.st_atime + 42.28
+    mtime = fstat.st_mtime - 42.23
+    if sys.version_info < (3,3):
+        os.utime(filename, (atime, mtime))
+    else:
+        atime_ns = fstat.st_atime_ns + int(42.28*1e9)
+        mtime_ns = fstat.st_mtime_ns - int(42.23*1e9)
+        os.utime(filename, None, ns=(atime_ns, mtime_ns))
+
+    fstat = os.lstat(filename)
+
+    assert abs(fstat.st_atime - atime) < 1e-3
+    assert abs(fstat.st_mtime - mtime) < 1e-3
+    if sys.version_info >= (3,3):
+        assert abs(fstat.st_atime_ns - atime_ns) <= ns_tol
+        assert abs(fstat.st_mtime_ns - mtime_ns) <= ns_tol
+
+    checked_unlink(filename, mnt_dir, isdir=True)
+
+def tst_passthrough(src_dir, mnt_dir):
+    name = name_generator()
+    src_name = os.path.join(src_dir, name)
+    mnt_name = os.path.join(src_dir, name)
+    assert name not in os.listdir(src_dir)
+    assert name not in os.listdir(mnt_dir)
+    with open(src_name, 'w') as fh:
+        fh.write('Hello, world')
+    assert name in os.listdir(src_dir)
+    assert name in os.listdir(mnt_dir)
+    assert os.stat(src_name) == os.stat(mnt_name)
+
+    name = name_generator()
+    src_name = os.path.join(src_dir, name)
+    mnt_name = os.path.join(src_dir, name)
+    assert name not in os.listdir(src_dir)
+    assert name not in os.listdir(mnt_dir)
+    with open(mnt_name, 'w') as fh:
+        fh.write('Hello, world')
+    assert name in os.listdir(src_dir)
+    assert name in os.listdir(mnt_dir)
+    assert os.stat(src_name) == os.stat(mnt_name)