u16 advertised;
u16 window;
u16 bc_rcv_nxt;
+ u16 bc_syncpt;
u16 bc_acked;
bool usr_pending;
};
struct sk_buff *_skb, *tmp;
int mtyp = msg_type(hdr);
- /* Bcast may be bypassed by unicast or other bcast, - sort it in */
+ /* Bcast/mcast may be bypassed by ucast or other bcast, - sort it in */
if (mtyp == TIPC_GRP_BCAST_MSG || mtyp == TIPC_GRP_MCAST_MSG) {
skb_queue_walk_safe(defq, _skb, tmp) {
_hdr = buf_msg(_skb);
struct sk_buff_head *xmitq)
{
struct sk_buff *skb = __skb_dequeue(inputq);
- bool ack, deliver, update;
+ bool ack, deliver, update, leave = false;
struct sk_buff_head *defq;
struct tipc_member *m;
struct tipc_msg *hdr;
if (!msg_in_group(hdr))
goto drop;
- if (msg_is_grp_evt(hdr)) {
- if (!grp->events)
- goto drop;
- __skb_queue_tail(inputq, skb);
- return;
- }
-
m = tipc_group_find_member(grp, node, port);
if (!tipc_group_is_receiver(m))
goto drop;
break;
case TIPC_GRP_UCAST_MSG:
break;
+ case TIPC_GRP_MEMBER_EVT:
+ if (m->state == MBR_LEAVING)
+ leave = true;
+ if (!grp->events)
+ deliver = false;
+ break;
default:
break;
}
if (ack)
tipc_group_proto_xmit(grp, m, GRP_ACK_MSG, xmitq);
+ if (leave) {
+ tipc_group_delete_member(grp, m);
+ __skb_queue_purge(defq);
+ break;
+ }
if (!update)
continue;
msg_set_grp_bc_syncpt(hdr, grp->bc_snd_nxt);
msg_set_adv_win(hdr, adv);
m->advertised += adv;
+ } else if (mtyp == GRP_LEAVE_MSG) {
+ msg_set_grp_bc_syncpt(hdr, grp->bc_snd_nxt);
} else if (mtyp == GRP_ADV_MSG) {
msg_set_adv_win(hdr, adv);
m->advertised += adv;
u32 node = msg_orignode(hdr);
u32 port = msg_origport(hdr);
struct tipc_member *m;
+ struct tipc_msg *ehdr;
if (!grp)
return;
MBR_QUARANTINED);
if (!m)
return;
- m->bc_rcv_nxt = msg_grp_bc_syncpt(hdr);
+ m->bc_syncpt = msg_grp_bc_syncpt(hdr);
+ m->bc_rcv_nxt = m->bc_syncpt;
m->window += msg_adv_win(hdr);
/* Wait until PUBLISH event is received */
*usr_wakeup = true;
m->usr_pending = false;
tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
+ ehdr = buf_msg(m->event_msg);
+ msg_set_grp_bc_seqno(ehdr, m->bc_syncpt);
__skb_queue_tail(inputq, m->event_msg);
}
if (m->window < ADV_IDLE)
case GRP_LEAVE_MSG:
if (!m)
return;
+ m->bc_syncpt = msg_grp_bc_syncpt(hdr);
/* Wait until WITHDRAW event is received */
if (m->state != MBR_LEAVING) {
return;
}
/* Otherwise deliver already received WITHDRAW event */
+ ehdr = buf_msg(m->event_msg);
+ msg_set_grp_bc_seqno(ehdr, m->bc_syncpt);
__skb_queue_tail(inputq, m->event_msg);
*usr_wakeup = true;
- tipc_group_delete_member(grp, m);
list_del_init(&m->congested);
return;
case GRP_ADV_MSG:
int event = evt->event;
struct tipc_member *m;
struct net *net;
+ bool node_up;
u32 self;
if (!grp)
m->event_msg = skb;
m->state = MBR_PUBLISHED;
} else {
+ msg_set_grp_bc_seqno(hdr, m->bc_syncpt);
__skb_queue_tail(inputq, skb);
m->state = MBR_JOINED;
*usr_wakeup = true;
*usr_wakeup = true;
m->usr_pending = false;
+ node_up = tipc_node_is_up(net, node);
/* Hold back event if more messages might be expected */
- if (m->state != MBR_LEAVING && tipc_node_is_up(net, node)) {
+ if (m->state != MBR_LEAVING && node_up) {
m->event_msg = skb;
m->state = MBR_LEAVING;
} else {
+ if (node_up)
+ msg_set_grp_bc_seqno(hdr, m->bc_syncpt);
+ else
+ msg_set_grp_bc_seqno(hdr, m->bc_rcv_nxt);
__skb_queue_tail(inputq, skb);
- tipc_group_delete_member(grp, m);
}
list_del_init(&m->congested);
}