static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
- struct dlm_message *ms);
+ struct dlm_message *ms, bool local);
static int receive_extralen(struct dlm_message *ms);
static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
static void toss_rsb(struct kref *kref);
/* Handles situations where we might be processing a "fake" or "local" reply in
which we can't try to take waiters_mutex again. */
-static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
+static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms,
+ bool local)
{
struct dlm_ls *ls = lkb->lkb_resource->res_ls;
int error;
- if (ms->m_flags != cpu_to_le32(DLM_IFL_LOCAL_MS))
+ if (!local)
mutex_lock(&ls->ls_waiters_mutex);
error = _remove_from_waiters(lkb, le32_to_cpu(ms->m_type), ms);
- if (ms->m_flags != cpu_to_le32(DLM_IFL_LOCAL_MS))
+ if (!local)
mutex_unlock(&ls->ls_waiters_mutex);
return error;
}
/* down conversions go without a reply from the master */
if (!error && down_conversion(lkb)) {
remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
- r->res_ls->ls_local_ms.m_flags = cpu_to_le32(DLM_IFL_LOCAL_MS);
r->res_ls->ls_local_ms.m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY);
r->res_ls->ls_local_ms.m_result = 0;
- __receive_convert_reply(r, lkb, &r->res_ls->ls_local_ms);
+ __receive_convert_reply(r, lkb, &r->res_ls->ls_local_ms, true);
}
return error;
(le32_to_cpu(ms->m_flags) & 0x0000FFFF);
}
-static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
+static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms,
+ bool local)
{
- if (ms->m_flags == cpu_to_le32(DLM_IFL_LOCAL_MS))
+ if (local)
return;
lkb->lkb_sbflags = le32_to_cpu(ms->m_sbflags);
if (error)
goto out;
- receive_flags_reply(lkb, ms);
+ receive_flags_reply(lkb, ms, false);
if (is_altmode(lkb))
munge_altmode(lkb, ms);
grant_lock_pc(r, lkb, ms);
case -EINPROGRESS:
case 0:
/* request was queued or granted on remote master */
- receive_flags_reply(lkb, ms);
+ receive_flags_reply(lkb, ms, false);
lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
if (is_altmode(lkb))
munge_altmode(lkb, ms);
}
static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
- struct dlm_message *ms)
+ struct dlm_message *ms, bool local)
{
/* this is the value returned from do_convert() on the master */
switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
break;
case -EDEADLK:
- receive_flags_reply(lkb, ms);
+ receive_flags_reply(lkb, ms, local);
revert_lock_pc(r, lkb);
queue_cast(r, lkb, -EDEADLK);
break;
case -EINPROGRESS:
/* convert was queued on remote master */
- receive_flags_reply(lkb, ms);
+ receive_flags_reply(lkb, ms, local);
if (is_demoted(lkb))
munge_demoted(lkb);
del_lkb(r, lkb);
case 0:
/* convert was granted on remote master */
- receive_flags_reply(lkb, ms);
+ receive_flags_reply(lkb, ms, local);
if (is_demoted(lkb))
munge_demoted(lkb);
grant_lock_pc(r, lkb, ms);
}
}
-static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
+static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms,
+ bool local)
{
struct dlm_rsb *r = lkb->lkb_resource;
int error;
goto out;
/* local reply can happen with waiters_mutex held */
- error = remove_from_waiters_ms(lkb, ms);
+ error = remove_from_waiters_ms(lkb, ms, local);
if (error)
goto out;
- __receive_convert_reply(r, lkb, ms);
+ __receive_convert_reply(r, lkb, ms, local);
out:
unlock_rsb(r);
put_rsb(r);
if (error)
return error;
- _receive_convert_reply(lkb, ms);
+ _receive_convert_reply(lkb, ms, false);
dlm_put_lkb(lkb);
return 0;
}
-static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
+static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms,
+ bool local)
{
struct dlm_rsb *r = lkb->lkb_resource;
int error;
goto out;
/* local reply can happen with waiters_mutex held */
- error = remove_from_waiters_ms(lkb, ms);
+ error = remove_from_waiters_ms(lkb, ms, local);
if (error)
goto out;
switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
case -DLM_EUNLOCK:
- receive_flags_reply(lkb, ms);
+ receive_flags_reply(lkb, ms, local);
remove_lock_pc(r, lkb);
queue_cast(r, lkb, -DLM_EUNLOCK);
break;
if (error)
return error;
- _receive_unlock_reply(lkb, ms);
+ _receive_unlock_reply(lkb, ms, false);
dlm_put_lkb(lkb);
return 0;
}
-static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
+static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms,
+ bool local)
{
struct dlm_rsb *r = lkb->lkb_resource;
int error;
goto out;
/* local reply can happen with waiters_mutex held */
- error = remove_from_waiters_ms(lkb, ms);
+ error = remove_from_waiters_ms(lkb, ms, local);
if (error)
goto out;
switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
case -DLM_ECANCEL:
- receive_flags_reply(lkb, ms);
+ receive_flags_reply(lkb, ms, local);
revert_lock_pc(r, lkb);
queue_cast(r, lkb, -DLM_ECANCEL);
break;
if (error)
return error;
- _receive_cancel_reply(lkb, ms);
+ _receive_cancel_reply(lkb, ms, false);
dlm_put_lkb(lkb);
return 0;
}
if (middle_conversion(lkb)) {
hold_lkb(lkb);
memset(ms_local, 0, sizeof(struct dlm_message));
- ms_local->m_flags = cpu_to_le32(DLM_IFL_LOCAL_MS);
ms_local->m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY);
ms_local->m_result = cpu_to_le32(to_dlm_errno(-EINPROGRESS));
ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
- _receive_convert_reply(lkb, ms_local);
+ _receive_convert_reply(lkb, ms_local, true);
/* Same special case as in receive_rcom_lock_args() */
lkb->lkb_grmode = DLM_LOCK_IV;
case DLM_MSG_UNLOCK:
hold_lkb(lkb);
memset(ms_local, 0, sizeof(struct dlm_message));
- ms_local->m_flags = cpu_to_le32(DLM_IFL_LOCAL_MS);
ms_local->m_type = cpu_to_le32(DLM_MSG_UNLOCK_REPLY);
ms_local->m_result = cpu_to_le32(to_dlm_errno(local_unlock_result));
ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
- _receive_unlock_reply(lkb, ms_local);
+ _receive_unlock_reply(lkb, ms_local, true);
dlm_put_lkb(lkb);
break;
case DLM_MSG_CANCEL:
hold_lkb(lkb);
memset(ms_local, 0, sizeof(struct dlm_message));
- ms_local->m_flags = cpu_to_le32(DLM_IFL_LOCAL_MS);
ms_local->m_type = cpu_to_le32(DLM_MSG_CANCEL_REPLY);
ms_local->m_result = cpu_to_le32(to_dlm_errno(local_cancel_result));
ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
- _receive_cancel_reply(lkb, ms_local);
+ _receive_cancel_reply(lkb, ms_local, true);
dlm_put_lkb(lkb);
break;