--- /dev/null
+[settings]
+force_grid_wrap=4
+force_sort_within_sections=True
+include_trailing_comma=True
+line_length=72
+lines_after_imports=2
+multi_line_output=3
\ No newline at end of file
+++ /dev/null
-[flake8]
-extend-ignore = E722 # Pylint handles this, but smarter.
\ No newline at end of file
+++ /dev/null
-[settings]
-force_grid_wrap=4
-force_sort_within_sections=True
-include_trailing_comma=True
-line_length=72
-lines_after_imports=2
-multi_line_output=3
\ No newline at end of file
+++ /dev/null
-# QEMU library
-#
-# Copyright (C) 2015-2016 Red Hat Inc.
-# Copyright (C) 2012 IBM Corp.
-#
-# Authors:
-# Fam Zheng <famz@redhat.com>
-#
-# This work is licensed under the terms of the GNU GPL, version 2. See
-# the COPYING file in the top-level directory.
-#
+++ /dev/null
-"""
-QEMU accel module:
-
-This module provides utilities for discover and check the availability of
-accelerators.
-"""
-# Copyright (C) 2015-2016 Red Hat Inc.
-# Copyright (C) 2012 IBM Corp.
-#
-# Authors:
-# Fam Zheng <famz@redhat.com>
-#
-# This work is licensed under the terms of the GNU GPL, version 2. See
-# the COPYING file in the top-level directory.
-#
-
-import logging
-import os
-import subprocess
-from typing import List, Optional
-
-
-LOG = logging.getLogger(__name__)
-
-# Mapping host architecture to any additional architectures it can
-# support which often includes its 32 bit cousin.
-ADDITIONAL_ARCHES = {
- "x86_64": "i386",
- "aarch64": "armhf",
- "ppc64le": "ppc64",
-}
-
-
-def list_accel(qemu_bin: str) -> List[str]:
- """
- List accelerators enabled in the QEMU binary.
-
- @param qemu_bin (str): path to the QEMU binary.
- @raise Exception: if failed to run `qemu -accel help`
- @return a list of accelerator names.
- """
- if not qemu_bin:
- return []
- try:
- out = subprocess.check_output([qemu_bin, '-accel', 'help'],
- universal_newlines=True)
- except:
- LOG.debug("Failed to get the list of accelerators in %s", qemu_bin)
- raise
- # Skip the first line which is the header.
- return [acc.strip() for acc in out.splitlines()[1:]]
-
-
-def kvm_available(target_arch: Optional[str] = None,
- qemu_bin: Optional[str] = None) -> bool:
- """
- Check if KVM is available using the following heuristic:
- - Kernel module is present in the host;
- - Target and host arches don't mismatch;
- - KVM is enabled in the QEMU binary.
-
- @param target_arch (str): target architecture
- @param qemu_bin (str): path to the QEMU binary
- @return True if kvm is available, otherwise False.
- """
- if not os.access("/dev/kvm", os.R_OK | os.W_OK):
- return False
- if target_arch:
- host_arch = os.uname()[4]
- if target_arch != host_arch:
- if target_arch != ADDITIONAL_ARCHES.get(host_arch):
- return False
- if qemu_bin and "kvm" not in list_accel(qemu_bin):
- return False
- return True
-
-
-def tcg_available(qemu_bin: str) -> bool:
- """
- Check if TCG is available.
-
- @param qemu_bin (str): path to the QEMU binary
- """
- return 'tcg' in list_accel(qemu_bin)
+++ /dev/null
-"""
-QEMU Console Socket Module:
-
-This python module implements a ConsoleSocket object,
-which can drain a socket and optionally dump the bytes to file.
-"""
-# Copyright 2020 Linaro
-#
-# Authors:
-# Robert Foley <robert.foley@linaro.org>
-#
-# This code is licensed under the GPL version 2 or later. See
-# the COPYING file in the top-level directory.
-#
-
-from collections import deque
-import socket
-import threading
-import time
-from typing import Deque, Optional
-
-
-class ConsoleSocket(socket.socket):
- """
- ConsoleSocket represents a socket attached to a char device.
-
- Optionally (if drain==True), drains the socket and places the bytes
- into an in memory buffer for later processing.
-
- Optionally a file path can be passed in and we will also
- dump the characters to this file for debugging purposes.
- """
- def __init__(self, address: str, file: Optional[str] = None,
- drain: bool = False):
- self._recv_timeout_sec = 300.0
- self._sleep_time = 0.5
- self._buffer: Deque[int] = deque()
- socket.socket.__init__(self, socket.AF_UNIX, socket.SOCK_STREAM)
- self.connect(address)
- self._logfile = None
- if file:
- # pylint: disable=consider-using-with
- self._logfile = open(file, "bw")
- self._open = True
- self._drain_thread = None
- if drain:
- self._drain_thread = self._thread_start()
-
- def __repr__(self) -> str:
- tmp = super().__repr__()
- tmp = tmp.rstrip(">")
- tmp = "%s, logfile=%s, drain_thread=%s>" % (tmp, self._logfile,
- self._drain_thread)
- return tmp
-
- def _drain_fn(self) -> None:
- """Drains the socket and runs while the socket is open."""
- while self._open:
- try:
- self._drain_socket()
- except socket.timeout:
- # The socket is expected to timeout since we set a
- # short timeout to allow the thread to exit when
- # self._open is set to False.
- time.sleep(self._sleep_time)
-
- def _thread_start(self) -> threading.Thread:
- """Kick off a thread to drain the socket."""
- # Configure socket to not block and timeout.
- # This allows our drain thread to not block
- # on recieve and exit smoothly.
- socket.socket.setblocking(self, False)
- socket.socket.settimeout(self, 1)
- drain_thread = threading.Thread(target=self._drain_fn)
- drain_thread.daemon = True
- drain_thread.start()
- return drain_thread
-
- def close(self) -> None:
- """Close the base object and wait for the thread to terminate"""
- if self._open:
- self._open = False
- if self._drain_thread is not None:
- thread, self._drain_thread = self._drain_thread, None
- thread.join()
- socket.socket.close(self)
- if self._logfile:
- self._logfile.close()
- self._logfile = None
-
- def _drain_socket(self) -> None:
- """process arriving characters into in memory _buffer"""
- data = socket.socket.recv(self, 1)
- if self._logfile:
- self._logfile.write(data)
- self._logfile.flush()
- self._buffer.extend(data)
-
- def recv(self, bufsize: int = 1, flags: int = 0) -> bytes:
- """Return chars from in memory buffer.
- Maintains the same API as socket.socket.recv.
- """
- if self._drain_thread is None:
- # Not buffering the socket, pass thru to socket.
- return socket.socket.recv(self, bufsize, flags)
- assert not flags, "Cannot pass flags to recv() in drained mode"
- start_time = time.time()
- while len(self._buffer) < bufsize:
- time.sleep(self._sleep_time)
- elapsed_sec = time.time() - start_time
- if elapsed_sec > self._recv_timeout_sec:
- raise socket.timeout
- return bytes((self._buffer.popleft() for i in range(bufsize)))
-
- def setblocking(self, value: bool) -> None:
- """When not draining we pass thru to the socket,
- since when draining we control socket blocking.
- """
- if self._drain_thread is None:
- socket.socket.setblocking(self, value)
-
- def settimeout(self, value: Optional[float]) -> None:
- """When not draining we pass thru to the socket,
- since when draining we control the timeout.
- """
- if value is not None:
- self._recv_timeout_sec = value
- if self._drain_thread is None:
- socket.socket.settimeout(self, value)
+++ /dev/null
-"""
-QEMU machine module:
-
-The machine module primarily provides the QEMUMachine class,
-which provides facilities for managing the lifetime of a QEMU VM.
-"""
-
-# Copyright (C) 2015-2016 Red Hat Inc.
-# Copyright (C) 2012 IBM Corp.
-#
-# Authors:
-# Fam Zheng <famz@redhat.com>
-#
-# This work is licensed under the terms of the GNU GPL, version 2. See
-# the COPYING file in the top-level directory.
-#
-# Based on qmp.py.
-#
-
-import errno
-from itertools import chain
-import logging
-import os
-import shutil
-import signal
-import socket
-import subprocess
-import tempfile
-from types import TracebackType
-from typing import (
- Any,
- BinaryIO,
- Dict,
- List,
- Optional,
- Sequence,
- Tuple,
- Type,
-)
-
-from . import console_socket, qmp
-from .qmp import QMPMessage, QMPReturnValue, SocketAddrT
-
-
-LOG = logging.getLogger(__name__)
-
-
-class QEMUMachineError(Exception):
- """
- Exception called when an error in QEMUMachine happens.
- """
-
-
-class QEMUMachineAddDeviceError(QEMUMachineError):
- """
- Exception raised when a request to add a device can not be fulfilled
-
- The failures are caused by limitations, lack of information or conflicting
- requests on the QEMUMachine methods. This exception does not represent
- failures reported by the QEMU binary itself.
- """
-
-
-class AbnormalShutdown(QEMUMachineError):
- """
- Exception raised when a graceful shutdown was requested, but not performed.
- """
-
-
-class QEMUMachine:
- """
- A QEMU VM.
-
- Use this object as a context manager to ensure
- the QEMU process terminates::
-
- with VM(binary) as vm:
- ...
- # vm is guaranteed to be shut down here
- """
-
- def __init__(self,
- binary: str,
- args: Sequence[str] = (),
- wrapper: Sequence[str] = (),
- name: Optional[str] = None,
- base_temp_dir: str = "/var/tmp",
- monitor_address: Optional[SocketAddrT] = None,
- socket_scm_helper: Optional[str] = None,
- sock_dir: Optional[str] = None,
- drain_console: bool = False,
- console_log: Optional[str] = None):
- '''
- Initialize a QEMUMachine
-
- @param binary: path to the qemu binary
- @param args: list of extra arguments
- @param wrapper: list of arguments used as prefix to qemu binary
- @param name: prefix for socket and log file names (default: qemu-PID)
- @param base_temp_dir: default location where temp files are created
- @param monitor_address: address for QMP monitor
- @param socket_scm_helper: helper program, required for send_fd_scm()
- @param sock_dir: where to create socket (defaults to base_temp_dir)
- @param drain_console: (optional) True to drain console socket to buffer
- @param console_log: (optional) path to console log file
- @note: Qemu process is not started until launch() is used.
- '''
- # Direct user configuration
-
- self._binary = binary
- self._args = list(args)
- self._wrapper = wrapper
-
- self._name = name or "qemu-%d" % os.getpid()
- self._base_temp_dir = base_temp_dir
- self._sock_dir = sock_dir or self._base_temp_dir
- self._socket_scm_helper = socket_scm_helper
-
- if monitor_address is not None:
- self._monitor_address = monitor_address
- self._remove_monitor_sockfile = False
- else:
- self._monitor_address = os.path.join(
- self._sock_dir, f"{self._name}-monitor.sock"
- )
- self._remove_monitor_sockfile = True
-
- self._console_log_path = console_log
- if self._console_log_path:
- # In order to log the console, buffering needs to be enabled.
- self._drain_console = True
- else:
- self._drain_console = drain_console
-
- # Runstate
- self._qemu_log_path: Optional[str] = None
- self._qemu_log_file: Optional[BinaryIO] = None
- self._popen: Optional['subprocess.Popen[bytes]'] = None
- self._events: List[QMPMessage] = []
- self._iolog: Optional[str] = None
- self._qmp_set = True # Enable QMP monitor by default.
- self._qmp_connection: Optional[qmp.QEMUMonitorProtocol] = None
- self._qemu_full_args: Tuple[str, ...] = ()
- self._temp_dir: Optional[str] = None
- self._launched = False
- self._machine: Optional[str] = None
- self._console_index = 0
- self._console_set = False
- self._console_device_type: Optional[str] = None
- self._console_address = os.path.join(
- self._sock_dir, f"{self._name}-console.sock"
- )
- self._console_socket: Optional[socket.socket] = None
- self._remove_files: List[str] = []
- self._user_killed = False
-
- def __enter__(self) -> 'QEMUMachine':
- return self
-
- def __exit__(self,
- exc_type: Optional[Type[BaseException]],
- exc_val: Optional[BaseException],
- exc_tb: Optional[TracebackType]) -> None:
- self.shutdown()
-
- def add_monitor_null(self) -> None:
- """
- This can be used to add an unused monitor instance.
- """
- self._args.append('-monitor')
- self._args.append('null')
-
- def add_fd(self, fd: int, fdset: int,
- opaque: str, opts: str = '') -> 'QEMUMachine':
- """
- Pass a file descriptor to the VM
- """
- options = ['fd=%d' % fd,
- 'set=%d' % fdset,
- 'opaque=%s' % opaque]
- if opts:
- options.append(opts)
-
- # This did not exist before 3.4, but since then it is
- # mandatory for our purpose
- if hasattr(os, 'set_inheritable'):
- os.set_inheritable(fd, True)
-
- self._args.append('-add-fd')
- self._args.append(','.join(options))
- return self
-
- def send_fd_scm(self, fd: Optional[int] = None,
- file_path: Optional[str] = None) -> int:
- """
- Send an fd or file_path to socket_scm_helper.
-
- Exactly one of fd and file_path must be given.
- If it is file_path, the helper will open that file and pass its own fd.
- """
- # In iotest.py, the qmp should always use unix socket.
- assert self._qmp.is_scm_available()
- if self._socket_scm_helper is None:
- raise QEMUMachineError("No path to socket_scm_helper set")
- if not os.path.exists(self._socket_scm_helper):
- raise QEMUMachineError("%s does not exist" %
- self._socket_scm_helper)
-
- # This did not exist before 3.4, but since then it is
- # mandatory for our purpose
- if hasattr(os, 'set_inheritable'):
- os.set_inheritable(self._qmp.get_sock_fd(), True)
- if fd is not None:
- os.set_inheritable(fd, True)
-
- fd_param = ["%s" % self._socket_scm_helper,
- "%d" % self._qmp.get_sock_fd()]
-
- if file_path is not None:
- assert fd is None
- fd_param.append(file_path)
- else:
- assert fd is not None
- fd_param.append(str(fd))
-
- proc = subprocess.run(
- fd_param,
- stdin=subprocess.DEVNULL,
- stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT,
- check=False,
- close_fds=False,
- )
- if proc.stdout:
- LOG.debug(proc.stdout)
-
- return proc.returncode
-
- @staticmethod
- def _remove_if_exists(path: str) -> None:
- """
- Remove file object at path if it exists
- """
- try:
- os.remove(path)
- except OSError as exception:
- if exception.errno == errno.ENOENT:
- return
- raise
-
- def is_running(self) -> bool:
- """Returns true if the VM is running."""
- return self._popen is not None and self._popen.poll() is None
-
- @property
- def _subp(self) -> 'subprocess.Popen[bytes]':
- if self._popen is None:
- raise QEMUMachineError('Subprocess pipe not present')
- return self._popen
-
- def exitcode(self) -> Optional[int]:
- """Returns the exit code if possible, or None."""
- if self._popen is None:
- return None
- return self._popen.poll()
-
- def get_pid(self) -> Optional[int]:
- """Returns the PID of the running process, or None."""
- if not self.is_running():
- return None
- return self._subp.pid
-
- def _load_io_log(self) -> None:
- if self._qemu_log_path is not None:
- with open(self._qemu_log_path, "r") as iolog:
- self._iolog = iolog.read()
-
- @property
- def _base_args(self) -> List[str]:
- args = ['-display', 'none', '-vga', 'none']
-
- if self._qmp_set:
- if isinstance(self._monitor_address, tuple):
- moncdev = "socket,id=mon,host={},port={}".format(
- *self._monitor_address
- )
- else:
- moncdev = f"socket,id=mon,path={self._monitor_address}"
- args.extend(['-chardev', moncdev, '-mon',
- 'chardev=mon,mode=control'])
-
- if self._machine is not None:
- args.extend(['-machine', self._machine])
- for _ in range(self._console_index):
- args.extend(['-serial', 'null'])
- if self._console_set:
- chardev = ('socket,id=console,path=%s,server=on,wait=off' %
- self._console_address)
- args.extend(['-chardev', chardev])
- if self._console_device_type is None:
- args.extend(['-serial', 'chardev:console'])
- else:
- device = '%s,chardev=console' % self._console_device_type
- args.extend(['-device', device])
- return args
-
- def _pre_launch(self) -> None:
- self._qemu_log_path = os.path.join(self.temp_dir, self._name + ".log")
-
- if self._console_set:
- self._remove_files.append(self._console_address)
-
- if self._qmp_set:
- if self._remove_monitor_sockfile:
- assert isinstance(self._monitor_address, str)
- self._remove_files.append(self._monitor_address)
- self._qmp_connection = qmp.QEMUMonitorProtocol(
- self._monitor_address,
- server=True,
- nickname=self._name
- )
-
- # NOTE: Make sure any opened resources are *definitely* freed in
- # _post_shutdown()!
- # pylint: disable=consider-using-with
- self._qemu_log_file = open(self._qemu_log_path, 'wb')
-
- def _post_launch(self) -> None:
- if self._qmp_connection:
- self._qmp.accept()
-
- def _post_shutdown(self) -> None:
- """
- Called to cleanup the VM instance after the process has exited.
- May also be called after a failed launch.
- """
- # Comprehensive reset for the failed launch case:
- self._early_cleanup()
-
- if self._qmp_connection:
- self._qmp.close()
- self._qmp_connection = None
-
- if self._qemu_log_file is not None:
- self._qemu_log_file.close()
- self._qemu_log_file = None
-
- self._load_io_log()
-
- self._qemu_log_path = None
-
- if self._temp_dir is not None:
- shutil.rmtree(self._temp_dir)
- self._temp_dir = None
-
- while len(self._remove_files) > 0:
- self._remove_if_exists(self._remove_files.pop())
-
- exitcode = self.exitcode()
- if (exitcode is not None and exitcode < 0
- and not (self._user_killed and exitcode == -signal.SIGKILL)):
- msg = 'qemu received signal %i; command: "%s"'
- if self._qemu_full_args:
- command = ' '.join(self._qemu_full_args)
- else:
- command = ''
- LOG.warning(msg, -int(exitcode), command)
-
- self._user_killed = False
- self._launched = False
-
- def launch(self) -> None:
- """
- Launch the VM and make sure we cleanup and expose the
- command line/output in case of exception
- """
-
- if self._launched:
- raise QEMUMachineError('VM already launched')
-
- self._iolog = None
- self._qemu_full_args = ()
- try:
- self._launch()
- self._launched = True
- except:
- self._post_shutdown()
-
- LOG.debug('Error launching VM')
- if self._qemu_full_args:
- LOG.debug('Command: %r', ' '.join(self._qemu_full_args))
- if self._iolog:
- LOG.debug('Output: %r', self._iolog)
- raise
-
- def _launch(self) -> None:
- """
- Launch the VM and establish a QMP connection
- """
- self._pre_launch()
- self._qemu_full_args = tuple(
- chain(self._wrapper,
- [self._binary],
- self._base_args,
- self._args)
- )
- LOG.debug('VM launch command: %r', ' '.join(self._qemu_full_args))
-
- # Cleaning up of this subprocess is guaranteed by _do_shutdown.
- # pylint: disable=consider-using-with
- self._popen = subprocess.Popen(self._qemu_full_args,
- stdin=subprocess.DEVNULL,
- stdout=self._qemu_log_file,
- stderr=subprocess.STDOUT,
- shell=False,
- close_fds=False)
- self._post_launch()
-
- def _early_cleanup(self) -> None:
- """
- Perform any cleanup that needs to happen before the VM exits.
-
- May be invoked by both soft and hard shutdown in failover scenarios.
- Called additionally by _post_shutdown for comprehensive cleanup.
- """
- # If we keep the console socket open, we may deadlock waiting
- # for QEMU to exit, while QEMU is waiting for the socket to
- # become writeable.
- if self._console_socket is not None:
- self._console_socket.close()
- self._console_socket = None
-
- def _hard_shutdown(self) -> None:
- """
- Perform early cleanup, kill the VM, and wait for it to terminate.
-
- :raise subprocess.Timeout: When timeout is exceeds 60 seconds
- waiting for the QEMU process to terminate.
- """
- self._early_cleanup()
- self._subp.kill()
- self._subp.wait(timeout=60)
-
- def _soft_shutdown(self, timeout: Optional[int],
- has_quit: bool = False) -> None:
- """
- Perform early cleanup, attempt to gracefully shut down the VM, and wait
- for it to terminate.
-
- :param timeout: Timeout in seconds for graceful shutdown.
- A value of None is an infinite wait.
- :param has_quit: When True, don't attempt to issue 'quit' QMP command
-
- :raise ConnectionReset: On QMP communication errors
- :raise subprocess.TimeoutExpired: When timeout is exceeded waiting for
- the QEMU process to terminate.
- """
- self._early_cleanup()
-
- if self._qmp_connection:
- if not has_quit:
- # Might raise ConnectionReset
- self._qmp.cmd('quit')
-
- # May raise subprocess.TimeoutExpired
- self._subp.wait(timeout=timeout)
-
- def _do_shutdown(self, timeout: Optional[int],
- has_quit: bool = False) -> None:
- """
- Attempt to shutdown the VM gracefully; fallback to a hard shutdown.
-
- :param timeout: Timeout in seconds for graceful shutdown.
- A value of None is an infinite wait.
- :param has_quit: When True, don't attempt to issue 'quit' QMP command
-
- :raise AbnormalShutdown: When the VM could not be shut down gracefully.
- The inner exception will likely be ConnectionReset or
- subprocess.TimeoutExpired. In rare cases, non-graceful termination
- may result in its own exceptions, likely subprocess.TimeoutExpired.
- """
- try:
- self._soft_shutdown(timeout, has_quit)
- except Exception as exc:
- self._hard_shutdown()
- raise AbnormalShutdown("Could not perform graceful shutdown") \
- from exc
-
- def shutdown(self, has_quit: bool = False,
- hard: bool = False,
- timeout: Optional[int] = 30) -> None:
- """
- Terminate the VM (gracefully if possible) and perform cleanup.
- Cleanup will always be performed.
-
- If the VM has not yet been launched, or shutdown(), wait(), or kill()
- have already been called, this method does nothing.
-
- :param has_quit: When true, do not attempt to issue 'quit' QMP command.
- :param hard: When true, do not attempt graceful shutdown, and
- suppress the SIGKILL warning log message.
- :param timeout: Optional timeout in seconds for graceful shutdown.
- Default 30 seconds, A `None` value is an infinite wait.
- """
- if not self._launched:
- return
-
- try:
- if hard:
- self._user_killed = True
- self._hard_shutdown()
- else:
- self._do_shutdown(timeout, has_quit)
- finally:
- self._post_shutdown()
-
- def kill(self) -> None:
- """
- Terminate the VM forcefully, wait for it to exit, and perform cleanup.
- """
- self.shutdown(hard=True)
-
- def wait(self, timeout: Optional[int] = 30) -> None:
- """
- Wait for the VM to power off and perform post-shutdown cleanup.
-
- :param timeout: Optional timeout in seconds. Default 30 seconds.
- A value of `None` is an infinite wait.
- """
- self.shutdown(has_quit=True, timeout=timeout)
-
- def set_qmp_monitor(self, enabled: bool = True) -> None:
- """
- Set the QMP monitor.
-
- @param enabled: if False, qmp monitor options will be removed from
- the base arguments of the resulting QEMU command
- line. Default is True.
- @note: call this function before launch().
- """
- self._qmp_set = enabled
-
- @property
- def _qmp(self) -> qmp.QEMUMonitorProtocol:
- if self._qmp_connection is None:
- raise QEMUMachineError("Attempt to access QMP with no connection")
- return self._qmp_connection
-
- @classmethod
- def _qmp_args(cls, _conv_keys: bool = True, **args: Any) -> Dict[str, Any]:
- qmp_args = dict()
- for key, value in args.items():
- if _conv_keys:
- qmp_args[key.replace('_', '-')] = value
- else:
- qmp_args[key] = value
- return qmp_args
-
- def qmp(self, cmd: str,
- conv_keys: bool = True,
- **args: Any) -> QMPMessage:
- """
- Invoke a QMP command and return the response dict
- """
- qmp_args = self._qmp_args(conv_keys, **args)
- return self._qmp.cmd(cmd, args=qmp_args)
-
- def command(self, cmd: str,
- conv_keys: bool = True,
- **args: Any) -> QMPReturnValue:
- """
- Invoke a QMP command.
- On success return the response dict.
- On failure raise an exception.
- """
- qmp_args = self._qmp_args(conv_keys, **args)
- return self._qmp.command(cmd, **qmp_args)
-
- def get_qmp_event(self, wait: bool = False) -> Optional[QMPMessage]:
- """
- Poll for one queued QMP events and return it
- """
- if self._events:
- return self._events.pop(0)
- return self._qmp.pull_event(wait=wait)
-
- def get_qmp_events(self, wait: bool = False) -> List[QMPMessage]:
- """
- Poll for queued QMP events and return a list of dicts
- """
- events = self._qmp.get_events(wait=wait)
- events.extend(self._events)
- del self._events[:]
- self._qmp.clear_events()
- return events
-
- @staticmethod
- def event_match(event: Any, match: Optional[Any]) -> bool:
- """
- Check if an event matches optional match criteria.
-
- The match criteria takes the form of a matching subdict. The event is
- checked to be a superset of the subdict, recursively, with matching
- values whenever the subdict values are not None.
-
- This has a limitation that you cannot explicitly check for None values.
-
- Examples, with the subdict queries on the left:
- - None matches any object.
- - {"foo": None} matches {"foo": {"bar": 1}}
- - {"foo": None} matches {"foo": 5}
- - {"foo": {"abc": None}} does not match {"foo": {"bar": 1}}
- - {"foo": {"rab": 2}} matches {"foo": {"bar": 1, "rab": 2}}
- """
- if match is None:
- return True
-
- try:
- for key in match:
- if key in event:
- if not QEMUMachine.event_match(event[key], match[key]):
- return False
- else:
- return False
- return True
- except TypeError:
- # either match or event wasn't iterable (not a dict)
- return bool(match == event)
-
- def event_wait(self, name: str,
- timeout: float = 60.0,
- match: Optional[QMPMessage] = None) -> Optional[QMPMessage]:
- """
- event_wait waits for and returns a named event from QMP with a timeout.
-
- name: The event to wait for.
- timeout: QEMUMonitorProtocol.pull_event timeout parameter.
- match: Optional match criteria. See event_match for details.
- """
- return self.events_wait([(name, match)], timeout)
-
- def events_wait(self,
- events: Sequence[Tuple[str, Any]],
- timeout: float = 60.0) -> Optional[QMPMessage]:
- """
- events_wait waits for and returns a single named event from QMP.
- In the case of multiple qualifying events, this function returns the
- first one.
-
- :param events: A sequence of (name, match_criteria) tuples.
- The match criteria are optional and may be None.
- See event_match for details.
- :param timeout: Optional timeout, in seconds.
- See QEMUMonitorProtocol.pull_event.
-
- :raise QMPTimeoutError: If timeout was non-zero and no matching events
- were found.
- :return: A QMP event matching the filter criteria.
- If timeout was 0 and no event matched, None.
- """
- def _match(event: QMPMessage) -> bool:
- for name, match in events:
- if event['event'] == name and self.event_match(event, match):
- return True
- return False
-
- event: Optional[QMPMessage]
-
- # Search cached events
- for event in self._events:
- if _match(event):
- self._events.remove(event)
- return event
-
- # Poll for new events
- while True:
- event = self._qmp.pull_event(wait=timeout)
- if event is None:
- # NB: None is only returned when timeout is false-ish.
- # Timeouts raise QMPTimeoutError instead!
- break
- if _match(event):
- return event
- self._events.append(event)
-
- return None
-
- def get_log(self) -> Optional[str]:
- """
- After self.shutdown or failed qemu execution, this returns the output
- of the qemu process.
- """
- return self._iolog
-
- def add_args(self, *args: str) -> None:
- """
- Adds to the list of extra arguments to be given to the QEMU binary
- """
- self._args.extend(args)
-
- def set_machine(self, machine_type: str) -> None:
- """
- Sets the machine type
-
- If set, the machine type will be added to the base arguments
- of the resulting QEMU command line.
- """
- self._machine = machine_type
-
- def set_console(self,
- device_type: Optional[str] = None,
- console_index: int = 0) -> None:
- """
- Sets the device type for a console device
-
- If set, the console device and a backing character device will
- be added to the base arguments of the resulting QEMU command
- line.
-
- This is a convenience method that will either use the provided
- device type, or default to a "-serial chardev:console" command
- line argument.
-
- The actual setting of command line arguments will be be done at
- machine launch time, as it depends on the temporary directory
- to be created.
-
- @param device_type: the device type, such as "isa-serial". If
- None is given (the default value) a "-serial
- chardev:console" command line argument will
- be used instead, resorting to the machine's
- default device type.
- @param console_index: the index of the console device to use.
- If not zero, the command line will create
- 'index - 1' consoles and connect them to
- the 'null' backing character device.
- """
- self._console_set = True
- self._console_device_type = device_type
- self._console_index = console_index
-
- @property
- def console_socket(self) -> socket.socket:
- """
- Returns a socket connected to the console
- """
- if self._console_socket is None:
- self._console_socket = console_socket.ConsoleSocket(
- self._console_address,
- file=self._console_log_path,
- drain=self._drain_console)
- return self._console_socket
-
- @property
- def temp_dir(self) -> str:
- """
- Returns a temporary directory to be used for this machine
- """
- if self._temp_dir is None:
- self._temp_dir = tempfile.mkdtemp(prefix="qemu-machine-",
- dir=self._base_temp_dir)
- return self._temp_dir
--- /dev/null
+[flake8]
+extend-ignore = E722 # Pylint handles this, but smarter.
\ No newline at end of file
--- /dev/null
+"""
+QEMU development and testing library.
+
+This library provides a few high-level classes for driving QEMU from a
+test suite, not intended for production use.
+
+- QEMUMachine: Configure and Boot a QEMU VM
+ - QEMUQtestMachine: VM class, with a qtest socket.
+
+- QEMUQtestProtocol: Connect to, send/receive qtest messages.
+"""
+
+# Copyright (C) 2020-2021 John Snow for Red Hat Inc.
+# Copyright (C) 2015-2016 Red Hat Inc.
+# Copyright (C) 2012 IBM Corp.
+#
+# Authors:
+# John Snow <jsnow@redhat.com>
+# Fam Zheng <fam@euphon.net>
+#
+# This work is licensed under the terms of the GNU GPL, version 2. See
+# the COPYING file in the top-level directory.
+#
+
+from .machine import QEMUMachine
+from .qtest import QEMUQtestMachine, QEMUQtestProtocol
+
+
+__all__ = (
+ 'QEMUMachine',
+ 'QEMUQtestProtocol',
+ 'QEMUQtestMachine',
+)
--- /dev/null
+"""
+QEMU Console Socket Module:
+
+This python module implements a ConsoleSocket object,
+which can drain a socket and optionally dump the bytes to file.
+"""
+# Copyright 2020 Linaro
+#
+# Authors:
+# Robert Foley <robert.foley@linaro.org>
+#
+# This code is licensed under the GPL version 2 or later. See
+# the COPYING file in the top-level directory.
+#
+
+from collections import deque
+import socket
+import threading
+import time
+from typing import Deque, Optional
+
+
+class ConsoleSocket(socket.socket):
+ """
+ ConsoleSocket represents a socket attached to a char device.
+
+ Optionally (if drain==True), drains the socket and places the bytes
+ into an in memory buffer for later processing.
+
+ Optionally a file path can be passed in and we will also
+ dump the characters to this file for debugging purposes.
+ """
+ def __init__(self, address: str, file: Optional[str] = None,
+ drain: bool = False):
+ self._recv_timeout_sec = 300.0
+ self._sleep_time = 0.5
+ self._buffer: Deque[int] = deque()
+ socket.socket.__init__(self, socket.AF_UNIX, socket.SOCK_STREAM)
+ self.connect(address)
+ self._logfile = None
+ if file:
+ # pylint: disable=consider-using-with
+ self._logfile = open(file, "bw")
+ self._open = True
+ self._drain_thread = None
+ if drain:
+ self._drain_thread = self._thread_start()
+
+ def __repr__(self) -> str:
+ tmp = super().__repr__()
+ tmp = tmp.rstrip(">")
+ tmp = "%s, logfile=%s, drain_thread=%s>" % (tmp, self._logfile,
+ self._drain_thread)
+ return tmp
+
+ def _drain_fn(self) -> None:
+ """Drains the socket and runs while the socket is open."""
+ while self._open:
+ try:
+ self._drain_socket()
+ except socket.timeout:
+ # The socket is expected to timeout since we set a
+ # short timeout to allow the thread to exit when
+ # self._open is set to False.
+ time.sleep(self._sleep_time)
+
+ def _thread_start(self) -> threading.Thread:
+ """Kick off a thread to drain the socket."""
+ # Configure socket to not block and timeout.
+ # This allows our drain thread to not block
+ # on recieve and exit smoothly.
+ socket.socket.setblocking(self, False)
+ socket.socket.settimeout(self, 1)
+ drain_thread = threading.Thread(target=self._drain_fn)
+ drain_thread.daemon = True
+ drain_thread.start()
+ return drain_thread
+
+ def close(self) -> None:
+ """Close the base object and wait for the thread to terminate"""
+ if self._open:
+ self._open = False
+ if self._drain_thread is not None:
+ thread, self._drain_thread = self._drain_thread, None
+ thread.join()
+ socket.socket.close(self)
+ if self._logfile:
+ self._logfile.close()
+ self._logfile = None
+
+ def _drain_socket(self) -> None:
+ """process arriving characters into in memory _buffer"""
+ data = socket.socket.recv(self, 1)
+ if self._logfile:
+ self._logfile.write(data)
+ self._logfile.flush()
+ self._buffer.extend(data)
+
+ def recv(self, bufsize: int = 1, flags: int = 0) -> bytes:
+ """Return chars from in memory buffer.
+ Maintains the same API as socket.socket.recv.
+ """
+ if self._drain_thread is None:
+ # Not buffering the socket, pass thru to socket.
+ return socket.socket.recv(self, bufsize, flags)
+ assert not flags, "Cannot pass flags to recv() in drained mode"
+ start_time = time.time()
+ while len(self._buffer) < bufsize:
+ time.sleep(self._sleep_time)
+ elapsed_sec = time.time() - start_time
+ if elapsed_sec > self._recv_timeout_sec:
+ raise socket.timeout
+ return bytes((self._buffer.popleft() for i in range(bufsize)))
+
+ def setblocking(self, value: bool) -> None:
+ """When not draining we pass thru to the socket,
+ since when draining we control socket blocking.
+ """
+ if self._drain_thread is None:
+ socket.socket.setblocking(self, value)
+
+ def settimeout(self, value: Optional[float]) -> None:
+ """When not draining we pass thru to the socket,
+ since when draining we control the timeout.
+ """
+ if value is not None:
+ self._recv_timeout_sec = value
+ if self._drain_thread is None:
+ socket.socket.settimeout(self, value)
--- /dev/null
+"""
+QEMU machine module:
+
+The machine module primarily provides the QEMUMachine class,
+which provides facilities for managing the lifetime of a QEMU VM.
+"""
+
+# Copyright (C) 2015-2016 Red Hat Inc.
+# Copyright (C) 2012 IBM Corp.
+#
+# Authors:
+# Fam Zheng <famz@redhat.com>
+#
+# This work is licensed under the terms of the GNU GPL, version 2. See
+# the COPYING file in the top-level directory.
+#
+# Based on qmp.py.
+#
+
+import errno
+from itertools import chain
+import logging
+import os
+import shutil
+import signal
+import socket
+import subprocess
+import tempfile
+from types import TracebackType
+from typing import (
+ Any,
+ BinaryIO,
+ Dict,
+ List,
+ Optional,
+ Sequence,
+ Tuple,
+ Type,
+)
+
+from qemu.qmp import (
+ QEMUMonitorProtocol,
+ QMPMessage,
+ QMPReturnValue,
+ SocketAddrT,
+)
+
+from . import console_socket
+
+
+LOG = logging.getLogger(__name__)
+
+
+class QEMUMachineError(Exception):
+ """
+ Exception called when an error in QEMUMachine happens.
+ """
+
+
+class QEMUMachineAddDeviceError(QEMUMachineError):
+ """
+ Exception raised when a request to add a device can not be fulfilled
+
+ The failures are caused by limitations, lack of information or conflicting
+ requests on the QEMUMachine methods. This exception does not represent
+ failures reported by the QEMU binary itself.
+ """
+
+
+class AbnormalShutdown(QEMUMachineError):
+ """
+ Exception raised when a graceful shutdown was requested, but not performed.
+ """
+
+
+class QEMUMachine:
+ """
+ A QEMU VM.
+
+ Use this object as a context manager to ensure
+ the QEMU process terminates::
+
+ with VM(binary) as vm:
+ ...
+ # vm is guaranteed to be shut down here
+ """
+
+ def __init__(self,
+ binary: str,
+ args: Sequence[str] = (),
+ wrapper: Sequence[str] = (),
+ name: Optional[str] = None,
+ base_temp_dir: str = "/var/tmp",
+ monitor_address: Optional[SocketAddrT] = None,
+ socket_scm_helper: Optional[str] = None,
+ sock_dir: Optional[str] = None,
+ drain_console: bool = False,
+ console_log: Optional[str] = None):
+ '''
+ Initialize a QEMUMachine
+
+ @param binary: path to the qemu binary
+ @param args: list of extra arguments
+ @param wrapper: list of arguments used as prefix to qemu binary
+ @param name: prefix for socket and log file names (default: qemu-PID)
+ @param base_temp_dir: default location where temp files are created
+ @param monitor_address: address for QMP monitor
+ @param socket_scm_helper: helper program, required for send_fd_scm()
+ @param sock_dir: where to create socket (defaults to base_temp_dir)
+ @param drain_console: (optional) True to drain console socket to buffer
+ @param console_log: (optional) path to console log file
+ @note: Qemu process is not started until launch() is used.
+ '''
+ # Direct user configuration
+
+ self._binary = binary
+ self._args = list(args)
+ self._wrapper = wrapper
+
+ self._name = name or "qemu-%d" % os.getpid()
+ self._base_temp_dir = base_temp_dir
+ self._sock_dir = sock_dir or self._base_temp_dir
+ self._socket_scm_helper = socket_scm_helper
+
+ if monitor_address is not None:
+ self._monitor_address = monitor_address
+ self._remove_monitor_sockfile = False
+ else:
+ self._monitor_address = os.path.join(
+ self._sock_dir, f"{self._name}-monitor.sock"
+ )
+ self._remove_monitor_sockfile = True
+
+ self._console_log_path = console_log
+ if self._console_log_path:
+ # In order to log the console, buffering needs to be enabled.
+ self._drain_console = True
+ else:
+ self._drain_console = drain_console
+
+ # Runstate
+ self._qemu_log_path: Optional[str] = None
+ self._qemu_log_file: Optional[BinaryIO] = None
+ self._popen: Optional['subprocess.Popen[bytes]'] = None
+ self._events: List[QMPMessage] = []
+ self._iolog: Optional[str] = None
+ self._qmp_set = True # Enable QMP monitor by default.
+ self._qmp_connection: Optional[QEMUMonitorProtocol] = None
+ self._qemu_full_args: Tuple[str, ...] = ()
+ self._temp_dir: Optional[str] = None
+ self._launched = False
+ self._machine: Optional[str] = None
+ self._console_index = 0
+ self._console_set = False
+ self._console_device_type: Optional[str] = None
+ self._console_address = os.path.join(
+ self._sock_dir, f"{self._name}-console.sock"
+ )
+ self._console_socket: Optional[socket.socket] = None
+ self._remove_files: List[str] = []
+ self._user_killed = False
+
+ def __enter__(self) -> 'QEMUMachine':
+ return self
+
+ def __exit__(self,
+ exc_type: Optional[Type[BaseException]],
+ exc_val: Optional[BaseException],
+ exc_tb: Optional[TracebackType]) -> None:
+ self.shutdown()
+
+ def add_monitor_null(self) -> None:
+ """
+ This can be used to add an unused monitor instance.
+ """
+ self._args.append('-monitor')
+ self._args.append('null')
+
+ def add_fd(self, fd: int, fdset: int,
+ opaque: str, opts: str = '') -> 'QEMUMachine':
+ """
+ Pass a file descriptor to the VM
+ """
+ options = ['fd=%d' % fd,
+ 'set=%d' % fdset,
+ 'opaque=%s' % opaque]
+ if opts:
+ options.append(opts)
+
+ # This did not exist before 3.4, but since then it is
+ # mandatory for our purpose
+ if hasattr(os, 'set_inheritable'):
+ os.set_inheritable(fd, True)
+
+ self._args.append('-add-fd')
+ self._args.append(','.join(options))
+ return self
+
+ def send_fd_scm(self, fd: Optional[int] = None,
+ file_path: Optional[str] = None) -> int:
+ """
+ Send an fd or file_path to socket_scm_helper.
+
+ Exactly one of fd and file_path must be given.
+ If it is file_path, the helper will open that file and pass its own fd.
+ """
+ # In iotest.py, the qmp should always use unix socket.
+ assert self._qmp.is_scm_available()
+ if self._socket_scm_helper is None:
+ raise QEMUMachineError("No path to socket_scm_helper set")
+ if not os.path.exists(self._socket_scm_helper):
+ raise QEMUMachineError("%s does not exist" %
+ self._socket_scm_helper)
+
+ # This did not exist before 3.4, but since then it is
+ # mandatory for our purpose
+ if hasattr(os, 'set_inheritable'):
+ os.set_inheritable(self._qmp.get_sock_fd(), True)
+ if fd is not None:
+ os.set_inheritable(fd, True)
+
+ fd_param = ["%s" % self._socket_scm_helper,
+ "%d" % self._qmp.get_sock_fd()]
+
+ if file_path is not None:
+ assert fd is None
+ fd_param.append(file_path)
+ else:
+ assert fd is not None
+ fd_param.append(str(fd))
+
+ proc = subprocess.run(
+ fd_param,
+ stdin=subprocess.DEVNULL,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ check=False,
+ close_fds=False,
+ )
+ if proc.stdout:
+ LOG.debug(proc.stdout)
+
+ return proc.returncode
+
+ @staticmethod
+ def _remove_if_exists(path: str) -> None:
+ """
+ Remove file object at path if it exists
+ """
+ try:
+ os.remove(path)
+ except OSError as exception:
+ if exception.errno == errno.ENOENT:
+ return
+ raise
+
+ def is_running(self) -> bool:
+ """Returns true if the VM is running."""
+ return self._popen is not None and self._popen.poll() is None
+
+ @property
+ def _subp(self) -> 'subprocess.Popen[bytes]':
+ if self._popen is None:
+ raise QEMUMachineError('Subprocess pipe not present')
+ return self._popen
+
+ def exitcode(self) -> Optional[int]:
+ """Returns the exit code if possible, or None."""
+ if self._popen is None:
+ return None
+ return self._popen.poll()
+
+ def get_pid(self) -> Optional[int]:
+ """Returns the PID of the running process, or None."""
+ if not self.is_running():
+ return None
+ return self._subp.pid
+
+ def _load_io_log(self) -> None:
+ if self._qemu_log_path is not None:
+ with open(self._qemu_log_path, "r") as iolog:
+ self._iolog = iolog.read()
+
+ @property
+ def _base_args(self) -> List[str]:
+ args = ['-display', 'none', '-vga', 'none']
+
+ if self._qmp_set:
+ if isinstance(self._monitor_address, tuple):
+ moncdev = "socket,id=mon,host={},port={}".format(
+ *self._monitor_address
+ )
+ else:
+ moncdev = f"socket,id=mon,path={self._monitor_address}"
+ args.extend(['-chardev', moncdev, '-mon',
+ 'chardev=mon,mode=control'])
+
+ if self._machine is not None:
+ args.extend(['-machine', self._machine])
+ for _ in range(self._console_index):
+ args.extend(['-serial', 'null'])
+ if self._console_set:
+ chardev = ('socket,id=console,path=%s,server=on,wait=off' %
+ self._console_address)
+ args.extend(['-chardev', chardev])
+ if self._console_device_type is None:
+ args.extend(['-serial', 'chardev:console'])
+ else:
+ device = '%s,chardev=console' % self._console_device_type
+ args.extend(['-device', device])
+ return args
+
+ def _pre_launch(self) -> None:
+ self._qemu_log_path = os.path.join(self.temp_dir, self._name + ".log")
+
+ if self._console_set:
+ self._remove_files.append(self._console_address)
+
+ if self._qmp_set:
+ if self._remove_monitor_sockfile:
+ assert isinstance(self._monitor_address, str)
+ self._remove_files.append(self._monitor_address)
+ self._qmp_connection = QEMUMonitorProtocol(
+ self._monitor_address,
+ server=True,
+ nickname=self._name
+ )
+
+ # NOTE: Make sure any opened resources are *definitely* freed in
+ # _post_shutdown()!
+ # pylint: disable=consider-using-with
+ self._qemu_log_file = open(self._qemu_log_path, 'wb')
+
+ def _post_launch(self) -> None:
+ if self._qmp_connection:
+ self._qmp.accept()
+
+ def _post_shutdown(self) -> None:
+ """
+ Called to cleanup the VM instance after the process has exited.
+ May also be called after a failed launch.
+ """
+ # Comprehensive reset for the failed launch case:
+ self._early_cleanup()
+
+ if self._qmp_connection:
+ self._qmp.close()
+ self._qmp_connection = None
+
+ if self._qemu_log_file is not None:
+ self._qemu_log_file.close()
+ self._qemu_log_file = None
+
+ self._load_io_log()
+
+ self._qemu_log_path = None
+
+ if self._temp_dir is not None:
+ shutil.rmtree(self._temp_dir)
+ self._temp_dir = None
+
+ while len(self._remove_files) > 0:
+ self._remove_if_exists(self._remove_files.pop())
+
+ exitcode = self.exitcode()
+ if (exitcode is not None and exitcode < 0
+ and not (self._user_killed and exitcode == -signal.SIGKILL)):
+ msg = 'qemu received signal %i; command: "%s"'
+ if self._qemu_full_args:
+ command = ' '.join(self._qemu_full_args)
+ else:
+ command = ''
+ LOG.warning(msg, -int(exitcode), command)
+
+ self._user_killed = False
+ self._launched = False
+
+ def launch(self) -> None:
+ """
+ Launch the VM and make sure we cleanup and expose the
+ command line/output in case of exception
+ """
+
+ if self._launched:
+ raise QEMUMachineError('VM already launched')
+
+ self._iolog = None
+ self._qemu_full_args = ()
+ try:
+ self._launch()
+ self._launched = True
+ except:
+ self._post_shutdown()
+
+ LOG.debug('Error launching VM')
+ if self._qemu_full_args:
+ LOG.debug('Command: %r', ' '.join(self._qemu_full_args))
+ if self._iolog:
+ LOG.debug('Output: %r', self._iolog)
+ raise
+
+ def _launch(self) -> None:
+ """
+ Launch the VM and establish a QMP connection
+ """
+ self._pre_launch()
+ self._qemu_full_args = tuple(
+ chain(self._wrapper,
+ [self._binary],
+ self._base_args,
+ self._args)
+ )
+ LOG.debug('VM launch command: %r', ' '.join(self._qemu_full_args))
+
+ # Cleaning up of this subprocess is guaranteed by _do_shutdown.
+ # pylint: disable=consider-using-with
+ self._popen = subprocess.Popen(self._qemu_full_args,
+ stdin=subprocess.DEVNULL,
+ stdout=self._qemu_log_file,
+ stderr=subprocess.STDOUT,
+ shell=False,
+ close_fds=False)
+ self._post_launch()
+
+ def _early_cleanup(self) -> None:
+ """
+ Perform any cleanup that needs to happen before the VM exits.
+
+ May be invoked by both soft and hard shutdown in failover scenarios.
+ Called additionally by _post_shutdown for comprehensive cleanup.
+ """
+ # If we keep the console socket open, we may deadlock waiting
+ # for QEMU to exit, while QEMU is waiting for the socket to
+ # become writeable.
+ if self._console_socket is not None:
+ self._console_socket.close()
+ self._console_socket = None
+
+ def _hard_shutdown(self) -> None:
+ """
+ Perform early cleanup, kill the VM, and wait for it to terminate.
+
+ :raise subprocess.Timeout: When timeout is exceeds 60 seconds
+ waiting for the QEMU process to terminate.
+ """
+ self._early_cleanup()
+ self._subp.kill()
+ self._subp.wait(timeout=60)
+
+ def _soft_shutdown(self, timeout: Optional[int],
+ has_quit: bool = False) -> None:
+ """
+ Perform early cleanup, attempt to gracefully shut down the VM, and wait
+ for it to terminate.
+
+ :param timeout: Timeout in seconds for graceful shutdown.
+ A value of None is an infinite wait.
+ :param has_quit: When True, don't attempt to issue 'quit' QMP command
+
+ :raise ConnectionReset: On QMP communication errors
+ :raise subprocess.TimeoutExpired: When timeout is exceeded waiting for
+ the QEMU process to terminate.
+ """
+ self._early_cleanup()
+
+ if self._qmp_connection:
+ if not has_quit:
+ # Might raise ConnectionReset
+ self._qmp.cmd('quit')
+
+ # May raise subprocess.TimeoutExpired
+ self._subp.wait(timeout=timeout)
+
+ def _do_shutdown(self, timeout: Optional[int],
+ has_quit: bool = False) -> None:
+ """
+ Attempt to shutdown the VM gracefully; fallback to a hard shutdown.
+
+ :param timeout: Timeout in seconds for graceful shutdown.
+ A value of None is an infinite wait.
+ :param has_quit: When True, don't attempt to issue 'quit' QMP command
+
+ :raise AbnormalShutdown: When the VM could not be shut down gracefully.
+ The inner exception will likely be ConnectionReset or
+ subprocess.TimeoutExpired. In rare cases, non-graceful termination
+ may result in its own exceptions, likely subprocess.TimeoutExpired.
+ """
+ try:
+ self._soft_shutdown(timeout, has_quit)
+ except Exception as exc:
+ self._hard_shutdown()
+ raise AbnormalShutdown("Could not perform graceful shutdown") \
+ from exc
+
+ def shutdown(self, has_quit: bool = False,
+ hard: bool = False,
+ timeout: Optional[int] = 30) -> None:
+ """
+ Terminate the VM (gracefully if possible) and perform cleanup.
+ Cleanup will always be performed.
+
+ If the VM has not yet been launched, or shutdown(), wait(), or kill()
+ have already been called, this method does nothing.
+
+ :param has_quit: When true, do not attempt to issue 'quit' QMP command.
+ :param hard: When true, do not attempt graceful shutdown, and
+ suppress the SIGKILL warning log message.
+ :param timeout: Optional timeout in seconds for graceful shutdown.
+ Default 30 seconds, A `None` value is an infinite wait.
+ """
+ if not self._launched:
+ return
+
+ try:
+ if hard:
+ self._user_killed = True
+ self._hard_shutdown()
+ else:
+ self._do_shutdown(timeout, has_quit)
+ finally:
+ self._post_shutdown()
+
+ def kill(self) -> None:
+ """
+ Terminate the VM forcefully, wait for it to exit, and perform cleanup.
+ """
+ self.shutdown(hard=True)
+
+ def wait(self, timeout: Optional[int] = 30) -> None:
+ """
+ Wait for the VM to power off and perform post-shutdown cleanup.
+
+ :param timeout: Optional timeout in seconds. Default 30 seconds.
+ A value of `None` is an infinite wait.
+ """
+ self.shutdown(has_quit=True, timeout=timeout)
+
+ def set_qmp_monitor(self, enabled: bool = True) -> None:
+ """
+ Set the QMP monitor.
+
+ @param enabled: if False, qmp monitor options will be removed from
+ the base arguments of the resulting QEMU command
+ line. Default is True.
+ @note: call this function before launch().
+ """
+ self._qmp_set = enabled
+
+ @property
+ def _qmp(self) -> QEMUMonitorProtocol:
+ if self._qmp_connection is None:
+ raise QEMUMachineError("Attempt to access QMP with no connection")
+ return self._qmp_connection
+
+ @classmethod
+ def _qmp_args(cls, _conv_keys: bool = True, **args: Any) -> Dict[str, Any]:
+ qmp_args = dict()
+ for key, value in args.items():
+ if _conv_keys:
+ qmp_args[key.replace('_', '-')] = value
+ else:
+ qmp_args[key] = value
+ return qmp_args
+
+ def qmp(self, cmd: str,
+ conv_keys: bool = True,
+ **args: Any) -> QMPMessage:
+ """
+ Invoke a QMP command and return the response dict
+ """
+ qmp_args = self._qmp_args(conv_keys, **args)
+ return self._qmp.cmd(cmd, args=qmp_args)
+
+ def command(self, cmd: str,
+ conv_keys: bool = True,
+ **args: Any) -> QMPReturnValue:
+ """
+ Invoke a QMP command.
+ On success return the response dict.
+ On failure raise an exception.
+ """
+ qmp_args = self._qmp_args(conv_keys, **args)
+ return self._qmp.command(cmd, **qmp_args)
+
+ def get_qmp_event(self, wait: bool = False) -> Optional[QMPMessage]:
+ """
+ Poll for one queued QMP events and return it
+ """
+ if self._events:
+ return self._events.pop(0)
+ return self._qmp.pull_event(wait=wait)
+
+ def get_qmp_events(self, wait: bool = False) -> List[QMPMessage]:
+ """
+ Poll for queued QMP events and return a list of dicts
+ """
+ events = self._qmp.get_events(wait=wait)
+ events.extend(self._events)
+ del self._events[:]
+ self._qmp.clear_events()
+ return events
+
+ @staticmethod
+ def event_match(event: Any, match: Optional[Any]) -> bool:
+ """
+ Check if an event matches optional match criteria.
+
+ The match criteria takes the form of a matching subdict. The event is
+ checked to be a superset of the subdict, recursively, with matching
+ values whenever the subdict values are not None.
+
+ This has a limitation that you cannot explicitly check for None values.
+
+ Examples, with the subdict queries on the left:
+ - None matches any object.
+ - {"foo": None} matches {"foo": {"bar": 1}}
+ - {"foo": None} matches {"foo": 5}
+ - {"foo": {"abc": None}} does not match {"foo": {"bar": 1}}
+ - {"foo": {"rab": 2}} matches {"foo": {"bar": 1, "rab": 2}}
+ """
+ if match is None:
+ return True
+
+ try:
+ for key in match:
+ if key in event:
+ if not QEMUMachine.event_match(event[key], match[key]):
+ return False
+ else:
+ return False
+ return True
+ except TypeError:
+ # either match or event wasn't iterable (not a dict)
+ return bool(match == event)
+
+ def event_wait(self, name: str,
+ timeout: float = 60.0,
+ match: Optional[QMPMessage] = None) -> Optional[QMPMessage]:
+ """
+ event_wait waits for and returns a named event from QMP with a timeout.
+
+ name: The event to wait for.
+ timeout: QEMUMonitorProtocol.pull_event timeout parameter.
+ match: Optional match criteria. See event_match for details.
+ """
+ return self.events_wait([(name, match)], timeout)
+
+ def events_wait(self,
+ events: Sequence[Tuple[str, Any]],
+ timeout: float = 60.0) -> Optional[QMPMessage]:
+ """
+ events_wait waits for and returns a single named event from QMP.
+ In the case of multiple qualifying events, this function returns the
+ first one.
+
+ :param events: A sequence of (name, match_criteria) tuples.
+ The match criteria are optional and may be None.
+ See event_match for details.
+ :param timeout: Optional timeout, in seconds.
+ See QEMUMonitorProtocol.pull_event.
+
+ :raise QMPTimeoutError: If timeout was non-zero and no matching events
+ were found.
+ :return: A QMP event matching the filter criteria.
+ If timeout was 0 and no event matched, None.
+ """
+ def _match(event: QMPMessage) -> bool:
+ for name, match in events:
+ if event['event'] == name and self.event_match(event, match):
+ return True
+ return False
+
+ event: Optional[QMPMessage]
+
+ # Search cached events
+ for event in self._events:
+ if _match(event):
+ self._events.remove(event)
+ return event
+
+ # Poll for new events
+ while True:
+ event = self._qmp.pull_event(wait=timeout)
+ if event is None:
+ # NB: None is only returned when timeout is false-ish.
+ # Timeouts raise QMPTimeoutError instead!
+ break
+ if _match(event):
+ return event
+ self._events.append(event)
+
+ return None
+
+ def get_log(self) -> Optional[str]:
+ """
+ After self.shutdown or failed qemu execution, this returns the output
+ of the qemu process.
+ """
+ return self._iolog
+
+ def add_args(self, *args: str) -> None:
+ """
+ Adds to the list of extra arguments to be given to the QEMU binary
+ """
+ self._args.extend(args)
+
+ def set_machine(self, machine_type: str) -> None:
+ """
+ Sets the machine type
+
+ If set, the machine type will be added to the base arguments
+ of the resulting QEMU command line.
+ """
+ self._machine = machine_type
+
+ def set_console(self,
+ device_type: Optional[str] = None,
+ console_index: int = 0) -> None:
+ """
+ Sets the device type for a console device
+
+ If set, the console device and a backing character device will
+ be added to the base arguments of the resulting QEMU command
+ line.
+
+ This is a convenience method that will either use the provided
+ device type, or default to a "-serial chardev:console" command
+ line argument.
+
+ The actual setting of command line arguments will be be done at
+ machine launch time, as it depends on the temporary directory
+ to be created.
+
+ @param device_type: the device type, such as "isa-serial". If
+ None is given (the default value) a "-serial
+ chardev:console" command line argument will
+ be used instead, resorting to the machine's
+ default device type.
+ @param console_index: the index of the console device to use.
+ If not zero, the command line will create
+ 'index - 1' consoles and connect them to
+ the 'null' backing character device.
+ """
+ self._console_set = True
+ self._console_device_type = device_type
+ self._console_index = console_index
+
+ @property
+ def console_socket(self) -> socket.socket:
+ """
+ Returns a socket connected to the console
+ """
+ if self._console_socket is None:
+ self._console_socket = console_socket.ConsoleSocket(
+ self._console_address,
+ file=self._console_log_path,
+ drain=self._drain_console)
+ return self._console_socket
+
+ @property
+ def temp_dir(self) -> str:
+ """
+ Returns a temporary directory to be used for this machine
+ """
+ if self._temp_dir is None:
+ self._temp_dir = tempfile.mkdtemp(prefix="qemu-machine-",
+ dir=self._base_temp_dir)
+ return self._temp_dir
--- /dev/null
+[MASTER]
+
+[MESSAGES CONTROL]
+
+# Disable the message, report, category or checker with the given id(s). You
+# can either give multiple identifiers separated by comma (,) or put this
+# option multiple times (only on the command line, not in the configuration
+# file where it should appear only once). You can also use "--disable=all" to
+# disable everything first and then reenable specific checks. For example, if
+# you want to run only the similarities checker, you can use "--disable=all
+# --enable=similarities". If you want to run only the classes checker, but have
+# no Warning level messages displayed, use "--disable=all --enable=classes
+# --disable=W".
+disable=too-many-arguments,
+ too-many-instance-attributes,
+ too-many-public-methods,
+
+[REPORTS]
+
+[REFACTORING]
+
+[MISCELLANEOUS]
+
+[LOGGING]
+
+[BASIC]
+
+# Good variable names which should always be accepted, separated by a comma.
+good-names=i,
+ j,
+ k,
+ ex,
+ Run,
+ _,
+ fd,
+ c,
+[VARIABLES]
+
+[STRING]
+
+[SPELLING]
+
+[FORMAT]
+
+[SIMILARITIES]
+
+# Ignore imports when computing similarities.
+ignore-imports=yes
+
+[TYPECHECK]
+
+[CLASSES]
+
+[IMPORTS]
+
+[DESIGN]
+
+[EXCEPTIONS]
--- /dev/null
+"""
+QEMU qtest library
+
+qtest offers the QEMUQtestProtocol and QEMUQTestMachine classes, which
+offer a connection to QEMU's qtest protocol socket, and a qtest-enabled
+subclass of QEMUMachine, respectively.
+"""
+
+# Copyright (C) 2015 Red Hat Inc.
+#
+# Authors:
+# Fam Zheng <famz@redhat.com>
+#
+# This work is licensed under the terms of the GNU GPL, version 2. See
+# the COPYING file in the top-level directory.
+#
+# Based on qmp.py.
+#
+
+import os
+import socket
+from typing import (
+ List,
+ Optional,
+ Sequence,
+ TextIO,
+)
+
+from qemu.qmp import SocketAddrT
+
+from .machine import QEMUMachine
+
+
+class QEMUQtestProtocol:
+ """
+ QEMUQtestProtocol implements a connection to a qtest socket.
+
+ :param address: QEMU address, can be either a unix socket path (string)
+ or a tuple in the form ( address, port ) for a TCP
+ connection
+ :param server: server mode, listens on the socket (bool)
+ :raise socket.error: on socket connection errors
+
+ .. note::
+ No conection is estabalished by __init__(), this is done
+ by the connect() or accept() methods.
+ """
+ def __init__(self, address: SocketAddrT,
+ server: bool = False):
+ self._address = address
+ self._sock = self._get_sock()
+ self._sockfile: Optional[TextIO] = None
+ if server:
+ self._sock.bind(self._address)
+ self._sock.listen(1)
+
+ def _get_sock(self) -> socket.socket:
+ if isinstance(self._address, tuple):
+ family = socket.AF_INET
+ else:
+ family = socket.AF_UNIX
+ return socket.socket(family, socket.SOCK_STREAM)
+
+ def connect(self) -> None:
+ """
+ Connect to the qtest socket.
+
+ @raise socket.error on socket connection errors
+ """
+ self._sock.connect(self._address)
+ self._sockfile = self._sock.makefile(mode='r')
+
+ def accept(self) -> None:
+ """
+ Await connection from QEMU.
+
+ @raise socket.error on socket connection errors
+ """
+ self._sock, _ = self._sock.accept()
+ self._sockfile = self._sock.makefile(mode='r')
+
+ def cmd(self, qtest_cmd: str) -> str:
+ """
+ Send a qtest command on the wire.
+
+ @param qtest_cmd: qtest command text to be sent
+ """
+ assert self._sockfile is not None
+ self._sock.sendall((qtest_cmd + "\n").encode('utf-8'))
+ resp = self._sockfile.readline()
+ return resp
+
+ def close(self) -> None:
+ """
+ Close this socket.
+ """
+ self._sock.close()
+ if self._sockfile:
+ self._sockfile.close()
+ self._sockfile = None
+
+ def settimeout(self, timeout: Optional[float]) -> None:
+ """Set a timeout, in seconds."""
+ self._sock.settimeout(timeout)
+
+
+class QEMUQtestMachine(QEMUMachine):
+ """
+ A QEMU VM, with a qtest socket available.
+ """
+
+ def __init__(self,
+ binary: str,
+ args: Sequence[str] = (),
+ name: Optional[str] = None,
+ base_temp_dir: str = "/var/tmp",
+ socket_scm_helper: Optional[str] = None,
+ sock_dir: Optional[str] = None):
+ if name is None:
+ name = "qemu-%d" % os.getpid()
+ if sock_dir is None:
+ sock_dir = base_temp_dir
+ super().__init__(binary, args, name=name, base_temp_dir=base_temp_dir,
+ socket_scm_helper=socket_scm_helper,
+ sock_dir=sock_dir)
+ self._qtest: Optional[QEMUQtestProtocol] = None
+ self._qtest_path = os.path.join(sock_dir, name + "-qtest.sock")
+
+ @property
+ def _base_args(self) -> List[str]:
+ args = super()._base_args
+ args.extend([
+ '-qtest', f"unix:path={self._qtest_path}",
+ '-accel', 'qtest'
+ ])
+ return args
+
+ def _pre_launch(self) -> None:
+ super()._pre_launch()
+ self._qtest = QEMUQtestProtocol(self._qtest_path, server=True)
+
+ def _post_launch(self) -> None:
+ assert self._qtest is not None
+ super()._post_launch()
+ self._qtest.accept()
+
+ def _post_shutdown(self) -> None:
+ super()._post_shutdown()
+ self._remove_if_exists(self._qtest_path)
+
+ def qtest(self, cmd: str) -> str:
+ """
+ Send a qtest command to the guest.
+
+ :param cmd: qtest command to send
+ :return: qtest server response
+ """
+ if self._qtest is None:
+ raise RuntimeError("qtest socket not available")
+ return self._qtest.cmd(cmd)
+++ /dev/null
-[MASTER]
-
-[MESSAGES CONTROL]
-
-# Disable the message, report, category or checker with the given id(s). You
-# can either give multiple identifiers separated by comma (,) or put this
-# option multiple times (only on the command line, not in the configuration
-# file where it should appear only once). You can also use "--disable=all" to
-# disable everything first and then reenable specific checks. For example, if
-# you want to run only the similarities checker, you can use "--disable=all
-# --enable=similarities". If you want to run only the classes checker, but have
-# no Warning level messages displayed, use "--disable=all --enable=classes
-# --disable=W".
-disable=too-many-arguments,
- too-many-instance-attributes,
- too-many-public-methods,
-
-[REPORTS]
-
-[REFACTORING]
-
-[MISCELLANEOUS]
-
-[LOGGING]
-
-[BASIC]
-
-# Good variable names which should always be accepted, separated by a comma.
-good-names=i,
- j,
- k,
- ex,
- Run,
- _,
- fd,
- c,
-[VARIABLES]
-
-[STRING]
-
-[SPELLING]
-
-[FORMAT]
-
-[SIMILARITIES]
-
-# Ignore imports when computing similarities.
-ignore-imports=yes
-
-[TYPECHECK]
-
-[CLASSES]
-
-[IMPORTS]
-
-[DESIGN]
-
-[EXCEPTIONS]
+++ /dev/null
-""" QEMU Monitor Protocol Python class """
-# Copyright (C) 2009, 2010 Red Hat Inc.
-#
-# Authors:
-# Luiz Capitulino <lcapitulino@redhat.com>
-#
-# This work is licensed under the terms of the GNU GPL, version 2. See
-# the COPYING file in the top-level directory.
-
-import errno
-import json
-import logging
-import socket
-from types import TracebackType
-from typing import (
- Any,
- Dict,
- List,
- Optional,
- TextIO,
- Tuple,
- Type,
- Union,
- cast,
-)
-
-
-# QMPMessage is a QMP Message of any kind.
-# e.g. {'yee': 'haw'}
-#
-# QMPReturnValue is the inner value of return values only.
-# {'return': {}} is the QMPMessage,
-# {} is the QMPReturnValue.
-QMPMessage = Dict[str, Any]
-QMPReturnValue = Dict[str, Any]
-
-InternetAddrT = Tuple[str, str]
-UnixAddrT = str
-SocketAddrT = Union[InternetAddrT, UnixAddrT]
-
-
-class QMPError(Exception):
- """
- QMP base exception
- """
-
-
-class QMPConnectError(QMPError):
- """
- QMP connection exception
- """
-
-
-class QMPCapabilitiesError(QMPError):
- """
- QMP negotiate capabilities exception
- """
-
-
-class QMPTimeoutError(QMPError):
- """
- QMP timeout exception
- """
-
-
-class QMPProtocolError(QMPError):
- """
- QMP protocol error; unexpected response
- """
-
-
-class QMPResponseError(QMPError):
- """
- Represents erroneous QMP monitor reply
- """
- def __init__(self, reply: QMPMessage):
- try:
- desc = reply['error']['desc']
- except KeyError:
- desc = reply
- super().__init__(desc)
- self.reply = reply
-
-
-class QEMUMonitorProtocol:
- """
- Provide an API to connect to QEMU via QEMU Monitor Protocol (QMP) and then
- allow to handle commands and events.
- """
-
- #: Logger object for debugging messages
- logger = logging.getLogger('QMP')
-
- def __init__(self, address: SocketAddrT,
- server: bool = False,
- nickname: Optional[str] = None):
- """
- Create a QEMUMonitorProtocol class.
-
- @param address: QEMU address, can be either a unix socket path (string)
- or a tuple in the form ( address, port ) for a TCP
- connection
- @param server: server mode listens on the socket (bool)
- @raise OSError on socket connection errors
- @note No connection is established, this is done by the connect() or
- accept() methods
- """
- self.__events: List[QMPMessage] = []
- self.__address = address
- self.__sock = self.__get_sock()
- self.__sockfile: Optional[TextIO] = None
- self._nickname = nickname
- if self._nickname:
- self.logger = logging.getLogger('QMP').getChild(self._nickname)
- if server:
- self.__sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- self.__sock.bind(self.__address)
- self.__sock.listen(1)
-
- def __get_sock(self) -> socket.socket:
- if isinstance(self.__address, tuple):
- family = socket.AF_INET
- else:
- family = socket.AF_UNIX
- return socket.socket(family, socket.SOCK_STREAM)
-
- def __negotiate_capabilities(self) -> QMPMessage:
- greeting = self.__json_read()
- if greeting is None or "QMP" not in greeting:
- raise QMPConnectError
- # Greeting seems ok, negotiate capabilities
- resp = self.cmd('qmp_capabilities')
- if resp and "return" in resp:
- return greeting
- raise QMPCapabilitiesError
-
- def __json_read(self, only_event: bool = False) -> Optional[QMPMessage]:
- assert self.__sockfile is not None
- while True:
- data = self.__sockfile.readline()
- if not data:
- return None
- # By definition, any JSON received from QMP is a QMPMessage,
- # and we are asserting only at static analysis time that it
- # has a particular shape.
- resp: QMPMessage = json.loads(data)
- if 'event' in resp:
- self.logger.debug("<<< %s", resp)
- self.__events.append(resp)
- if not only_event:
- continue
- return resp
-
- def __get_events(self, wait: Union[bool, float] = False) -> None:
- """
- Check for new events in the stream and cache them in __events.
-
- @param wait (bool): block until an event is available.
- @param wait (float): If wait is a float, treat it as a timeout value.
-
- @raise QMPTimeoutError: If a timeout float is provided and the timeout
- period elapses.
- @raise QMPConnectError: If wait is True but no events could be
- retrieved or if some other error occurred.
- """
-
- # Current timeout and blocking status
- current_timeout = self.__sock.gettimeout()
-
- # Check for new events regardless and pull them into the cache:
- self.__sock.settimeout(0) # i.e. setblocking(False)
- try:
- self.__json_read()
- except OSError as err:
- # EAGAIN: No data available; not critical
- if err.errno != errno.EAGAIN:
- raise
- finally:
- self.__sock.settimeout(current_timeout)
-
- # Wait for new events, if needed.
- # if wait is 0.0, this means "no wait" and is also implicitly false.
- if not self.__events and wait:
- if isinstance(wait, float):
- self.__sock.settimeout(wait)
- try:
- ret = self.__json_read(only_event=True)
- except socket.timeout as err:
- raise QMPTimeoutError("Timeout waiting for event") from err
- except Exception as err:
- msg = "Error while reading from socket"
- raise QMPConnectError(msg) from err
- finally:
- self.__sock.settimeout(current_timeout)
-
- if ret is None:
- raise QMPConnectError("Error while reading from socket")
-
- def __enter__(self) -> 'QEMUMonitorProtocol':
- # Implement context manager enter function.
- return self
-
- def __exit__(self,
- # pylint: disable=duplicate-code
- # see https://github.com/PyCQA/pylint/issues/3619
- exc_type: Optional[Type[BaseException]],
- exc_val: Optional[BaseException],
- exc_tb: Optional[TracebackType]) -> None:
- # Implement context manager exit function.
- self.close()
-
- def connect(self, negotiate: bool = True) -> Optional[QMPMessage]:
- """
- Connect to the QMP Monitor and perform capabilities negotiation.
-
- @return QMP greeting dict, or None if negotiate is false
- @raise OSError on socket connection errors
- @raise QMPConnectError if the greeting is not received
- @raise QMPCapabilitiesError if fails to negotiate capabilities
- """
- self.__sock.connect(self.__address)
- self.__sockfile = self.__sock.makefile(mode='r')
- if negotiate:
- return self.__negotiate_capabilities()
- return None
-
- def accept(self, timeout: Optional[float] = 15.0) -> QMPMessage:
- """
- Await connection from QMP Monitor and perform capabilities negotiation.
-
- @param timeout: timeout in seconds (nonnegative float number, or
- None). The value passed will set the behavior of the
- underneath QMP socket as described in [1].
- Default value is set to 15.0.
- @return QMP greeting dict
- @raise OSError on socket connection errors
- @raise QMPConnectError if the greeting is not received
- @raise QMPCapabilitiesError if fails to negotiate capabilities
-
- [1]
- https://docs.python.org/3/library/socket.html#socket.socket.settimeout
- """
- self.__sock.settimeout(timeout)
- self.__sock, _ = self.__sock.accept()
- self.__sockfile = self.__sock.makefile(mode='r')
- return self.__negotiate_capabilities()
-
- def cmd_obj(self, qmp_cmd: QMPMessage) -> QMPMessage:
- """
- Send a QMP command to the QMP Monitor.
-
- @param qmp_cmd: QMP command to be sent as a Python dict
- @return QMP response as a Python dict
- """
- self.logger.debug(">>> %s", qmp_cmd)
- self.__sock.sendall(json.dumps(qmp_cmd).encode('utf-8'))
- resp = self.__json_read()
- if resp is None:
- raise QMPConnectError("Unexpected empty reply from server")
- self.logger.debug("<<< %s", resp)
- return resp
-
- def cmd(self, name: str,
- args: Optional[Dict[str, Any]] = None,
- cmd_id: Optional[Any] = None) -> QMPMessage:
- """
- Build a QMP command and send it to the QMP Monitor.
-
- @param name: command name (string)
- @param args: command arguments (dict)
- @param cmd_id: command id (dict, list, string or int)
- """
- qmp_cmd: QMPMessage = {'execute': name}
- if args:
- qmp_cmd['arguments'] = args
- if cmd_id:
- qmp_cmd['id'] = cmd_id
- return self.cmd_obj(qmp_cmd)
-
- def command(self, cmd: str, **kwds: Any) -> QMPReturnValue:
- """
- Build and send a QMP command to the monitor, report errors if any
- """
- ret = self.cmd(cmd, kwds)
- if 'error' in ret:
- raise QMPResponseError(ret)
- if 'return' not in ret:
- raise QMPProtocolError(
- "'return' key not found in QMP response '{}'".format(str(ret))
- )
- return cast(QMPReturnValue, ret['return'])
-
- def pull_event(self,
- wait: Union[bool, float] = False) -> Optional[QMPMessage]:
- """
- Pulls a single event.
-
- @param wait (bool): block until an event is available.
- @param wait (float): If wait is a float, treat it as a timeout value.
-
- @raise QMPTimeoutError: If a timeout float is provided and the timeout
- period elapses.
- @raise QMPConnectError: If wait is True but no events could be
- retrieved or if some other error occurred.
-
- @return The first available QMP event, or None.
- """
- self.__get_events(wait)
-
- if self.__events:
- return self.__events.pop(0)
- return None
-
- def get_events(self, wait: bool = False) -> List[QMPMessage]:
- """
- Get a list of available QMP events.
-
- @param wait (bool): block until an event is available.
- @param wait (float): If wait is a float, treat it as a timeout value.
-
- @raise QMPTimeoutError: If a timeout float is provided and the timeout
- period elapses.
- @raise QMPConnectError: If wait is True but no events could be
- retrieved or if some other error occurred.
-
- @return The list of available QMP events.
- """
- self.__get_events(wait)
- return self.__events
-
- def clear_events(self) -> None:
- """
- Clear current list of pending events.
- """
- self.__events = []
-
- def close(self) -> None:
- """
- Close the socket and socket file.
- """
- if self.__sock:
- self.__sock.close()
- if self.__sockfile:
- self.__sockfile.close()
-
- def settimeout(self, timeout: Optional[float]) -> None:
- """
- Set the socket timeout.
-
- @param timeout (float): timeout in seconds (non-zero), or None.
- @note This is a wrap around socket.settimeout
-
- @raise ValueError: if timeout was set to 0.
- """
- if timeout == 0:
- msg = "timeout cannot be 0; this engages non-blocking mode."
- msg += " Use 'None' instead to disable timeouts."
- raise ValueError(msg)
- self.__sock.settimeout(timeout)
-
- def get_sock_fd(self) -> int:
- """
- Get the socket file descriptor.
-
- @return The file descriptor number.
- """
- return self.__sock.fileno()
-
- def is_scm_available(self) -> bool:
- """
- Check if the socket allows for SCM_RIGHTS.
-
- @return True if SCM_RIGHTS is available, otherwise False.
- """
- return self.__sock.family == socket.AF_UNIX
--- /dev/null
+"""
+QEMU Monitor Protocol (QMP) development library & tooling.
+
+This package provides a fairly low-level class for communicating to QMP
+protocol servers, as implemented by QEMU, the QEMU Guest Agent, and the
+QEMU Storage Daemon. This library is not intended for production use.
+
+`QEMUMonitorProtocol` is the primary class of interest, and all errors
+raised derive from `QMPError`.
+"""
+
+# Copyright (C) 2009, 2010 Red Hat Inc.
+#
+# Authors:
+# Luiz Capitulino <lcapitulino@redhat.com>
+#
+# This work is licensed under the terms of the GNU GPL, version 2. See
+# the COPYING file in the top-level directory.
+
+import errno
+import json
+import logging
+import socket
+from types import TracebackType
+from typing import (
+ Any,
+ Dict,
+ List,
+ Optional,
+ TextIO,
+ Tuple,
+ Type,
+ Union,
+ cast,
+)
+
+
+# QMPMessage is a QMP Message of any kind.
+# e.g. {'yee': 'haw'}
+#
+# QMPReturnValue is the inner value of return values only.
+# {'return': {}} is the QMPMessage,
+# {} is the QMPReturnValue.
+QMPMessage = Dict[str, Any]
+QMPReturnValue = Dict[str, Any]
+
+InternetAddrT = Tuple[str, str]
+UnixAddrT = str
+SocketAddrT = Union[InternetAddrT, UnixAddrT]
+
+
+class QMPError(Exception):
+ """
+ QMP base exception
+ """
+
+
+class QMPConnectError(QMPError):
+ """
+ QMP connection exception
+ """
+
+
+class QMPCapabilitiesError(QMPError):
+ """
+ QMP negotiate capabilities exception
+ """
+
+
+class QMPTimeoutError(QMPError):
+ """
+ QMP timeout exception
+ """
+
+
+class QMPProtocolError(QMPError):
+ """
+ QMP protocol error; unexpected response
+ """
+
+
+class QMPResponseError(QMPError):
+ """
+ Represents erroneous QMP monitor reply
+ """
+ def __init__(self, reply: QMPMessage):
+ try:
+ desc = reply['error']['desc']
+ except KeyError:
+ desc = reply
+ super().__init__(desc)
+ self.reply = reply
+
+
+class QEMUMonitorProtocol:
+ """
+ Provide an API to connect to QEMU via QEMU Monitor Protocol (QMP) and then
+ allow to handle commands and events.
+ """
+
+ #: Logger object for debugging messages
+ logger = logging.getLogger('QMP')
+
+ def __init__(self, address: SocketAddrT,
+ server: bool = False,
+ nickname: Optional[str] = None):
+ """
+ Create a QEMUMonitorProtocol class.
+
+ @param address: QEMU address, can be either a unix socket path (string)
+ or a tuple in the form ( address, port ) for a TCP
+ connection
+ @param server: server mode listens on the socket (bool)
+ @raise OSError on socket connection errors
+ @note No connection is established, this is done by the connect() or
+ accept() methods
+ """
+ self.__events: List[QMPMessage] = []
+ self.__address = address
+ self.__sock = self.__get_sock()
+ self.__sockfile: Optional[TextIO] = None
+ self._nickname = nickname
+ if self._nickname:
+ self.logger = logging.getLogger('QMP').getChild(self._nickname)
+ if server:
+ self.__sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ self.__sock.bind(self.__address)
+ self.__sock.listen(1)
+
+ def __get_sock(self) -> socket.socket:
+ if isinstance(self.__address, tuple):
+ family = socket.AF_INET
+ else:
+ family = socket.AF_UNIX
+ return socket.socket(family, socket.SOCK_STREAM)
+
+ def __negotiate_capabilities(self) -> QMPMessage:
+ greeting = self.__json_read()
+ if greeting is None or "QMP" not in greeting:
+ raise QMPConnectError
+ # Greeting seems ok, negotiate capabilities
+ resp = self.cmd('qmp_capabilities')
+ if resp and "return" in resp:
+ return greeting
+ raise QMPCapabilitiesError
+
+ def __json_read(self, only_event: bool = False) -> Optional[QMPMessage]:
+ assert self.__sockfile is not None
+ while True:
+ data = self.__sockfile.readline()
+ if not data:
+ return None
+ # By definition, any JSON received from QMP is a QMPMessage,
+ # and we are asserting only at static analysis time that it
+ # has a particular shape.
+ resp: QMPMessage = json.loads(data)
+ if 'event' in resp:
+ self.logger.debug("<<< %s", resp)
+ self.__events.append(resp)
+ if not only_event:
+ continue
+ return resp
+
+ def __get_events(self, wait: Union[bool, float] = False) -> None:
+ """
+ Check for new events in the stream and cache them in __events.
+
+ @param wait (bool): block until an event is available.
+ @param wait (float): If wait is a float, treat it as a timeout value.
+
+ @raise QMPTimeoutError: If a timeout float is provided and the timeout
+ period elapses.
+ @raise QMPConnectError: If wait is True but no events could be
+ retrieved or if some other error occurred.
+ """
+
+ # Current timeout and blocking status
+ current_timeout = self.__sock.gettimeout()
+
+ # Check for new events regardless and pull them into the cache:
+ self.__sock.settimeout(0) # i.e. setblocking(False)
+ try:
+ self.__json_read()
+ except OSError as err:
+ # EAGAIN: No data available; not critical
+ if err.errno != errno.EAGAIN:
+ raise
+ finally:
+ self.__sock.settimeout(current_timeout)
+
+ # Wait for new events, if needed.
+ # if wait is 0.0, this means "no wait" and is also implicitly false.
+ if not self.__events and wait:
+ if isinstance(wait, float):
+ self.__sock.settimeout(wait)
+ try:
+ ret = self.__json_read(only_event=True)
+ except socket.timeout as err:
+ raise QMPTimeoutError("Timeout waiting for event") from err
+ except Exception as err:
+ msg = "Error while reading from socket"
+ raise QMPConnectError(msg) from err
+ finally:
+ self.__sock.settimeout(current_timeout)
+
+ if ret is None:
+ raise QMPConnectError("Error while reading from socket")
+
+ def __enter__(self) -> 'QEMUMonitorProtocol':
+ # Implement context manager enter function.
+ return self
+
+ def __exit__(self,
+ # pylint: disable=duplicate-code
+ # see https://github.com/PyCQA/pylint/issues/3619
+ exc_type: Optional[Type[BaseException]],
+ exc_val: Optional[BaseException],
+ exc_tb: Optional[TracebackType]) -> None:
+ # Implement context manager exit function.
+ self.close()
+
+ def connect(self, negotiate: bool = True) -> Optional[QMPMessage]:
+ """
+ Connect to the QMP Monitor and perform capabilities negotiation.
+
+ @return QMP greeting dict, or None if negotiate is false
+ @raise OSError on socket connection errors
+ @raise QMPConnectError if the greeting is not received
+ @raise QMPCapabilitiesError if fails to negotiate capabilities
+ """
+ self.__sock.connect(self.__address)
+ self.__sockfile = self.__sock.makefile(mode='r')
+ if negotiate:
+ return self.__negotiate_capabilities()
+ return None
+
+ def accept(self, timeout: Optional[float] = 15.0) -> QMPMessage:
+ """
+ Await connection from QMP Monitor and perform capabilities negotiation.
+
+ @param timeout: timeout in seconds (nonnegative float number, or
+ None). The value passed will set the behavior of the
+ underneath QMP socket as described in [1].
+ Default value is set to 15.0.
+ @return QMP greeting dict
+ @raise OSError on socket connection errors
+ @raise QMPConnectError if the greeting is not received
+ @raise QMPCapabilitiesError if fails to negotiate capabilities
+
+ [1]
+ https://docs.python.org/3/library/socket.html#socket.socket.settimeout
+ """
+ self.__sock.settimeout(timeout)
+ self.__sock, _ = self.__sock.accept()
+ self.__sockfile = self.__sock.makefile(mode='r')
+ return self.__negotiate_capabilities()
+
+ def cmd_obj(self, qmp_cmd: QMPMessage) -> QMPMessage:
+ """
+ Send a QMP command to the QMP Monitor.
+
+ @param qmp_cmd: QMP command to be sent as a Python dict
+ @return QMP response as a Python dict
+ """
+ self.logger.debug(">>> %s", qmp_cmd)
+ self.__sock.sendall(json.dumps(qmp_cmd).encode('utf-8'))
+ resp = self.__json_read()
+ if resp is None:
+ raise QMPConnectError("Unexpected empty reply from server")
+ self.logger.debug("<<< %s", resp)
+ return resp
+
+ def cmd(self, name: str,
+ args: Optional[Dict[str, Any]] = None,
+ cmd_id: Optional[Any] = None) -> QMPMessage:
+ """
+ Build a QMP command and send it to the QMP Monitor.
+
+ @param name: command name (string)
+ @param args: command arguments (dict)
+ @param cmd_id: command id (dict, list, string or int)
+ """
+ qmp_cmd: QMPMessage = {'execute': name}
+ if args:
+ qmp_cmd['arguments'] = args
+ if cmd_id:
+ qmp_cmd['id'] = cmd_id
+ return self.cmd_obj(qmp_cmd)
+
+ def command(self, cmd: str, **kwds: Any) -> QMPReturnValue:
+ """
+ Build and send a QMP command to the monitor, report errors if any
+ """
+ ret = self.cmd(cmd, kwds)
+ if 'error' in ret:
+ raise QMPResponseError(ret)
+ if 'return' not in ret:
+ raise QMPProtocolError(
+ "'return' key not found in QMP response '{}'".format(str(ret))
+ )
+ return cast(QMPReturnValue, ret['return'])
+
+ def pull_event(self,
+ wait: Union[bool, float] = False) -> Optional[QMPMessage]:
+ """
+ Pulls a single event.
+
+ @param wait (bool): block until an event is available.
+ @param wait (float): If wait is a float, treat it as a timeout value.
+
+ @raise QMPTimeoutError: If a timeout float is provided and the timeout
+ period elapses.
+ @raise QMPConnectError: If wait is True but no events could be
+ retrieved or if some other error occurred.
+
+ @return The first available QMP event, or None.
+ """
+ self.__get_events(wait)
+
+ if self.__events:
+ return self.__events.pop(0)
+ return None
+
+ def get_events(self, wait: bool = False) -> List[QMPMessage]:
+ """
+ Get a list of available QMP events.
+
+ @param wait (bool): block until an event is available.
+ @param wait (float): If wait is a float, treat it as a timeout value.
+
+ @raise QMPTimeoutError: If a timeout float is provided and the timeout
+ period elapses.
+ @raise QMPConnectError: If wait is True but no events could be
+ retrieved or if some other error occurred.
+
+ @return The list of available QMP events.
+ """
+ self.__get_events(wait)
+ return self.__events
+
+ def clear_events(self) -> None:
+ """
+ Clear current list of pending events.
+ """
+ self.__events = []
+
+ def close(self) -> None:
+ """
+ Close the socket and socket file.
+ """
+ if self.__sock:
+ self.__sock.close()
+ if self.__sockfile:
+ self.__sockfile.close()
+
+ def settimeout(self, timeout: Optional[float]) -> None:
+ """
+ Set the socket timeout.
+
+ @param timeout (float): timeout in seconds (non-zero), or None.
+ @note This is a wrap around socket.settimeout
+
+ @raise ValueError: if timeout was set to 0.
+ """
+ if timeout == 0:
+ msg = "timeout cannot be 0; this engages non-blocking mode."
+ msg += " Use 'None' instead to disable timeouts."
+ raise ValueError(msg)
+ self.__sock.settimeout(timeout)
+
+ def get_sock_fd(self) -> int:
+ """
+ Get the socket file descriptor.
+
+ @return The file descriptor number.
+ """
+ return self.__sock.fileno()
+
+ def is_scm_available(self) -> bool:
+ """
+ Check if the socket allows for SCM_RIGHTS.
+
+ @return True if SCM_RIGHTS is available, otherwise False.
+ """
+ return self.__sock.family == socket.AF_UNIX
+++ /dev/null
-"""
-QEMU qtest library
-
-qtest offers the QEMUQtestProtocol and QEMUQTestMachine classes, which
-offer a connection to QEMU's qtest protocol socket, and a qtest-enabled
-subclass of QEMUMachine, respectively.
-"""
-
-# Copyright (C) 2015 Red Hat Inc.
-#
-# Authors:
-# Fam Zheng <famz@redhat.com>
-#
-# This work is licensed under the terms of the GNU GPL, version 2. See
-# the COPYING file in the top-level directory.
-#
-# Based on qmp.py.
-#
-
-import os
-import socket
-from typing import (
- List,
- Optional,
- Sequence,
- TextIO,
-)
-
-from .machine import QEMUMachine
-from .qmp import SocketAddrT
-
-
-class QEMUQtestProtocol:
- """
- QEMUQtestProtocol implements a connection to a qtest socket.
-
- :param address: QEMU address, can be either a unix socket path (string)
- or a tuple in the form ( address, port ) for a TCP
- connection
- :param server: server mode, listens on the socket (bool)
- :raise socket.error: on socket connection errors
-
- .. note::
- No conection is estabalished by __init__(), this is done
- by the connect() or accept() methods.
- """
- def __init__(self, address: SocketAddrT,
- server: bool = False):
- self._address = address
- self._sock = self._get_sock()
- self._sockfile: Optional[TextIO] = None
- if server:
- self._sock.bind(self._address)
- self._sock.listen(1)
-
- def _get_sock(self) -> socket.socket:
- if isinstance(self._address, tuple):
- family = socket.AF_INET
- else:
- family = socket.AF_UNIX
- return socket.socket(family, socket.SOCK_STREAM)
-
- def connect(self) -> None:
- """
- Connect to the qtest socket.
-
- @raise socket.error on socket connection errors
- """
- self._sock.connect(self._address)
- self._sockfile = self._sock.makefile(mode='r')
-
- def accept(self) -> None:
- """
- Await connection from QEMU.
-
- @raise socket.error on socket connection errors
- """
- self._sock, _ = self._sock.accept()
- self._sockfile = self._sock.makefile(mode='r')
-
- def cmd(self, qtest_cmd: str) -> str:
- """
- Send a qtest command on the wire.
-
- @param qtest_cmd: qtest command text to be sent
- """
- assert self._sockfile is not None
- self._sock.sendall((qtest_cmd + "\n").encode('utf-8'))
- resp = self._sockfile.readline()
- return resp
-
- def close(self) -> None:
- """
- Close this socket.
- """
- self._sock.close()
- if self._sockfile:
- self._sockfile.close()
- self._sockfile = None
-
- def settimeout(self, timeout: Optional[float]) -> None:
- """Set a timeout, in seconds."""
- self._sock.settimeout(timeout)
-
-
-class QEMUQtestMachine(QEMUMachine):
- """
- A QEMU VM, with a qtest socket available.
- """
-
- def __init__(self,
- binary: str,
- args: Sequence[str] = (),
- name: Optional[str] = None,
- base_temp_dir: str = "/var/tmp",
- socket_scm_helper: Optional[str] = None,
- sock_dir: Optional[str] = None):
- if name is None:
- name = "qemu-%d" % os.getpid()
- if sock_dir is None:
- sock_dir = base_temp_dir
- super().__init__(binary, args, name=name, base_temp_dir=base_temp_dir,
- socket_scm_helper=socket_scm_helper,
- sock_dir=sock_dir)
- self._qtest: Optional[QEMUQtestProtocol] = None
- self._qtest_path = os.path.join(sock_dir, name + "-qtest.sock")
-
- @property
- def _base_args(self) -> List[str]:
- args = super()._base_args
- args.extend([
- '-qtest', f"unix:path={self._qtest_path}",
- '-accel', 'qtest'
- ])
- return args
-
- def _pre_launch(self) -> None:
- super()._pre_launch()
- self._qtest = QEMUQtestProtocol(self._qtest_path, server=True)
-
- def _post_launch(self) -> None:
- assert self._qtest is not None
- super()._post_launch()
- self._qtest.accept()
-
- def _post_shutdown(self) -> None:
- super()._post_shutdown()
- self._remove_if_exists(self._qtest_path)
-
- def qtest(self, cmd: str) -> str:
- """
- Send a qtest command to the guest.
-
- :param cmd: qtest command to send
- :return: qtest server response
- """
- if self._qtest is None:
- raise RuntimeError("qtest socket not available")
- return self._qtest.cmd(cmd)
+++ /dev/null
-"""
-QEMU utility library
-
-This offers miscellaneous utility functions, which may not be easily
-distinguishable or numerous to be in their own module.
-"""
-
-# Copyright (C) 2021 Red Hat Inc.
-#
-# Authors:
-# Cleber Rosa <crosa@redhat.com>
-#
-# This work is licensed under the terms of the GNU GPL, version 2. See
-# the COPYING file in the top-level directory.
-#
-
-import re
-from typing import Optional
-
-
-def get_info_usernet_hostfwd_port(info_usernet_output: str) -> Optional[int]:
- """
- Returns the port given to the hostfwd parameter via info usernet
-
- :param info_usernet_output: output generated by hmp command "info usernet"
- :return: the port number allocated by the hostfwd option
- """
- for line in info_usernet_output.split('\r\n'):
- regex = r'TCP.HOST_FORWARD.*127\.0\.0\.1\s+(\d+)\s+10\.'
- match = re.search(regex, line)
- if match is not None:
- return int(match[1])
- return None
--- /dev/null
+"""
+QEMU development and testing utilities
+
+This package provides a small handful of utilities for performing
+various tasks not directly related to the launching of a VM.
+"""
+
+# Copyright (C) 2021 Red Hat Inc.
+#
+# Authors:
+# John Snow <jsnow@redhat.com>
+# Cleber Rosa <crosa@redhat.com>
+#
+# This work is licensed under the terms of the GNU GPL, version 2. See
+# the COPYING file in the top-level directory.
+#
+
+import re
+from typing import Optional
+
+# pylint: disable=import-error
+from .accel import kvm_available, list_accel, tcg_available
+
+
+__all__ = (
+ 'get_info_usernet_hostfwd_port',
+ 'kvm_available',
+ 'list_accel',
+ 'tcg_available',
+)
+
+
+def get_info_usernet_hostfwd_port(info_usernet_output: str) -> Optional[int]:
+ """
+ Returns the port given to the hostfwd parameter via info usernet
+
+ :param info_usernet_output: output generated by hmp command "info usernet"
+ :return: the port number allocated by the hostfwd option
+ """
+ for line in info_usernet_output.split('\r\n'):
+ regex = r'TCP.HOST_FORWARD.*127\.0\.0\.1\s+(\d+)\s+10\.'
+ match = re.search(regex, line)
+ if match is not None:
+ return int(match[1])
+ return None
--- /dev/null
+"""
+QEMU accel module:
+
+This module provides utilities for discover and check the availability of
+accelerators.
+"""
+# Copyright (C) 2015-2016 Red Hat Inc.
+# Copyright (C) 2012 IBM Corp.
+#
+# Authors:
+# Fam Zheng <famz@redhat.com>
+#
+# This work is licensed under the terms of the GNU GPL, version 2. See
+# the COPYING file in the top-level directory.
+#
+
+import logging
+import os
+import subprocess
+from typing import List, Optional
+
+
+LOG = logging.getLogger(__name__)
+
+# Mapping host architecture to any additional architectures it can
+# support which often includes its 32 bit cousin.
+ADDITIONAL_ARCHES = {
+ "x86_64": "i386",
+ "aarch64": "armhf",
+ "ppc64le": "ppc64",
+}
+
+
+def list_accel(qemu_bin: str) -> List[str]:
+ """
+ List accelerators enabled in the QEMU binary.
+
+ @param qemu_bin (str): path to the QEMU binary.
+ @raise Exception: if failed to run `qemu -accel help`
+ @return a list of accelerator names.
+ """
+ if not qemu_bin:
+ return []
+ try:
+ out = subprocess.check_output([qemu_bin, '-accel', 'help'],
+ universal_newlines=True)
+ except:
+ LOG.debug("Failed to get the list of accelerators in %s", qemu_bin)
+ raise
+ # Skip the first line which is the header.
+ return [acc.strip() for acc in out.splitlines()[1:]]
+
+
+def kvm_available(target_arch: Optional[str] = None,
+ qemu_bin: Optional[str] = None) -> bool:
+ """
+ Check if KVM is available using the following heuristic:
+ - Kernel module is present in the host;
+ - Target and host arches don't mismatch;
+ - KVM is enabled in the QEMU binary.
+
+ @param target_arch (str): target architecture
+ @param qemu_bin (str): path to the QEMU binary
+ @return True if kvm is available, otherwise False.
+ """
+ if not os.access("/dev/kvm", os.R_OK | os.W_OK):
+ return False
+ if target_arch:
+ host_arch = os.uname()[4]
+ if target_arch != host_arch:
+ if target_arch != ADDITIONAL_ARCHES.get(host_arch):
+ return False
+ if qemu_bin and "kvm" not in list_accel(qemu_bin):
+ return False
+ return True
+
+
+def tcg_available(qemu_bin: str) -> bool:
+ """
+ Check if TCG is available.
+
+ @param qemu_bin (str): path to the QEMU binary
+ """
+ return 'tcg' in list_accel(qemu_bin)
sys.path.append(os.path.join(SOURCE_DIR, 'python'))
-from qemu.accel import kvm_available
-from qemu.accel import tcg_available
from qemu.machine import QEMUMachine
-from qemu.utils import get_info_usernet_hostfwd_port
-
+from qemu.utils import (
+ get_info_usernet_hostfwd_port,
+ kvm_available,
+ tcg_available,
+)
def is_readable_executable_file(path):
return os.path.isfile(path) and os.access(path, os.R_OK | os.X_OK)
from avocado_qemu import exec_command_and_wait_for_pattern
from avocado_qemu import is_readable_executable_file
-from qemu.accel import kvm_available
+from qemu.utils import kvm_available
import os
import socket
# Import qemu after iotests.py has amended sys.path
# pylint: disable=wrong-import-order
-import qemu
+from qemu.machine import machine
BlockBitmapMapping = List[Dict[str, object]]
# the failed migration
try:
self.vm_b.shutdown()
- except qemu.machine.AbnormalShutdown:
+ except machine.AbnormalShutdown:
pass
def test_aliased_bitmap_name_too_long(self) -> None:
# pylint: disable=import-error, wrong-import-position
sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
-from qemu import qtest
+from qemu.machine import qtest
from qemu.qmp import QMPMessage
# Use this logger for logging messages directly from the iotests module
import sys
import subprocess
import basevm
-from qemu.accel import kvm_available
+from qemu.utils import kvm_available
# This is the config needed for current version of QEMU.
# This works for both kvm and tcg.
import time
import datetime
sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
-from qemu.accel import kvm_available
from qemu.machine import QEMUMachine
-from qemu.utils import get_info_usernet_hostfwd_port
+from qemu.utils import get_info_usernet_hostfwd_port, kvm_available
import subprocess
import hashlib
import argparse