import subprocess
import sys
from typing import (Any, Callable, Dict, Iterable,
- List, Optional, Sequence, TypeVar)
+ List, Optional, Sequence, Tuple, TypeVar)
import unittest
# pylint: disable=import-error, wrong-import-position
luks_default_key_secret_opt = 'key-secret=keysec0'
-def qemu_img(*args):
- '''Run qemu-img and return the exit code'''
- devnull = open('/dev/null', 'r+')
- exitcode = subprocess.call(qemu_img_args + list(args),
- stdin=devnull, stdout=devnull)
- if exitcode < 0:
+def qemu_img_pipe_and_status(*args: str) -> Tuple[str, int]:
+ """
+ Run qemu-img and return both its output and its exit code
+ """
+ subp = subprocess.Popen(qemu_img_args + list(args),
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ universal_newlines=True)
+ output = subp.communicate()[0]
+ if subp.returncode < 0:
sys.stderr.write('qemu-img received signal %i: %s\n'
- % (-exitcode, ' '.join(qemu_img_args + list(args))))
- return exitcode
+ % (-subp.returncode,
+ ' '.join(qemu_img_args + list(args))))
+ return (output, subp.returncode)
+
+def qemu_img(*args: str) -> int:
+ '''Run qemu-img and return the exit code'''
+ return qemu_img_pipe_and_status(*args)[1]
def ordered_qmp(qmsg, conv_keys=True):
# Dictionaries are not ordered prior to 3.6, therefore:
% (-exitcode, ' '.join(qemu_img_args + list(args))))
return exitcode
-def qemu_img_pipe(*args):
+def qemu_img_pipe(*args: str) -> str:
'''Run qemu-img and return its output'''
- subp = subprocess.Popen(qemu_img_args + list(args),
- stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT,
- universal_newlines=True)
- output = subp.communicate()[0]
- if subp.returncode < 0:
- sys.stderr.write('qemu-img received signal %i: %s\n'
- % (-subp.returncode,
- ' '.join(qemu_img_args + list(args))))
- return output
+ return qemu_img_pipe_and_status(*args)[0]
def qemu_img_log(*args):
result = qemu_img_pipe(*args)