tst_append(src_dir, work_dir)
tst_seek(src_dir, work_dir)
tst_mkdir(work_dir)
- tst_rmdir(src_dir, work_dir)
- tst_unlink(src_dir, work_dir)
+ tst_rmdir(work_dir, src_dir)
+ tst_unlink(work_dir, src_dir)
tst_symlink(work_dir)
if os.getuid() == 0:
tst_chown(work_dir)
tst_append(src_dir, mnt_dir)
tst_seek(src_dir, mnt_dir)
tst_mkdir(mnt_dir)
- tst_rmdir(src_dir, mnt_dir)
- tst_unlink(src_dir, mnt_dir)
+ if cache:
+ # if cache is enabled, no operations should go through
+ # src_dir as the cache will become stale.
+ tst_rmdir(mnt_dir)
+ tst_unlink(mnt_dir)
+ else:
+ tst_rmdir(mnt_dir, src_dir)
+ tst_unlink(mnt_dir, src_dir)
tst_symlink(mnt_dir)
if os.getuid() == 0:
tst_chown(mnt_dir)
def os_create(name):
os.close(os.open(name, os.O_CREAT | os.O_RDWR))
-def tst_unlink(src_dir, mnt_dir):
+def tst_unlink(mnt_dir, src_dir=None):
name = name_generator()
fullname = mnt_dir + "/" + name
- with open(pjoin(src_dir, name), 'wb') as fh:
+ srcname = fullname
+ if src_dir is not None:
+ srcname = pjoin(src_dir, name)
+ with open(srcname, 'wb') as fh:
fh.write(b'hello')
assert name in os.listdir(mnt_dir)
os.unlink(fullname)
assert fstat.st_nlink in (1,2)
assert dirname in os.listdir(mnt_dir)
-def tst_rmdir(src_dir, mnt_dir):
+def tst_rmdir(mnt_dir, src_dir=None):
name = name_generator()
fullname = mnt_dir + "/" + name
- os.mkdir(pjoin(src_dir, name))
+ srcname = fullname
+ if src_dir is not None:
+ srcname = pjoin(src_dir, name)
+ os.mkdir(srcname)
assert name in os.listdir(mnt_dir)
os.rmdir(fullname)
with pytest.raises(OSError) as exc_info: