if (ceph_snap(dir) != CEPH_NOSNAP)
                return -EROFS;
 
+       err = ceph_wait_on_conflict_unlink(dentry);
+       if (err)
+               return err;
+
        if (ceph_quota_is_max_files_exceeded(dir)) {
                err = -EDQUOT;
                goto out;
        if (ceph_snap(dir) != CEPH_NOSNAP)
                return -EROFS;
 
+       err = ceph_wait_on_conflict_unlink(dentry);
+       if (err)
+               return err;
+
        if (ceph_quota_is_max_files_exceeded(dir)) {
                err = -EDQUOT;
                goto out;
        struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(dir->i_sb);
        struct ceph_mds_request *req;
        struct ceph_acl_sec_ctx as_ctx = {};
-       int err = -EROFS;
+       int err;
        int op;
 
+       err = ceph_wait_on_conflict_unlink(dentry);
+       if (err)
+               return err;
+
        if (ceph_snap(dir) == CEPH_SNAPDIR) {
                /* mkdir .snap/foo is a MKSNAP */
                op = CEPH_MDS_OP_MKSNAP;
                dout("mkdir dir %p dn %p mode 0%ho\n", dir, dentry, mode);
                op = CEPH_MDS_OP_MKDIR;
        } else {
+               err = -EROFS;
                goto out;
        }
 
        struct ceph_mds_request *req;
        int err;
 
+       err = ceph_wait_on_conflict_unlink(dentry);
+       if (err)
+               return err;
+
        if (ceph_snap(dir) != CEPH_NOSNAP)
                return -EROFS;
 
 static void ceph_async_unlink_cb(struct ceph_mds_client *mdsc,
                                 struct ceph_mds_request *req)
 {
+       struct dentry *dentry = req->r_dentry;
+       struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb);
+       struct ceph_dentry_info *di = ceph_dentry(dentry);
        int result = req->r_err ? req->r_err :
                        le32_to_cpu(req->r_reply_info.head->result);
 
+       if (!test_bit(CEPH_DENTRY_ASYNC_UNLINK_BIT, &di->flags))
+               pr_warn("%s dentry %p:%pd async unlink bit is not set\n",
+                       __func__, dentry, dentry);
+
+       spin_lock(&fsc->async_unlink_conflict_lock);
+       hash_del_rcu(&di->hnode);
+       spin_unlock(&fsc->async_unlink_conflict_lock);
+
+       spin_lock(&dentry->d_lock);
+       di->flags &= ~CEPH_DENTRY_ASYNC_UNLINK;
+       wake_up_bit(&di->flags, CEPH_DENTRY_ASYNC_UNLINK_BIT);
+       spin_unlock(&dentry->d_lock);
+
+       synchronize_rcu();
+
        if (result == -EJUKEBOX)
                goto out;
 
        if (result) {
                int pathlen = 0;
                u64 base = 0;
-               char *path = ceph_mdsc_build_path(req->r_dentry, &pathlen,
+               char *path = ceph_mdsc_build_path(dentry, &pathlen,
                                                  &base, 0);
 
                /* mark error on parent + clear complete */
                ceph_dir_clear_complete(req->r_parent);
 
                /* drop the dentry -- we don't know its status */
-               if (!d_unhashed(req->r_dentry))
-                       d_drop(req->r_dentry);
+               if (!d_unhashed(dentry))
+                       d_drop(dentry);
 
                /* mark inode itself for an error (since metadata is bogus) */
                mapping_set_error(req->r_old_inode->i_mapping, result);
 
-               pr_warn("ceph: async unlink failure path=(%llx)%s result=%d!\n",
+               pr_warn("async unlink failure path=(%llx)%s result=%d!\n",
                        base, IS_ERR(path) ? "<<bad>>" : path, result);
                ceph_mdsc_free_path(path, pathlen);
        }
 
        if (try_async && op == CEPH_MDS_OP_UNLINK &&
            (req->r_dir_caps = get_caps_for_async_unlink(dir, dentry))) {
+               struct ceph_dentry_info *di = ceph_dentry(dentry);
+
                dout("async unlink on %llu/%.*s caps=%s", ceph_ino(dir),
                     dentry->d_name.len, dentry->d_name.name,
                     ceph_cap_string(req->r_dir_caps));
                req->r_callback = ceph_async_unlink_cb;
                req->r_old_inode = d_inode(dentry);
                ihold(req->r_old_inode);
+
+               spin_lock(&dentry->d_lock);
+               di->flags |= CEPH_DENTRY_ASYNC_UNLINK;
+               spin_unlock(&dentry->d_lock);
+
+               spin_lock(&fsc->async_unlink_conflict_lock);
+               hash_add_rcu(fsc->async_unlink_conflict, &di->hnode,
+                            dentry->d_name.hash);
+               spin_unlock(&fsc->async_unlink_conflict_lock);
+
                err = ceph_mdsc_submit_request(mdsc, dir, req);
                if (!err) {
                        /*
                         */
                        drop_nlink(inode);
                        d_delete(dentry);
-               } else if (err == -EJUKEBOX) {
-                       try_async = false;
-                       ceph_mdsc_put_request(req);
-                       goto retry;
+               } else {
+                       spin_lock(&fsc->async_unlink_conflict_lock);
+                       hash_del_rcu(&di->hnode);
+                       spin_unlock(&fsc->async_unlink_conflict_lock);
+
+                       spin_lock(&dentry->d_lock);
+                       di->flags &= ~CEPH_DENTRY_ASYNC_UNLINK;
+                       spin_unlock(&dentry->d_lock);
+
+                       if (err == -EJUKEBOX) {
+                               try_async = false;
+                               ceph_mdsc_put_request(req);
+                               goto retry;
+                       }
                }
        } else {
                set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
            (!ceph_quota_is_same_realm(old_dir, new_dir)))
                return -EXDEV;
 
+       err = ceph_wait_on_conflict_unlink(new_dentry);
+       if (err)
+               return err;
+
        dout("rename dir %p dentry %p to dir %p dentry %p\n",
             old_dir, old_dentry, new_dir, new_dentry);
        req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
 
                char *path = ceph_mdsc_build_path(req->r_dentry, &pathlen,
                                                  &base, 0);
 
-               pr_warn("ceph: async create failure path=(%llx)%s result=%d!\n",
+               pr_warn("async create failure path=(%llx)%s result=%d!\n",
                        base, IS_ERR(path) ? "<<bad>>" : path, result);
                ceph_mdsc_free_path(path, pathlen);
 
        if (dentry->d_name.len > NAME_MAX)
                return -ENAMETOOLONG;
 
+       err = ceph_wait_on_conflict_unlink(dentry);
+       if (err)
+               return err;
+
        if (flags & O_CREAT) {
                if (ceph_quota_is_max_files_exceeded(dir))
                        return -EDQUOT;
 
                                dout("added delegated inode 0x%llx\n",
                                     start - 1);
                        } else if (err == -EBUSY) {
-                               pr_warn("ceph: MDS delegated inode 0x%llx more than once.\n",
+                               pr_warn("MDS delegated inode 0x%llx more than once.\n",
                                        start - 1);
                        } else {
                                return err;
        free_pages((unsigned long)info->dir_entries, get_order(info->dir_buf_size));
 }
 
+/*
+ * In async unlink case the kclient won't wait for the first reply
+ * from MDS and just drop all the links and unhash the dentry and then
+ * succeeds immediately.
+ *
+ * For any new create/link/rename,etc requests followed by using the
+ * same file names we must wait for the first reply of the inflight
+ * unlink request, or the MDS possibly will fail these following
+ * requests with -EEXIST if the inflight async unlink request was
+ * delayed for some reasons.
+ *
+ * And the worst case is that for the none async openc request it will
+ * successfully open the file if the CDentry hasn't been unlinked yet,
+ * but later the previous delayed async unlink request will remove the
+ * CDenty. That means the just created file is possiblly deleted later
+ * by accident.
+ *
+ * We need to wait for the inflight async unlink requests to finish
+ * when creating new files/directories by using the same file names.
+ */
+int ceph_wait_on_conflict_unlink(struct dentry *dentry)
+{
+       struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb);
+       struct dentry *pdentry = dentry->d_parent;
+       struct dentry *udentry, *found = NULL;
+       struct ceph_dentry_info *di;
+       struct qstr dname;
+       u32 hash = dentry->d_name.hash;
+       int err;
+
+       dname.name = dentry->d_name.name;
+       dname.len = dentry->d_name.len;
+
+       rcu_read_lock();
+       hash_for_each_possible_rcu(fsc->async_unlink_conflict, di,
+                                  hnode, hash) {
+               udentry = di->dentry;
+
+               spin_lock(&udentry->d_lock);
+               if (udentry->d_name.hash != hash)
+                       goto next;
+               if (unlikely(udentry->d_parent != pdentry))
+                       goto next;
+               if (!hash_hashed(&di->hnode))
+                       goto next;
+
+               if (!test_bit(CEPH_DENTRY_ASYNC_UNLINK_BIT, &di->flags))
+                       pr_warn("%s dentry %p:%pd async unlink bit is not set\n",
+                               __func__, dentry, dentry);
+
+               if (!d_same_name(udentry, pdentry, &dname))
+                       goto next;
+
+               spin_unlock(&udentry->d_lock);
+               found = dget(udentry);
+               break;
+next:
+               spin_unlock(&udentry->d_lock);
+       }
+       rcu_read_unlock();
+
+       if (likely(!found))
+               return 0;
+
+       dout("%s dentry %p:%pd conflict with old %p:%pd\n", __func__,
+            dentry, dentry, found, found);
+
+       err = wait_on_bit(&di->flags, CEPH_DENTRY_ASYNC_UNLINK_BIT,
+                         TASK_KILLABLE);
+       dput(found);
+       return err;
+}
+
 
 /*
  * sessions
 
                           TASK_KILLABLE);
 }
 
+extern int ceph_wait_on_conflict_unlink(struct dentry *dentry);
 extern u64 ceph_get_deleg_ino(struct ceph_mds_session *session);
 extern int ceph_restore_deleg_ino(struct ceph_mds_session *session, u64 ino);
 #endif
 
        if (!fsc->cap_wq)
                goto fail_inode_wq;
 
+       hash_init(fsc->async_unlink_conflict);
+       spin_lock_init(&fsc->async_unlink_conflict_lock);
+
        spin_lock(&ceph_fsc_lock);
        list_add_tail(&fsc->metric_wakeup, &ceph_fsc_list);
        spin_unlock(&ceph_fsc_lock);
 
 #include <linux/security.h>
 #include <linux/netfs.h>
 #include <linux/fscache.h>
+#include <linux/hashtable.h>
 
 #include <linux/ceph/libceph.h>
 
        char *mon_addr;
 };
 
+#define CEPH_ASYNC_CREATE_CONFLICT_BITS 8
+
 struct ceph_fs_client {
        struct super_block *sb;
 
        struct workqueue_struct *inode_wq;
        struct workqueue_struct *cap_wq;
 
+       DECLARE_HASHTABLE(async_unlink_conflict, CEPH_ASYNC_CREATE_CONFLICT_BITS);
+       spinlock_t async_unlink_conflict_lock;
+
 #ifdef CONFIG_DEBUG_FS
        struct dentry *debugfs_dentry_lru, *debugfs_caps;
        struct dentry *debugfs_congestion_kb;
        struct dentry *dentry;
        struct ceph_mds_session *lease_session;
        struct list_head lease_list;
-       unsigned flags;
+       struct hlist_node hnode;
+       unsigned long flags;
        int lease_shared_gen;
        u32 lease_gen;
        u32 lease_seq;
        u64 offset;
 };
 
-#define CEPH_DENTRY_REFERENCED         1
-#define CEPH_DENTRY_LEASE_LIST         2
-#define CEPH_DENTRY_SHRINK_LIST                4
-#define CEPH_DENTRY_PRIMARY_LINK       8
+#define CEPH_DENTRY_REFERENCED         (1 << 0)
+#define CEPH_DENTRY_LEASE_LIST         (1 << 1)
+#define CEPH_DENTRY_SHRINK_LIST                (1 << 2)
+#define CEPH_DENTRY_PRIMARY_LINK       (1 << 3)
+#define CEPH_DENTRY_ASYNC_UNLINK_BIT   (4)
+#define CEPH_DENTRY_ASYNC_UNLINK       (1 << CEPH_DENTRY_ASYNC_UNLINK_BIT)
 
 struct ceph_inode_xattrs_info {
        /*