self.cancel_and_wait(force=True)
result = self.vm.qmp('query-block')
self.assert_qmp(result, 'return[0]/inserted/file', test_img)
- self.vm.shutdown()
def test_cancel_after_ready(self):
self.assert_no_active_block_jobs()
self.assert_qmp(result, 'return[0]/node-name', 'top')
self.assert_qmp(result, 'return[0]/backing/node-name', 'base')
- self.vm.shutdown()
-
def test_medium_not_found(self):
if iotests.qemu_default_machine != 'pc':
return
self.assert_qmp(event, 'data/id', 'drive0')
self.assert_no_active_block_jobs()
- self.vm.shutdown()
def test_ignore_read(self):
self.assert_no_active_block_jobs()
result = self.vm.qmp('query-block-jobs')
self.assert_qmp(result, 'return[0]/paused', False)
self.complete_and_wait()
- self.vm.shutdown()
def test_large_cluster(self):
self.assert_no_active_block_jobs()
self.complete_and_wait(wait_ready=False)
self.assert_no_active_block_jobs()
- self.vm.shutdown()
class TestWriteErrors(iotests.QMPTestCase):
image_len = 2 * 1024 * 1024 # MB
completed = True
self.assert_no_active_block_jobs()
- self.vm.shutdown()
def test_ignore_write(self):
self.assert_no_active_block_jobs()
result = self.vm.qmp('query-block-jobs')
self.assert_qmp(result, 'return[0]/paused', False)
self.complete_and_wait()
- self.vm.shutdown()
def test_stop_write(self):
self.assert_no_active_block_jobs()
self.complete_and_wait(wait_ready=False)
self.assert_no_active_block_jobs()
- self.vm.shutdown()
class TestSetSpeed(iotests.QMPTestCase):
image_len = 80 * 1024 * 1024 # MB
# here we check that the last registered quorum file has not been
# swapped out and unref
self.assert_has_block_node(None, quorum_img3)
- self.vm.shutdown()
def test_cancel_after_ready(self):
self.assert_no_active_block_jobs()
self.assert_has_block_node("repair0", quorum_repair_img)
# TODO: a better test requiring some QEMU infrastructure will be added
# to check that this file is really driven by quorum
- self.vm.shutdown()
# Test mirroring with a source that does not have any parents (not even a
# BlockBackend)