#include <stdint.h>
#include <stdarg.h>
#include <stdlib.h>
+#include <zlib.h>
#ifndef _WIN32
#include <sys/types.h>
#include <sys/mman.h>
#define RAM_SAVE_FLAG_CONTINUE 0x20
#define RAM_SAVE_FLAG_XBZRLE 0x40
/* 0x80 is reserved in migration.h start with 0x100 next */
+#define RAM_SAVE_FLAG_COMPRESS_PAGE 0x100
static struct defconfig_file {
const char *filename;
};
typedef struct CompressParam CompressParam;
+struct DecompressParam {
+ /* To be done */
+};
+typedef struct DecompressParam DecompressParam;
+
static CompressParam *comp_param;
static QemuThread *compress_threads;
static bool quit_comp_thread;
+static bool quit_decomp_thread;
+static DecompressParam *decomp_param;
+static QemuThread *decompress_threads;
+static uint8_t *compressed_data_buf;
static void *do_data_compress(void *opaque)
{
}
}
+static void *do_data_decompress(void *opaque)
+{
+ while (!quit_decomp_thread) {
+ /* To be done */
+ }
+
+ return NULL;
+}
+
+void migrate_decompress_threads_create(void)
+{
+ int i, thread_count;
+
+ thread_count = migrate_decompress_threads();
+ decompress_threads = g_new0(QemuThread, thread_count);
+ decomp_param = g_new0(DecompressParam, thread_count);
+ compressed_data_buf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
+ quit_decomp_thread = false;
+ for (i = 0; i < thread_count; i++) {
+ qemu_thread_create(decompress_threads + i, "decompress",
+ do_data_decompress, decomp_param + i,
+ QEMU_THREAD_JOINABLE);
+ }
+}
+
+void migrate_decompress_threads_join(void)
+{
+ int i, thread_count;
+
+ quit_decomp_thread = true;
+ thread_count = migrate_decompress_threads();
+ for (i = 0; i < thread_count; i++) {
+ qemu_thread_join(decompress_threads + i);
+ }
+ g_free(decompress_threads);
+ g_free(decomp_param);
+ g_free(compressed_data_buf);
+ decompress_threads = NULL;
+ decomp_param = NULL;
+ compressed_data_buf = NULL;
+}
+
+static void decompress_data_with_multi_threads(uint8_t *compbuf,
+ void *host, int len)
+{
+ /* To be done */
+}
+
static int ram_load(QEMUFile *f, void *opaque, int version_id)
{
int flags = 0, ret = 0;
static uint64_t seq_iter;
+ int len = 0;
seq_iter++;
}
qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
break;
+ case RAM_SAVE_FLAG_COMPRESS_PAGE:
+ host = host_from_stream_offset(f, addr, flags);
+ if (!host) {
+ error_report("Invalid RAM offset " RAM_ADDR_FMT, addr);
+ ret = -EINVAL;
+ break;
+ }
+
+ len = qemu_get_be32(f);
+ if (len < 0 || len > compressBound(TARGET_PAGE_SIZE)) {
+ error_report("Invalid compressed data length: %d", len);
+ ret = -EINVAL;
+ break;
+ }
+ qemu_get_buffer(f, compressed_data_buf, len);
+ decompress_data_with_multi_threads(compressed_data_buf, host, len);
+ break;
case RAM_SAVE_FLAG_XBZRLE:
host = host_from_stream_offset(f, addr, flags);
if (!host) {
QEMUBH *cleanup_bh;
QEMUFile *file;
int compress_thread_count;
+ int decompress_thread_count;
int compress_level;
int state;
void migrate_compress_threads_create(void);
void migrate_compress_threads_join(void);
+void migrate_decompress_threads_create(void);
+void migrate_decompress_threads_join(void);
uint64_t ram_bytes_remaining(void);
uint64_t ram_bytes_transferred(void);
uint64_t ram_bytes_total(void);
bool migrate_use_compression(void);
int migrate_compress_level(void);
int migrate_compress_threads(void);
+int migrate_decompress_threads(void);
void ram_control_before_iterate(QEMUFile *f, uint64_t flags);
void ram_control_after_iterate(QEMUFile *f, uint64_t flags);
/* Default compression thread count */
#define DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT 8
+/* Default decompression thread count, usually decompression is at
+ * least 4 times as fast as compression.*/
+#define DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT 2
/*0: means nocompress, 1: best speed, ... 9: best compress ratio */
#define DEFAULT_MIGRATE_COMPRESS_LEVEL 1
.xbzrle_cache_size = DEFAULT_MIGRATE_CACHE_SIZE,
.mbps = -1,
.compress_thread_count = DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT,
+ .decompress_thread_count = DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT,
.compress_level = DEFAULT_MIGRATE_COMPRESS_LEVEL,
};
free_xbzrle_decoded_buf();
if (ret < 0) {
error_report("load of migration failed: %s", strerror(-ret));
+ migrate_decompress_threads_join();
exit(EXIT_FAILURE);
}
qemu_announce_self();
bdrv_invalidate_cache_all(&local_err);
if (local_err) {
error_report_err(local_err);
+ migrate_decompress_threads_join();
exit(EXIT_FAILURE);
}
} else {
runstate_set(RUN_STATE_PAUSED);
}
+ migrate_decompress_threads_join();
}
void process_incoming_migration(QEMUFile *f)
int fd = qemu_get_fd(f);
assert(fd != -1);
+ migrate_decompress_threads_create();
qemu_set_nonblock(fd);
qemu_coroutine_enter(co, f);
}
int64_t xbzrle_cache_size = s->xbzrle_cache_size;
int compress_level = s->compress_level;
int compress_thread_count = s->compress_thread_count;
+ int decompress_thread_count = s->decompress_thread_count;
memcpy(enabled_capabilities, s->enabled_capabilities,
sizeof(enabled_capabilities));
s->compress_level = compress_level;
s->compress_thread_count = compress_thread_count;
+ s->decompress_thread_count = decompress_thread_count;
s->bandwidth_limit = bandwidth_limit;
s->state = MIGRATION_STATUS_SETUP;
trace_migrate_set_state(MIGRATION_STATUS_SETUP);
return s->compress_thread_count;
}
+int migrate_decompress_threads(void)
+{
+ MigrationState *s;
+
+ s = migrate_get_current();
+
+ return s->decompress_thread_count;
+}
+
int migrate_use_xbzrle(void)
{
MigrationState *s;