ceph: add ceph_cap_unlink_work to fire check_caps() immediately
author Xiubo Li <xiubli@redhat.com>
Thu, 14 Sep 2023 02:29:16 +0000 (10:29 +0800)
committer Ilya Dryomov <idryomov@gmail.com>
Tue, 13 Feb 2024 10:22:54 +0000 (11:22 +0100)
When unlinking a file, the caps check could be delayed for more than
5 seconds, while the MDS may be waiting for the clients to
release caps.

Use the cap_wq workqueue and a dedicated list to fire check_caps()
and the dirty buffer flushing immediately.

Link: https://tracker.ceph.com/issues/50223
Signed-off-by: Xiubo Li <xiubli@redhat.com>
Reviewed-by: Milind Changire <mchangir@redhat.com>
Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
fs/ceph/caps.c
fs/ceph/mds_client.c
fs/ceph/mds_client.h

index bce3a840f15c2671b10494b6dc8cb5bf3879ce37..7fb4aae97412464c54b42037f75016085d214bd3 100644 (file)
@@ -4772,7 +4772,22 @@ int ceph_drop_caps_for_unlink(struct inode *inode)
                if (__ceph_caps_dirty(ci)) {
                        struct ceph_mds_client *mdsc =
                                ceph_inode_to_fs_client(inode)->mdsc;
-                       __cap_delay_requeue_front(mdsc, ci);
+
+                       doutc(mdsc->fsc->client, "%p %llx.%llx\n", inode,
+                             ceph_vinop(inode));
+                       spin_lock(&mdsc->cap_unlink_delay_lock);
+                       ci->i_ceph_flags |= CEPH_I_FLUSH;
+                       if (!list_empty(&ci->i_cap_delay_list))
+                               list_del_init(&ci->i_cap_delay_list);
+                       list_add_tail(&ci->i_cap_delay_list,
+                                     &mdsc->cap_unlink_delay_list);
+                       spin_unlock(&mdsc->cap_unlink_delay_lock);
+
+                       /*
+                        * Fire the work immediately, because the MDS may be
+                        * waiting for caps release.
+                        */
+                       ceph_queue_cap_unlink_work(mdsc);
                }
        }
        spin_unlock(&ci->i_ceph_lock);
index f71bb9c9569fc754f447b50cfb8abc89083fe7fd..3ab9c268a8bb398b779cc93d3da98f3d13df8fe3 100644 (file)
@@ -2484,6 +2484,50 @@ void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr)
        }
 }
 
+/*
+ * Kick the cap unlink worker (mdsc->cap_unlink_work) on the cap_wq
+ * workqueue.
+ *
+ * Does nothing once mdsc->stopping is set.  The outcome of queue_work()
+ * is only reported through a debug message; nothing is returned to the
+ * caller.
+ */
+void ceph_queue_cap_unlink_work(struct ceph_mds_client *mdsc)
+{
+       struct ceph_client *cl = mdsc->fsc->client;
+
+       if (mdsc->stopping)
+               return;
+
+       /*
+        * NOTE(review): per the workqueue API, queue_work() returns false
+        * when the work item was already pending, so the "failed" message
+        * below is not necessarily an error -- confirm whether the wording
+        * should change.
+        */
+       if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_unlink_work))
+               doutc(cl, "caps unlink work queued\n");
+       else
+               doutc(cl, "failed to queue caps unlink work\n");
+}
+
+/*
+ * Work handler for mdsc->cap_unlink_work.
+ *
+ * Drains mdsc->cap_unlink_delay_list: each inode is taken off the list,
+ * pinned with igrab(), and handed to ceph_check_caps() with
+ * CHECK_CAPS_FLUSH.  The list lock is dropped around the cap check and
+ * retaken before the list is examined again.
+ *
+ * NOTE(review): entries are linked through ci->i_cap_delay_list, the same
+ * node that ceph_drop_caps_for_unlink() unlinks from whatever list it was
+ * on before; confirm that cap_unlink_delay_lock alone is sufficient to
+ * protect that node.
+ */
+static void ceph_cap_unlink_work(struct work_struct *work)
+{
+       struct ceph_mds_client *mdsc =
+               container_of(work, struct ceph_mds_client, cap_unlink_work);
+       struct ceph_client *cl = mdsc->fsc->client;
+       struct list_head *pending = &mdsc->cap_unlink_delay_list;
+       spinlock_t *lock = &mdsc->cap_unlink_delay_lock;
+       struct ceph_inode_info *ci;
+       struct inode *inode;
+
+       doutc(cl, "begin\n");
+       spin_lock(lock);
+       while (!list_empty(pending)) {
+               ci = list_first_entry(pending, struct ceph_inode_info,
+                                     i_cap_delay_list);
+               list_del_init(&ci->i_cap_delay_list);
+
+               inode = igrab(&ci->netfs.inode);
+               if (!inode)
+                       continue;       /* could not pin it; entry is dropped */
+
+               /* Do not hold the list lock across the cap check. */
+               spin_unlock(lock);
+               doutc(cl, "on %p %llx.%llx\n", inode, ceph_vinop(inode));
+               ceph_check_caps(ci, CHECK_CAPS_FLUSH);
+               iput(inode);
+               spin_lock(lock);
+       }
+       spin_unlock(lock);
+       doutc(cl, "done\n");
+}
+
 /*
  * requests
  */
