goto out;
        }
 
-       err = ceph_pre_init_acls(dir, &mode, &as_ctx);
-       if (err < 0)
-               goto out;
-       err = ceph_security_init_secctx(dentry, mode, &as_ctx);
-       if (err < 0)
-               goto out;
-
        dout("mknod in dir %p dentry %p mode 0%ho rdev %d\n",
             dir, dentry, mode, rdev);
        req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_MKNOD, USE_AUTH_MDS);
                err = PTR_ERR(req);
                goto out;
        }
+
+       req->r_new_inode = ceph_new_inode(dir, dentry, &mode, &as_ctx);
+       if (IS_ERR(req->r_new_inode)) {
+               err = PTR_ERR(req->r_new_inode);
+               req->r_new_inode = NULL;
+               goto out_req;
+       }
+
        req->r_dentry = dget(dentry);
        req->r_num_caps = 2;
        req->r_parent = dir;
        req->r_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_AUTH_EXCL |
                             CEPH_CAP_XATTR_EXCL;
        req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
-       if (as_ctx.pagelist) {
-               req->r_pagelist = as_ctx.pagelist;
-               as_ctx.pagelist = NULL;
-       }
+
+       ceph_as_ctx_to_req(req, &as_ctx);
+
        err = ceph_mdsc_do_request(mdsc, dir, req);
        if (!err && !req->r_reply_info.head->is_dentry)
                err = ceph_handle_notrace_create(dir, dentry);
+out_req:
        ceph_mdsc_put_request(req);
 out:
        if (!err)
        struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(dir->i_sb);
        struct ceph_mds_request *req;
        struct ceph_acl_sec_ctx as_ctx = {};
+       umode_t mode = S_IFLNK | 0777;
        int err;
 
        if (ceph_snap(dir) != CEPH_NOSNAP)
                goto out;
        }
 
-       err = ceph_security_init_secctx(dentry, S_IFLNK | 0777, &as_ctx);
-       if (err < 0)
-               goto out;
-
        dout("symlink in dir %p dentry %p to '%s'\n", dir, dentry, dest);
        req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SYMLINK, USE_AUTH_MDS);
        if (IS_ERR(req)) {
                err = PTR_ERR(req);
                goto out;
        }
+
+       req->r_new_inode = ceph_new_inode(dir, dentry, &mode, &as_ctx);
+       if (IS_ERR(req->r_new_inode)) {
+               err = PTR_ERR(req->r_new_inode);
+               req->r_new_inode = NULL;
+               goto out_req;
+       }
+
        req->r_path2 = kstrdup(dest, GFP_KERNEL);
        if (!req->r_path2) {
                err = -ENOMEM;
-               ceph_mdsc_put_request(req);
-               goto out;
+               goto out_req;
        }
        req->r_parent = dir;
        ihold(dir);
        req->r_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_AUTH_EXCL |
                             CEPH_CAP_XATTR_EXCL;
        req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
-       if (as_ctx.pagelist) {
-               req->r_pagelist = as_ctx.pagelist;
-               as_ctx.pagelist = NULL;
-       }
+
+       ceph_as_ctx_to_req(req, &as_ctx);
+
        err = ceph_mdsc_do_request(mdsc, dir, req);
        if (!err && !req->r_reply_info.head->is_dentry)
                err = ceph_handle_notrace_create(dir, dentry);
+out_req:
        ceph_mdsc_put_request(req);
 out:
        if (err)
                goto out;
        }
 
-       mode |= S_IFDIR;
-       err = ceph_pre_init_acls(dir, &mode, &as_ctx);
-       if (err < 0)
-               goto out;
-       err = ceph_security_init_secctx(dentry, mode, &as_ctx);
-       if (err < 0)
-               goto out;
 
        req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);
        if (IS_ERR(req)) {
                goto out;
        }
 
+       mode |= S_IFDIR;
+       req->r_new_inode = ceph_new_inode(dir, dentry, &mode, &as_ctx);
+       if (IS_ERR(req->r_new_inode)) {
+               err = PTR_ERR(req->r_new_inode);
+               req->r_new_inode = NULL;
+               goto out_req;
+       }
+
        req->r_dentry = dget(dentry);
        req->r_num_caps = 2;
        req->r_parent = dir;
        req->r_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_AUTH_EXCL |
                             CEPH_CAP_XATTR_EXCL;
        req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
