MultiFDMethods *ops;
} *multifd_recv_state;
+static bool multifd_use_packets(void)
+{
+ return !migrate_mapped_ram();
+}
+
/* Multifd without compression */
/**
return;
}
+static void multifd_send_prepare_iovs(MultiFDSendParams *p)
+{
+ MultiFDPages_t *pages = p->pages;
+
+ for (int i = 0; i < pages->num; i++) {
+ p->iov[p->iovs_num].iov_base = pages->block->host + pages->offset[i];
+ p->iov[p->iovs_num].iov_len = p->page_size;
+ p->iovs_num++;
+ }
+
+ p->next_packet_size = pages->num * p->page_size;
+}
+
/**
* nocomp_send_prepare: prepare date to be able to send
*
static int nocomp_send_prepare(MultiFDSendParams *p, Error **errp)
{
bool use_zero_copy_send = migrate_zero_copy_send();
- MultiFDPages_t *pages = p->pages;
int ret;
+ if (!multifd_use_packets()) {
+ multifd_send_prepare_iovs(p);
+ return 0;
+ }
+
if (!use_zero_copy_send) {
/*
* Only !zerocopy needs the header in IOV; zerocopy will
multifd_send_prepare_header(p);
}
- for (int i = 0; i < pages->num; i++) {
- p->iov[p->iovs_num].iov_base = pages->block->host + pages->offset[i];
- p->iov[p->iovs_num].iov_len = p->page_size;
- p->iovs_num++;
- }
-
- p->next_packet_size = pages->num * p->page_size;
+ multifd_send_prepare_iovs(p);
p->flags |= MULTIFD_FLAG_NOCOMP;
multifd_send_fill_packet(p);
*/
static int nocomp_recv(MultiFDRecvParams *p, Error **errp)
{
- uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;
+ uint32_t flags;
+
+ if (!multifd_use_packets()) {
+ return 0;
+ }
+
+ flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;
if (flags != MULTIFD_FLAG_NOCOMP) {
error_setg(errp, "multifd %u: flags received %x flags expected %x",
MigrationThread *thread = NULL;
Error *local_err = NULL;
int ret = 0;
+ bool use_packets = multifd_use_packets();
thread = migration_threads_add(p->name, qemu_get_thread_id());
trace_multifd_send_thread_start(p->id);
rcu_register_thread();
- if (multifd_send_initial_packet(p, &local_err) < 0) {
- ret = -1;
- goto out;
+ if (use_packets) {
+ if (multifd_send_initial_packet(p, &local_err) < 0) {
+ ret = -1;
+ goto out;
+ }
}
while (true) {
* it doesn't require explicit memory barriers.
*/
assert(qatomic_read(&p->pending_sync));
- p->flags = MULTIFD_FLAG_SYNC;
- multifd_send_fill_packet(p);
- ret = qio_channel_write_all(p->c, (void *)p->packet,
- p->packet_len, &local_err);
- if (ret != 0) {
- break;
+
+ if (use_packets) {
+ p->flags = MULTIFD_FLAG_SYNC;
+ multifd_send_fill_packet(p);
+ ret = qio_channel_write_all(p->c, (void *)p->packet,
+ p->packet_len, &local_err);
+ if (ret != 0) {
+ break;
+ }
+ /* p->next_packet_size will always be zero for a SYNC packet */
+ stat64_add(&mig_stats.multifd_bytes, p->packet_len);
+ p->flags = 0;
}
- /* p->next_packet_size will always be zero for a SYNC packet */
- stat64_add(&mig_stats.multifd_bytes, p->packet_len);
- p->flags = 0;
+
qatomic_set(&p->pending_sync, false);
qemu_sem_post(&p->sem_sync);
}
Error *local_err = NULL;
int thread_count, ret = 0;
uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
+ bool use_packets = multifd_use_packets();
uint8_t i;
if (!migrate_multifd()) {
qemu_sem_init(&p->sem_sync, 0);
p->id = i;
p->pages = multifd_pages_init(page_count);
- p->packet_len = sizeof(MultiFDPacket_t)
- + sizeof(uint64_t) * page_count;
- p->packet = g_malloc0(p->packet_len);
- p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
- p->packet->version = cpu_to_be32(MULTIFD_VERSION);
+
+ if (use_packets) {
+ p->packet_len = sizeof(MultiFDPacket_t)
+ + sizeof(uint64_t) * page_count;
+ p->packet = g_malloc0(p->packet_len);
+ p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
+ p->packet->version = cpu_to_be32(MULTIFD_VERSION);
+
+ /* We need one extra place for the packet header */
+ p->iov = g_new0(struct iovec, page_count + 1);
+ } else {
+ p->iov = g_new0(struct iovec, page_count);
+ }
p->name = g_strdup_printf("multifdsend_%d", i);
- /* We need one extra place for the packet header */
- p->iov = g_new0(struct iovec, page_count + 1);
p->page_size = qemu_target_page_size();
p->page_count = page_count;
p->write_flags = 0;
* multifd_recv_thread may hung at MULTIFD_FLAG_SYNC handle code,
* however try to wakeup it without harm in cleanup phase.
*/
- qemu_sem_post(&p->sem_sync);
+ if (multifd_use_packets()) {
+ qemu_sem_post(&p->sem_sync);
+ }
/*
* We could arrive here for two reasons:
int thread_count = migrate_multifd_channels();
int i;
- if (!migrate_multifd()) {
+ if (!migrate_multifd() || !multifd_use_packets()) {
return;
}
{
MultiFDRecvParams *p = opaque;
Error *local_err = NULL;
+ bool use_packets = multifd_use_packets();
int ret;
trace_multifd_recv_thread_start(p->id);
rcu_register_thread();
while (true) {
- uint32_t flags;
+ uint32_t flags = 0;
bool has_data = false;
p->normal_num = 0;
break;
}
- ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
- p->packet_len, &local_err);
- if (ret == 0 || ret == -1) { /* 0: EOF -1: Error */
- break;
- }
+ if (use_packets) {
+ ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
+ p->packet_len, &local_err);
+ if (ret == 0 || ret == -1) { /* 0: EOF -1: Error */
+ break;
+ }
- qemu_mutex_lock(&p->mutex);
- ret = multifd_recv_unfill_packet(p, &local_err);
- if (ret) {
+ qemu_mutex_lock(&p->mutex);
+ ret = multifd_recv_unfill_packet(p, &local_err);
+ if (ret) {
+ qemu_mutex_unlock(&p->mutex);
+ break;
+ }
+
+ flags = p->flags;
+ /* recv methods don't know how to handle the SYNC flag */
+ p->flags &= ~MULTIFD_FLAG_SYNC;
+ has_data = !!p->normal_num;
qemu_mutex_unlock(&p->mutex);
- break;
}
- flags = p->flags;
- /* recv methods don't know how to handle the SYNC flag */
- p->flags &= ~MULTIFD_FLAG_SYNC;
- has_data = !!p->normal_num;
- qemu_mutex_unlock(&p->mutex);
-
if (has_data) {
ret = multifd_recv_state->ops->recv(p, &local_err);
if (ret != 0) {
}
}
- if (flags & MULTIFD_FLAG_SYNC) {
- qemu_sem_post(&multifd_recv_state->sem_sync);
- qemu_sem_wait(&p->sem_sync);
+ if (use_packets) {
+ if (flags & MULTIFD_FLAG_SYNC) {
+ qemu_sem_post(&multifd_recv_state->sem_sync);
+ qemu_sem_wait(&p->sem_sync);
+ }
}
}
{
int thread_count;
uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
+ bool use_packets = multifd_use_packets();
uint8_t i;
/*
qemu_mutex_init(&p->mutex);
qemu_sem_init(&p->sem_sync, 0);
p->id = i;
- p->packet_len = sizeof(MultiFDPacket_t)
- + sizeof(uint64_t) * page_count;
- p->packet = g_malloc0(p->packet_len);
+
+ if (use_packets) {
+ p->packet_len = sizeof(MultiFDPacket_t)
+ + sizeof(uint64_t) * page_count;
+ p->packet = g_malloc0(p->packet_len);
+ }
p->name = g_strdup_printf("multifdrecv_%d", i);
p->iov = g_new0(struct iovec, page_count);
p->normal = g_new0(ram_addr_t, page_count);
{
MultiFDRecvParams *p;
Error *local_err = NULL;
+ bool use_packets = multifd_use_packets();
int id;
- id = multifd_recv_initial_packet(ioc, &local_err);
- if (id < 0) {
- multifd_recv_terminate_threads(local_err);
- error_propagate_prepend(errp, local_err,
- "failed to receive packet"
- " via multifd channel %d: ",
- qatomic_read(&multifd_recv_state->count));
- return;
+ if (use_packets) {
+ id = multifd_recv_initial_packet(ioc, &local_err);
+ if (id < 0) {
+ multifd_recv_terminate_threads(local_err);
+ error_propagate_prepend(errp, local_err,
+ "failed to receive packet"
+ " via multifd channel %d: ",
+ qatomic_read(&multifd_recv_state->count));
+ return;
+ }
+ trace_multifd_recv_new_channel(id);
+ } else {
+ /* next patch gives this a meaningful value */
+ id = 0;
}
- trace_multifd_recv_new_channel(id);
p = &multifd_recv_state->params[id];
if (p->c != NULL) {