@@ -5359,6 +5403,8 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
        INIT_LIST_HEAD(&mdsc->cap_delay_list);
        INIT_LIST_HEAD(&mdsc->cap_wait_list);
        spin_lock_init(&mdsc->cap_delay_lock);
+       INIT_LIST_HEAD(&mdsc->cap_unlink_delay_list);
+       spin_lock_init(&mdsc->cap_unlink_delay_lock);
        INIT_LIST_HEAD(&mdsc->snap_flush_list);
        spin_lock_init(&mdsc->snap_flush_lock);
        mdsc->last_cap_flush_tid = 1;
@@ -5367,6 +5413,7 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
        spin_lock_init(&mdsc->cap_dirty_lock);
        init_waitqueue_head(&mdsc->cap_flushing_wq);
        INIT_WORK(&mdsc->cap_reclaim_work, ceph_cap_reclaim_work);
+       INIT_WORK(&mdsc->cap_unlink_work, ceph_cap_unlink_work);
        err = ceph_metric_init(&mdsc->metric);
        if (err)
                goto err_mdsmap;
@@ -5640,6 +5687,7 @@ void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc)
        ceph_cleanup_global_and_empty_realms(mdsc);
 
        cancel_work_sync(&mdsc->cap_reclaim_work);
+       cancel_work_sync(&mdsc->cap_unlink_work);
        cancel_delayed_work_sync(&mdsc->delayed_work); /* cancel timer */
 
        doutc(cl, "done\n");
index 40560af3882720bd4bc90c1f728cd2ebeaaf3de9..03f8ff00874f727adff8b88cc8d538fc989692d8 100644 (file)
@@ -462,6 +462,8 @@ struct ceph_mds_client {
        unsigned long    last_renew_caps;  /* last time we renewed our caps */
        struct list_head cap_delay_list;   /* caps with delayed release */
        spinlock_t       cap_delay_lock;   /* protects cap_delay_list */
+       struct list_head cap_unlink_delay_list;  /* caps with delayed release for unlink */
+       spinlock_t       cap_unlink_delay_lock;  /* protects cap_unlink_delay_list */
        struct list_head snap_flush_list;  /* cap_snaps ready to flush */
        spinlock_t       snap_flush_lock;
 
@@ -475,6 +477,8 @@ struct ceph_mds_client {
        struct work_struct cap_reclaim_work;
        atomic_t           cap_reclaim_pending;
 
+       struct work_struct cap_unlink_work;
+
        /*
         * Cap reservations
         *
@@ -574,6 +578,7 @@ extern void ceph_flush_cap_releases(struct ceph_mds_client *mdsc,
                                    struct ceph_mds_session *session);
 extern void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc);
 extern void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr);
+extern void ceph_queue_cap_unlink_work(struct ceph_mds_client *mdsc);
 extern int ceph_iterate_session_caps(struct ceph_mds_session *session,
                                     int (*cb)(struct inode *, int mds, void *),
                                     void *arg);