-       if (as_ctx.pagelist) {
-               req->r_pagelist = as_ctx.pagelist;
-               as_ctx.pagelist = NULL;
-       }
+
+       ceph_as_ctx_to_req(req, &as_ctx);
+
        err = ceph_mdsc_do_request(mdsc, dir, req);
        if (!err &&
            !req->r_reply_info.head->is_target &&
            !req->r_reply_info.head->is_dentry)
                err = ceph_handle_notrace_create(dir, dentry);
+out_req:
        ceph_mdsc_put_request(req);
 out:
        if (!err)
 
        ceph_mdsc_release_dir_caps(req);
 }
 
-static int ceph_finish_async_create(struct inode *dir, struct dentry *dentry,
+static int ceph_finish_async_create(struct inode *dir, struct inode *inode,
+                                   struct dentry *dentry,
                                    struct file *file, umode_t mode,
                                    struct ceph_mds_request *req,
                                    struct ceph_acl_sec_ctx *as_ctx,
        struct ceph_mds_reply_info_in iinfo = { .in = &in };
        struct ceph_inode_info *ci = ceph_inode(dir);
        struct ceph_dentry_info *di = ceph_dentry(dentry);
-       struct inode *inode;
        struct timespec64 now;
        struct ceph_string *pool_ns;
        struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(dir->i_sb);
 
        ktime_get_real_ts64(&now);
 
-       inode = ceph_get_inode(dentry->d_sb, vino);
-       if (IS_ERR(inode))
-               return PTR_ERR(inode);
-
        iinfo.inline_version = CEPH_INLINE_NONE;
        iinfo.change_attr = 1;
        ceph_encode_timespec64(&iinfo.btime, &now);
                ceph_dir_clear_complete(dir);
                if (!d_unhashed(dentry))
                        d_drop(dentry);
-               if (inode->i_state & I_NEW)
-                       discard_new_inode(inode);
+               discard_new_inode(inode);
        } else {
                struct dentry *dn;
 
        struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb);
        struct ceph_mds_client *mdsc = fsc->mdsc;
        struct ceph_mds_request *req;
+       struct inode *new_inode = NULL;
        struct dentry *dn;
        struct ceph_acl_sec_ctx as_ctx = {};
        bool try_async = ceph_test_mount_opt(fsc, ASYNC_DIROPS);
         */
        flags &= ~O_TRUNC;
 
+retry:
        if (flags & O_CREAT) {
                if (ceph_quota_is_max_files_exceeded(dir))
                        return -EDQUOT;
-               err = ceph_pre_init_acls(dir, &mode, &as_ctx);
-               if (err < 0)
-                       return err;
-               err = ceph_security_init_secctx(dentry, mode, &as_ctx);
-               if (err < 0)
+
+               new_inode = ceph_new_inode(dir, dentry, &mode, &as_ctx);
+               if (IS_ERR(new_inode)) {
+                       err = PTR_ERR(new_inode);
                        goto out_ctx;
+               }
                /* Async create can't handle more than a page of xattrs */
                if (as_ctx.pagelist &&
                    !list_is_singular(&as_ctx.pagelist->head))
                /* If it's not being looked up, it's negative */
                return -ENOENT;
        }
-retry:
+
        /* do the open */
        req = prepare_open_request(dir->i_sb, flags, mode);
        if (IS_ERR(req)) {
                req->r_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_AUTH_EXCL |
                                     CEPH_CAP_XATTR_EXCL;
                req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
-               if (as_ctx.pagelist) {
-                       req->r_pagelist = as_ctx.pagelist;
-                       as_ctx.pagelist = NULL;
-               }
-               if (try_async &&
-                   (req->r_dir_caps =
-                     try_prep_async_create(dir, dentry, &lo,
-                                           &req->r_deleg_ino))) {
+
+               ceph_as_ctx_to_req(req, &as_ctx);
+
+               if (try_async && (req->r_dir_caps =
+                                 try_prep_async_create(dir, dentry, &lo,
+                                                       &req->r_deleg_ino))) {
+                       struct ceph_vino vino = { .ino = req->r_deleg_ino,
+                                                 .snap = CEPH_NOSNAP };
                        struct ceph_dentry_info *di = ceph_dentry(dentry);
 
                        set_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags);
                        req->r_args.open.flags |= cpu_to_le32(CEPH_O_EXCL);
                        req->r_callback = ceph_async_create_cb;
 
+                       /* Hash inode before RPC */
+                       new_inode = ceph_get_inode(dir->i_sb, vino, new_inode);
+                       if (IS_ERR(new_inode)) {
+                               err = PTR_ERR(new_inode);
+                               new_inode = NULL;
+                               goto out_req;
+                       }
+                       WARN_ON_ONCE(!(new_inode->i_state & I_NEW));
+
                        spin_lock(&dentry->d_lock);
                        di->flags |= CEPH_DENTRY_ASYNC_CREATE;
                        spin_unlock(&dentry->d_lock);
 
                        err = ceph_mdsc_submit_request(mdsc, dir, req);
                        if (!err) {
-                               err = ceph_finish_async_create(dir, dentry,
-                                                       file, mode, req,
-                                                       &as_ctx, &lo);
+                               err = ceph_finish_async_create(dir, new_inode,
+                                                              dentry, file,
+                                                              mode, req,
+                                                              &as_ctx, &lo);
+                               new_inode = NULL;
                        } else if (err == -EJUKEBOX) {
                                restore_deleg_ino(dir, req->r_deleg_ino);
                                ceph_mdsc_put_request(req);
+                               discard_new_inode(new_inode);
+                               ceph_release_acl_sec_ctx(&as_ctx);
+                               memset(&as_ctx, 0, sizeof(as_ctx));
+                               new_inode = NULL;
                                try_async = false;
                                ceph_put_string(rcu_dereference_raw(lo.pool_ns));
                                goto retry;
        }
 
        set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
+       req->r_new_inode = new_inode;
+       new_inode = NULL;
        err = ceph_mdsc_do_request(mdsc, (flags & O_CREAT) ? dir : NULL, req);
        if (err == -ENOENT) {
                dentry = ceph_handle_snapdir(req, dentry);
        }
 out_req:
        ceph_mdsc_put_request(req);
+       iput(new_inode);
 out_ctx:
        ceph_release_acl_sec_ctx(&as_ctx);
        dout("atomic_open result=%d\n", err);
 
        return 0;
 }
 
-struct inode *ceph_get_inode(struct super_block *sb, struct ceph_vino vino)
+/**
+ * ceph_new_inode - allocate a new inode in advance of an expected create
+ * @dir: parent directory for new inode
+ * @dentry: dentry that may eventually point to new inode
+ * @mode: mode of new inode
+ * @as_ctx: pointer to inherited security context
+ *
+ * Allocate a new inode in advance of an operation to create a new inode.
+ * This allocates the inode and sets up the acl_sec_ctx with appropriate
+ * info for the new inode.
+ *
+ * Returns a pointer to the new inode or an ERR_PTR.
+ */
+struct inode *ceph_new_inode(struct inode *dir, struct dentry *dentry,
+                            umode_t *mode, struct ceph_acl_sec_ctx *as_ctx)
+{
+       int err;
+       struct inode *inode;
+
+       inode = new_inode(dir->i_sb);
+       if (!inode)
+               return ERR_PTR(-ENOMEM);
+
+       if (!S_ISLNK(*mode)) {
+               err = ceph_pre_init_acls(dir, mode, as_ctx);
+               if (err < 0)
+                       goto out_err;
+       }
+
+       err = ceph_security_init_secctx(dentry, *mode, as_ctx);
+       if (err < 0)
+               goto out_err;
+
+       inode->i_state = 0;
+       inode->i_mode = *mode;
+       return inode;
+out_err:
+       iput(inode);
+       return ERR_PTR(err);
+}
+
+void ceph_as_ctx_to_req(struct ceph_mds_request *req,
+                       struct ceph_acl_sec_ctx *as_ctx)
+{
+       if (as_ctx->pagelist) {
+               req->r_pagelist = as_ctx->pagelist;
+               as_ctx->pagelist = NULL;
+       }
+}
+
+/**
+ * ceph_get_inode - find or create/hash a new inode
+ * @sb: superblock to search and allocate in
+ * @vino: vino to search for
+ * @newino: optional new inode to insert if one isn't found (may be NULL)
+ *
+ * Search for or insert a new inode into the hash for the given vino, and
+ * return a reference to it. If new is non-NULL, its reference is consumed.
+ */
+struct inode *ceph_get_inode(struct super_block *sb, struct ceph_vino vino,
+                            struct inode *newino)
 {
        struct inode *inode;
 
        if (ceph_vino_is_reserved(vino))
                return ERR_PTR(-EREMOTEIO);
 
-       inode = iget5_locked(sb, (unsigned long)vino.ino, ceph_ino_compare,
-                            ceph_set_ino_cb, &vino);
-       if (!inode)
+       if (newino) {
+               inode = inode_insert5(newino, (unsigned long)vino.ino,
+                                     ceph_ino_compare, ceph_set_ino_cb, &vino);
+               if (inode != newino)
+                       iput(newino);
+       } else {
+               inode = iget5_locked(sb, (unsigned long)vino.ino,
+                                    ceph_ino_compare, ceph_set_ino_cb, &vino);
+       }
+
+       if (!inode) {
+               dout("No inode found for %llx.%llx\n", vino.ino, vino.snap);
                return ERR_PTR(-ENOMEM);
+       }
 
        dout("get_inode on %llu=%llx.%llx got %p new %d\n", ceph_present_inode(inode),
             ceph_vinop(inode), inode, !!(inode->i_state & I_NEW));
                .ino = ceph_ino(parent),
                .snap = CEPH_SNAPDIR,
        };
-       struct inode *inode = ceph_get_inode(parent->i_sb, vino);
+       struct inode *inode = ceph_get_inode(parent->i_sb, vino, NULL);
        struct ceph_inode_info *ci = ceph_inode(inode);
 
        if (IS_ERR(inode))
                vino.ino = le64_to_cpu(rde->inode.in->ino);
                vino.snap = le64_to_cpu(rde->inode.in->snapid);
 
-               in = ceph_get_inode(req->r_dentry->d_sb, vino);
+               in = ceph_get_inode(req->r_dentry->d_sb, vino, NULL);
                if (IS_ERR(in)) {
                        err = PTR_ERR(in);
                        dout("new_inode badness got %d\n", err);
                if (d_really_is_positive(dn)) {
                        in = d_inode(dn);
                } else {
-                       in = ceph_get_inode(parent->d_sb, tvino);
+                       in = ceph_get_inode(parent->d_sb, tvino, NULL);
                        if (IS_ERR(in)) {
                                dout("new_inode badness\n");
                                d_drop(dn);
 
                iput(req->r_parent);
        }
        iput(req->r_target_inode);
+       iput(req->r_new_inode);
        if (req->r_dentry)
                dput(req->r_dentry);
        if (req->r_old_dentry)
 
        /* Must find target inode outside of mutexes to avoid deadlocks */
        if ((err >= 0) && rinfo->head->is_target) {
-               struct inode *in;
+               struct inode *in = xchg(&req->r_new_inode, NULL);
                struct ceph_vino tvino = {
                        .ino  = le64_to_cpu(rinfo->targeti.in->ino),
                        .snap = le64_to_cpu(rinfo->targeti.in->snapid)
                };
 
-               in = ceph_get_inode(mdsc->fsc->sb, tvino);
+               /*
+                * If we ended up opening an existing inode, discard
+                * r_new_inode
+                */
+               if (req->r_op == CEPH_MDS_OP_CREATE &&
+                   !req->r_reply_info.has_create_ino) {
+                       /* This should never happen on an async create */
+                       WARN_ON_ONCE(req->r_deleg_ino);
+                       iput(in);
+                       in = NULL;
+               }
+
+               in = ceph_get_inode(mdsc->fsc->sb, tvino, in);
                if (IS_ERR(in)) {
                        err = PTR_ERR(in);
                        mutex_lock(&session->s_mutex);
 
 
        struct inode *r_parent;             /* parent dir inode */
        struct inode *r_target_inode;       /* resulting inode */
+       struct inode *r_new_inode;          /* new inode (for creates) */
 
 #define CEPH_MDS_R_DIRECT_IS_HASH      (1) /* r_direct_hash is valid */
 #define CEPH_MDS_R_ABORTED             (2) /* call was aborted */
 
 /* inode.c */
 struct ceph_mds_reply_info_in;
 struct ceph_mds_reply_dirfrag;
+struct ceph_acl_sec_ctx;
 
 extern const struct inode_operations ceph_file_iops;
 
 extern void ceph_evict_inode(struct inode *inode);
 extern void ceph_free_inode(struct inode *inode);
 
+struct inode *ceph_new_inode(struct inode *dir, struct dentry *dentry,
+                            umode_t *mode, struct ceph_acl_sec_ctx *as_ctx);
+void ceph_as_ctx_to_req(struct ceph_mds_request *req,
+                       struct ceph_acl_sec_ctx *as_ctx);
+
 extern struct inode *ceph_get_inode(struct super_block *sb,
-                                   struct ceph_vino vino);
+                                   struct ceph_vino vino,
+                                   struct inode *newino);
 extern struct inode *ceph_get_snapdir(struct inode *parent);
 extern int ceph_fill_file_size(struct inode *inode, int issued,
                               u32 truncate_seq, u64 truncate_size, u64 size);