spin_lock_bh(&ls->ls_rsbtbl_lock);
 
-       error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
+       error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].r, name, len, &r);
        if (error)
-               goto do_toss;
+               goto do_new;
        
        /*
         * rsb is active, so we can't check master_nodeid without lock_rsb.
         */
 
+       if (rsb_flag(r, RSB_TOSS))
+               goto do_toss;
+
        kref_get(&r->res_ref);
        goto out_unlock;
 
 
  do_toss:
-       error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
-       if (error)
-               goto do_new;
-
        /*
         * rsb found inactive (master_nodeid may be out of date unless
         * we are the dir_nodeid or were the master)  No other thread
                r->res_first_lkid = 0;
        }
 
-       rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
-       error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
+       rsb_clear_flag(r, RSB_TOSS);
        goto out_unlock;
 
 
        }
 
  out_add:
-       error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
+       error = rsb_insert(r, &ls->ls_rsbtbl[b].r);
  out_unlock:
        spin_unlock_bh(&ls->ls_rsbtbl_lock);
  out:
 
        spin_lock_bh(&ls->ls_rsbtbl_lock);
 
-       error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
+       error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].r, name, len, &r);
        if (error)
+               goto do_new;
+
+       if (rsb_flag(r, RSB_TOSS))
                goto do_toss;
 
        /*
 
 
  do_toss:
-       error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
-       if (error)
-               goto do_new;
-
        /*
         * rsb found inactive. No other thread is using this rsb because
         * it's on the toss list, so we can look at or update
                r->res_nodeid = 0;
        }
 
-       rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
-       error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
+       rsb_clear_flag(r, RSB_TOSS);
        goto out_unlock;
 
 
        r->res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid;
        kref_init(&r->res_ref);
 
-       error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
+       error = rsb_insert(r, &ls->ls_rsbtbl[b].r);
  out_unlock:
        spin_unlock_bh(&ls->ls_rsbtbl_lock);
  out:
                return error;
 
        spin_lock_bh(&ls->ls_rsbtbl_lock);
-       error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
+       error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].r, name, len, &r);
        if (!error) {
+               if (rsb_flag(r, RSB_TOSS))
+                       goto do_toss;
+
                /* because the rsb is active, we need to lock_rsb before
                 * checking/changing re_master_nodeid
                 */
                put_rsb(r);
 
                return 0;
-       }
-
-       error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
-       if (error)
+       } else {
                goto not_found;
+       }
 
+ do_toss:
        /* because the rsb is inactive (on toss list), it's not refcounted
         * and lock_rsb is not used, but is protected by the rsbtbl lock
         */
        r->res_nodeid = from_nodeid;
        kref_init(&r->res_ref);
        r->res_toss_time = jiffies;
+       rsb_set_flag(r, RSB_TOSS);
 
-       error = rsb_insert(r, &ls->ls_rsbtbl[b].toss);
+       error = rsb_insert(r, &ls->ls_rsbtbl[b].r);
        if (error) {
                /* should never happen */
                dlm_free_rsb(r);
 
        spin_lock_bh(&ls->ls_rsbtbl_lock);
        for (i = 0; i < ls->ls_rsbtbl_size; i++) {
-               for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
+               for (n = rb_first(&ls->ls_rsbtbl[i].r); n; n = rb_next(n)) {
                        r = rb_entry(n, struct dlm_rsb, res_hashnode);
+                       if (rsb_flag(r, RSB_TOSS))
+                               continue;
+
                        if (r->res_hash == hash)
                                dlm_dump_rsb(r);
                }
        b = hash & (ls->ls_rsbtbl_size - 1);
 
        spin_lock_bh(&ls->ls_rsbtbl_lock);
-       error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
+       error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].r, name, len, &r);
        if (!error)
-               goto out_dump;
-
-       error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
-       if (error)
                goto out;
- out_dump:
+
        dlm_dump_rsb(r);
  out:
        spin_unlock_bh(&ls->ls_rsbtbl_lock);
 
        DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
        kref_init(&r->res_ref);
-       rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[r->res_bucket].keep);
-       rsb_insert(r, &ls->ls_rsbtbl[r->res_bucket].toss);
+       WARN_ON(rsb_flag(r, RSB_TOSS));
+       rsb_set_flag(r, RSB_TOSS);
        r->res_toss_time = jiffies;
        set_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[r->res_bucket].flags);
        if (r->res_lvbptr) {
                return;
        }
 
-       for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = next) {
+       for (n = rb_first(&ls->ls_rsbtbl[b].r); n; n = next) {
                next = rb_next(n);
                r = rb_entry(n, struct dlm_rsb, res_hashnode);
+               if (!rsb_flag(r, RSB_TOSS))
+                       continue;
 
                /* If we're the directory record for this rsb, and
                   we're not the master of it, then we need to wait
                        continue;
                }
 
-               rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
+               rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].r);
                dlm_free_rsb(r);
        }
 
                len = ls->ls_remove_lens[i];
 
                spin_lock_bh(&ls->ls_rsbtbl_lock);
-               rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
+               rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].r, name, len, &r);
                if (rv) {
+                       spin_unlock_bh(&ls->ls_rsbtbl_lock);
+                       log_error(ls, "remove_name not found %s", name);
+                       continue;
+               }
+
+               if (!rsb_flag(r, RSB_TOSS)) {
                        spin_unlock_bh(&ls->ls_rsbtbl_lock);
                        log_debug(ls, "remove_name not toss %s", name);
                        continue;
                        continue;
                }
 
-               rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
+               rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].r);
                send_remove(r);
                spin_unlock_bh(&ls->ls_rsbtbl_lock);
 
 
        spin_lock_bh(&ls->ls_rsbtbl_lock);
 
-       rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
+       rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].r, name, len, &r);
        if (rv) {
-               /* verify the rsb is on keep list per comment above */
-               rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
-               if (rv) {
-                       /* should not happen */
-                       log_error(ls, "receive_remove from %d not found %s",
-                                 from_nodeid, name);
-                       spin_unlock_bh(&ls->ls_rsbtbl_lock);
-                       return;
-               }
+               /* should not happen */
+               log_error(ls, "%s from %d not found %s", __func__,
+                         from_nodeid, name);
+               spin_unlock_bh(&ls->ls_rsbtbl_lock);
+               return;
+       }
+
+       if (!rsb_flag(r, RSB_TOSS)) {
                if (r->res_master_nodeid != from_nodeid) {
                        /* should not happen */
                        log_error(ls, "receive_remove keep from %d master %d",
        }
 
        if (kref_put(&r->res_ref, kill_rsb)) {
-               rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
+               rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].r);
                spin_unlock_bh(&ls->ls_rsbtbl_lock);
                dlm_free_rsb(r);
        } else {
        struct dlm_rsb *r;
 
        spin_lock_bh(&ls->ls_rsbtbl_lock);
-       for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) {
+       for (n = rb_first(&ls->ls_rsbtbl[bucket].r); n; n = rb_next(n)) {
                r = rb_entry(n, struct dlm_rsb, res_hashnode);
+               if (rsb_flag(r, RSB_TOSS))
+                       continue;
 
                if (!rsb_flag(r, RSB_RECOVER_GRANT))
                        continue;