struct midcomms_node {
int nodeid;
uint32_t version;
- uint32_t seq_send;
- uint32_t seq_next;
+ atomic_t seq_send;
+ atomic_t seq_next;
/* These queues are unbound because we cannot drop any message in dlm.
* We could send a fence signal for a specific node to the cluster
* manager if queues hit some maximum value, however this handling
{
pr_debug("reset node %d\n", node->nodeid);
- node->seq_next = DLM_SEQ_INIT;
- node->seq_send = DLM_SEQ_INIT;
+ atomic_set(&node->seq_next, DLM_SEQ_INIT);
+ atomic_set(&node->seq_send, DLM_SEQ_INIT);
node->version = DLM_VERSION_NOT_SET;
node->flags = 0;
struct midcomms_node *node,
uint32_t seq)
{
- if (seq == node->seq_next) {
- node->seq_next++;
+ bool is_expected_seq;
+ uint32_t oval, nval;
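+
+ /* Lock-free check of the expected receive sequence: only advance
+ * seq_next when the incoming seq matches it, retrying if another
+ * context updated seq_next concurrently.
+ */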
+ do {
+ oval = atomic_read(&node->seq_next);
+ is_expected_seq = (oval == seq);
+ if (!is_expected_seq)
+ break;
+
+ nval = oval + 1;
+ } while (atomic_cmpxchg(&node->seq_next, oval, nval) != oval);
+
+ if (is_expected_seq) {
switch (p->header.h_cmd) {
case DLM_FIN:
spin_lock(&node->state_lock);
switch (node->state) {
case DLM_ESTABLISHED:
- dlm_send_ack(node->nodeid, node->seq_next);
+ dlm_send_ack(node->nodeid, nval);
/* passive shutdown DLM_LAST_ACK case 1
* additional we check if the node is used by
}
break;
case DLM_FIN_WAIT1:
- dlm_send_ack(node->nodeid, node->seq_next);
+ dlm_send_ack(node->nodeid, nval);
node->state = DLM_CLOSING;
set_bit(DLM_NODE_FLAG_STOP_RX, &node->flags);
pr_debug("switch node %d to state %s\n",
node->nodeid, dlm_state_str(node->state));
break;
case DLM_FIN_WAIT2:
- dlm_send_ack(node->nodeid, node->seq_next);
+ dlm_send_ack(node->nodeid, nval);
midcomms_node_reset(node);
pr_debug("switch node %d to state %s\n",
node->nodeid, dlm_state_str(node->state));
/* retry to ack message which we already have by sending back
* current node->seq_next number as ack.
*/
- if (seq < node->seq_next)
- dlm_send_ack(node->nodeid, node->seq_next);
+ if (seq < oval)
+ dlm_send_ack(node->nodeid, oval);
log_print_ratelimited("ignore dlm msg because seq mismatch, seq: %u, expected: %u, nodeid: %d",
- seq, node->seq_next, node->nodeid);
+ seq, oval, node->nodeid);
}
}
switch (node->state) {
case DLM_ESTABLISHED:
spin_unlock(&node->state_lock);
- dlm_send_ack(node->nodeid, node->seq_next);
+ dlm_send_ack(node->nodeid, atomic_read(&node->seq_next));
break;
default:
spin_unlock(&node->state_lock);
list_add_tail_rcu(&mh->list, &mh->node->send_queue);
spin_unlock_bh(&mh->node->send_queue_lock);
- mh->seq = mh->node->seq_send++;
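+ /* atomic_fetch_inc() returns the old value, preserving the seq_send++ semantics */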
+ mh->seq = atomic_fetch_inc(&mh->node->seq_send);
}
static struct dlm_msg *dlm_midcomms_get_msg_3_2(struct dlm_mhandle *mh, int nodeid,
switch (h->h_cmd) {
case DLM_OPTS:
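+ /* assign the next send sequence number only if none is set yet */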
if (!h->u.h_seq)
- h->u.h_seq = cpu_to_le32(rd->node->seq_send++);
+ h->u.h_seq = cpu_to_le32(atomic_fetch_inc(&rd->node->seq_send));
break;
default:
break;