diff --git a/PendingReleaseNotes b/PendingReleaseNotes
index e23c9eb08b1..ff7a4a00af1 100644
--- a/PendingReleaseNotes
+++ b/PendingReleaseNotes
@@ -91,6 +91,13 @@
   of the system.
 * CephFS: Full support for subvolumes and subvolume groups is now available
   for snap_schedule Manager module.
+* CephFS: Two FS names can now be swapped, optionally along with their IDs,
+  using "ceph fs swap" command. The function of this API is to facilitate
+  file system swaps for disaster recovery. In particular, it avoids situations
+  where a named file system is temporarily missing which would prompt a higher
+  level storage operator (like Rook) to recreate the missing file system.
+  See https://docs.ceph.com/en/latest/cephfs/administration/#file-systems
+  docs for more information.
 * CephFS: The `subvolume snapshot clone` command now depends on the config option
   `snapshot_clone_no_wait` which is used to reject the clone operation when
diff --git a/doc/cephfs/administration.rst b/doc/cephfs/administration.rst
index cd912b42aea..dd9a8e25b16 100644
--- a/doc/cephfs/administration.rst
+++ b/doc/cephfs/administration.rst
@@ -92,6 +92,46 @@ The CephX IDs authorized to the old file system name need to be reauthorized
 to the new name. Any on-going operations of the clients using these IDs may be
 disrupted. Mirroring is expected to be disabled on the file system.
+    fs swap <fs1-name> <fs1_id> <fs2-name> <fs2_id> [--swap-fscids=yes|no] [--yes-i-really-mean-it]
+Swaps names of two Ceph file sytems and updates the application tags on all
+pools of both FSs accordingly. Certain tools that track FSCIDs of the file
+systems, besides the FS names, might get confused due to this operation. For
+this reason, mandatory option ``--swap-fscids`` has been provided that must be
+used to indicate whether or not FSCIDs must be swapped.
+.. note:: FSCID stands for "File System Cluster ID".
+Before the swap, mirroring should be disabled on both the CephFSs
+(because the cephfs-mirror daemon uses the fscid internally and changing it
+while the daemon is running could result in undefined behaviour), both the
+CephFSs should be offline and the file system flag ``refuse_client_sessions``
+must be set for both the CephFS.
+The function of this API is to facilitate disaster recovery where a new file
+system reconstructed from the previous one is ready to take over for the
+possibly damaged file system. Instead of two ``fs rename`` operations, the
+operator can use a swap so there is no FSMap epoch where the primary (or
+production) named file system does not exist. This is important when Ceph is
+monitored by automatic storage operators like (Rook) which try to reconcile
+the storage system continuously. That operator may attempt to recreate the
+file system as soon as it is seen to not exist.
+After the swap, CephX credentials may need to be reauthorized if the existing
+mounts should "follow" the old file system to its new name. Generally, for
+disaster recovery, its desirable for the existing mounts to continue using
+the same file system name. Any active file system mounts for either CephFSs
+must remount. Existing unflushed operations will be lost. When it is judged
+that one of the swapped file systems is ready for clients, run::
+    ceph fs set <fs> joinable true
+    ceph fs set <fs> refuse_client_sessions false
+Keep in mind that one of the swapped file systems may be left offline for
+future analysis if doing a disaster recovery swap.
diff --git a/doc/man/8/ceph.rst b/doc/man/8/ceph.rst
index 881c0d3077b..6487b2ca8f6 100644
--- a/doc/man/8/ceph.rst
+++ b/doc/man/8/ceph.rst
@@ -23,7 +23,7 @@ Synopsis
 | **ceph** **df** *{detail}*
-| **ceph** **fs** [ *ls* \| *new* \| *reset* \| *rm* \| *authorize* ] ...
+| **ceph** **fs** [ *ls* \| *new* \| *reset* \| *rm* \| *authorize* \| *swap* ] ...
 | **ceph** **fsid**
@@ -393,6 +393,15 @@ Usage::
     ceph fs authorize <fs_name> client.<client_id> <path> <perms> [<path> <perms>...]
+Subcommand ``swap`` swaps the names of two Ceph file system and updates
+application tags on the pool of the file systems accordingly. Optionally,
+FSIDs of the filesystems can also be swapped along with names by passing
+    ceph fs swap <fs1-name> <fs1-id> <fs2-name> <fs2-id> [--swap-fscids] {--yes-i-really-meant-it}
diff --git a/qa/tasks/cephfs/test_admin.py b/qa/tasks/cephfs/test_admin.py
index 252d85c7644..3e00b8be028 100644
--- a/qa/tasks/cephfs/test_admin.py
+++ b/qa/tasks/cephfs/test_admin.py
@@ -821,6 +821,813 @@ class TestRenameCommand(TestAdminCommands):
         self.run_ceph_cmd(f'fs mirror disable {orig_fs_name}')
+class TestSwapCmd(TestAdminCommands):
+    """
+    Tests for "ceph fs swap" command.
+    """
+    client_id = 'testuser'
+    client_name = f'client.{client_id}'
+    def setUp(self):
+        super(self.__class__, self).setUp()
+        self.fs1 = self.fs
+        self.fs2 = self.mds_cluster.newfs(name='testcephfs2', create=True)
+        self.fs1.getinfo()
+        self.fs2.getinfo()
+        self.orig_fs_id_name = {self.fs1.id: self.fs1.name,
+                                self.fs2.id: self.fs2.name}
+        self.mount_a.remount(cephfs_name=self.fs1.name)
+        self.mount_b.remount(cephfs_name=self.fs2.name)
+        self.captesters = (CapTester(self.mount_a), CapTester(self.mount_b))
+        self.mount_a.umount_wait()
+        self.mount_b.umount_wait()
+    def tearDown(self):
+        self.mount_a.umount_wait()
+        self.mount_b.umount_wait()
+        self.run_ceph_cmd(args=f'auth rm {self.client_name}')
+        super(self.__class__, self).tearDown()
+    def _reauthorize_client(self):
+        moncap = gen_mon_cap_str((("rw", self.fs1.name),
+                                  ("rw", self.fs2.name)))
+        osdcap = gen_osd_cap_str((("rw", self.fs1.name),
+                                  ("rw", self.fs2.name)))
+        mdscap = gen_mds_cap_str((("rw", self.fs1.name),
+                                  ("rw", self.fs2.name)))
+        self.run_ceph_cmd(args=f'auth add {self.client_name} mon "{moncap}" '
+                               f'osd "{osdcap}" mds "{mdscap}"')
+    def _remount_both_cephfss(self):
+        keyring = self.fs.mon_manager.get_keyring(self.client_id) + '\n'
+        keyring_path_a = self.mount_a.client_remote.mktemp(data=keyring)
+        keyring_path_b = self.mount_b.client_remote.mktemp(data=keyring)
+        self.mount_a.mount(client_id=self.client_id,
+                           client_keyring_path=keyring_path_a,
+                           cephfs_mntpt='/', cephfs_name=self.fs1.name)
+        self.mount_b.mount(client_id=self.client_id,
+                           client_keyring_path=keyring_path_b,
+                           cephfs_mntpt='/', cephfs_name=self.fs2.name)
+    def run_rw_tests(self):
+        for captester in self.captesters:
+            captester.conduct_pos_test_for_read_caps()
+            captester.conduct_pos_test_for_write_caps()
+    def _check_fs_name_on_fs_pools(self, fss):
+        for fs in fss:
+            for pool in fs.get_data_pool_names(True):
+                self.check_pool_application_metadata_key_value(pool, 'cephfs',
+                    'data', fs.name)
+            self.check_pool_application_metadata_key_value(
+                fs.get_metadata_pool_name(), 'cephfs', 'metadata', fs.name)
+    def _are_fsnames_and_fscids_together(self):
+        '''
+        Are FS names and FSIDs together on same the FS as they were before
+        running "ceph fs swap" command?
+        '''
+        fs1_id_swapped = self.orig_fs_id_name[self.fs1.id] == self.fs1.name
+        fs2_id_swapped = self.orig_fs_id_name[self.fs2.id] == self.fs2.name
+        if fs1_id_swapped and fs2_id_swapped:
+            return True
+        elif not fs1_id_swapped and not fs2_id_swapped:
+            return False
+        else:
+            raise RuntimeError(
+                'Unexpected situation occured: FSID for one FS swapped but '
+                'not for the other FS.')
+    def _bring_both_cephfss_down(self):
+        self.run_ceph_cmd(f'fs fail {self.fs1.name}')
+        self.run_ceph_cmd(f'fs fail {self.fs2.name}')
+    def _bring_both_cephfss_up(self):
+        self.run_ceph_cmd(f'fs set {self.fs1.name} joinable true')
+        self.run_ceph_cmd(f'fs set {self.fs2.name} joinable true')
+    def _refuse_clients_for_both_cephfss(self):
+        self.run_ceph_cmd(f'fs set {self.fs1.name} refuse_client_session true')
+        self.run_ceph_cmd(f'fs set {self.fs2.name} refuse_client_session true')
+    def _accept_clients_for_both_cephfss(self):
+        self.run_ceph_cmd(f'fs set {self.fs1.name} refuse_client_session '
+                           'false')
+        self.run_ceph_cmd(f'fs set {self.fs2.name} refuse_client_session '
+                           'false')
+    def test_swap_fsnames_but_not_fscids(self):
+        '''
+        Test that "ceph fs swap --swap-fscids=no" swaps the FS names but not
+        the FSCIDs.
+        '''
+        self._bring_both_cephfss_down()
+        self._refuse_clients_for_both_cephfss()
+        sleep(5)
+        # log output to help debug test failures
+        self.run_ceph_cmd('fs', 'dump')
+        self.run_ceph_cmd(f'fs swap {self.fs1.name} {self.fs1.id} '
+                          f'{self.fs2.name} {self.fs2.id} --swap-fscids=no '
+                           '--yes_i_really_mean_it')
+        self._bring_both_cephfss_up()
+        self._accept_clients_for_both_cephfss()
+        sleep(5)
+        self.run_ceph_cmd('fs', 'dump')
+        self.fs1.getinfo()
+        self.fs2.getinfo()
+        self._reauthorize_client()
+        self._remount_both_cephfss()
+        # FS names were swapped but not FSIDs, so both can't be together
+        self.assertEqual(self._are_fsnames_and_fscids_together(), False)
+        self._check_fs_name_on_fs_pools((self.fs1, self.fs2))
+        self.run_rw_tests()
+    def test_swap_fsnames_and_fscids(self):
+        '''
+        Test that "ceph fs swap --swap-fscids=yes" swaps the FS names as well
+        as the FSCIDs.
+        '''
+        self._bring_both_cephfss_down()
+        self._refuse_clients_for_both_cephfss()
+        sleep(5)
+        self.run_ceph_cmd('fs', 'dump')
+        self.run_ceph_cmd(f'fs swap {self.fs1.name} {self.fs1.id} '
+                          f'{self.fs2.name} {self.fs2.id} --swap-fscids=yes '
+                           '--yes_i_really_mean_it')
+        self._bring_both_cephfss_up()
+        self._accept_clients_for_both_cephfss()
+        sleep(5)
+        self.run_ceph_cmd('fs', 'dump')
+        # XXX: Let's ensure that FS mounted on a mountpoint is same before
+        # and after swapping of FS name and FSCIDs. This ensures that data
+        # available on mountpoints before and after the swap is same. This
+        # prevents self.run_rw_tests() from breaking.
+        #
+        # At the beginning of test, testcephfs1 has data (let's say) 'abc1'
+        # and testcephfs2 has data 'abc2'. self.fs1 is mapped to testcephfs1
+        # and self.fs2 mapped to testcephfs2. After swap, data of testcephfs1
+        # and testcephfs2 will be 'abc2' and 'abc1' respectively.
+        #
+        # However, running self.fs1.getinfo() after swap will map self.fs1 to
+        # FS with FSCID 1 i.e. testcephfs1 and not testcephfs2. Thus, data
+        # under self.fs1 will be different than data before swapping. This
+        # breaks self.run_rw_tests() because self.fs1 is always mounted on
+        # the mountpoint of self.mount_a.
+        # To prevent this, therefore, make sure that data on
+        # self.fs1/self.mount_a is same after and before the swap. To ensure
+        # this, swap FS that is represented by self.fs1. Instead of
+        # testcephfs1 it should be testcephfs2 because, after swap,
+        # testcephfs2 containts the data of testcephfs1. This will ensure that
+        # self.mount_rw_tests doesn't break.
+        #
+        # Same for self.fs2.
+        self.fs1.id, self.fs2.id = None, None
+        self.fs1.name, self.fs2.name = self.fs2.name, self.fs1.name
+        self.fs1.getinfo()
+        self.fs2.getinfo()
+        self._reauthorize_client()
+        self._remount_both_cephfss()
+        # both FS name and FSIDs were swapped, so both must be together
+        self.assertEqual(self._are_fsnames_and_fscids_together(), True)
+        self._check_fs_name_on_fs_pools((self.fs1, self.fs2))
+        self.run_rw_tests()
+    def test_swap_without_confirmation_option(self):
+        '''
+        Test that "ceph fs swap --swap-fscids=yes" without the option
+        "--yes-i-really-mean-it" fails.
+        '''
+        self._bring_both_cephfss_down()
+        self._refuse_clients_for_both_cephfss()
+        sleep(5)
+        self.run_ceph_cmd('fs', 'dump')
+        msg = ('This is a potentially disruptive operation, client\'s cephx '
+               'credentials may need to be reauthorized to access the file '
+               'systems and its pools. Add --yes-i-really-mean-it if you are '
+               'sure you wish to continue.')
+        self.negtest_ceph_cmd(f'fs swap {self.fs1.name} {self.fs1.id} '
+                              f'{self.fs2.name} {self.fs2.id} '
+                               '--swap-fscids=no',
+            retval=errno.EPERM, errmsgs=msg)
+        self._bring_both_cephfss_up()
+        self._accept_clients_for_both_cephfss()
+        sleep(5)
+        self.run_ceph_cmd('fs', 'dump')
+        self.fs1.getinfo()
+        self.fs2.getinfo()
+        self._reauthorize_client()
+        self._remount_both_cephfss()
+        # check that content of both CephFSs is unaffected by this failure.
+        self.run_rw_tests()
+        self._check_fs_name_on_fs_pools((self.fs1, self.fs2))
+        # neither FS name nor FSIDs were swapped, so both must be together
+        self.assertEqual(self._are_fsnames_and_fscids_together(), True)
+    ###################################################
+    #
+    # Tests for "fs swap" when either FS name is false.
+    #
+    ###################################################
+    def test_swap_when_fs1_is_absent(self):
+        '''
+        Test that "ceph fs swap <fs1name> <fs1id> <fs2name> <fs2id>" fails
+        when there is no CephFS on cluster by the name "<fs1name>".
+        '''
+        self._bring_both_cephfss_down()
+        self._refuse_clients_for_both_cephfss()
+        sleep(5)
+        self.run_ceph_cmd('fs', 'dump')
+        absent_cephfs = 'random_fsname_654'
+        msg = (f"File system '{absent_cephfs}' doesn't exist on this Ceph "
+                "cluster")
+        self.negtest_ceph_cmd(
+            args=(f'fs swap {absent_cephfs} {self.fs1.id} {self.fs2.name} '
+                  f'{self.fs2.id} --swap-fscids=no --yes_i_really_mean_it'),
+            retval=errno.ENOENT, errmsgs=msg)
+        self._bring_both_cephfss_up()
+        self._accept_clients_for_both_cephfss()
+        sleep(5)
+        self.run_ceph_cmd('fs', 'dump')
+        self.fs1.getinfo()
+        self.fs2.getinfo()
+        self._reauthorize_client()
+        self._remount_both_cephfss()
+        # check that content of both CephFSs is unaffected by this failure.
+        self.run_rw_tests()
+        self._check_fs_name_on_fs_pools((self.fs1, self.fs2))
+        # neither FS name nor FSIDs were swapped, so both must be together
+        self.assertEqual(self._are_fsnames_and_fscids_together(), True)
+    def test_swap_when_fs2_is_absent(self):
+        '''
+        Test that "ceph fs swap <fs1name> <fs1id> <fs2name> <fs2id>" fails
+        when there is no CephFS on cluster by the name "<fs2name>".
+        '''
+        self._bring_both_cephfss_down()
+        self._refuse_clients_for_both_cephfss()
+        sleep(5)
+        self.run_ceph_cmd('fs', 'dump')
+        absent_cephfs = 'random_fsname_654'
+        msg = (f"File system '{absent_cephfs}' doesn't exist on this Ceph "
+                "cluster")
+        self.negtest_ceph_cmd(
+            args=(f'fs swap {self.fs1.name} {self.fs2.id} {absent_cephfs} '
+                  f'{self.fs2.id} --swap-fscids=no --yes_i_really_mean_it'),
+            retval=errno.ENOENT, errmsgs=msg)
+        self._bring_both_cephfss_up()
+        self._accept_clients_for_both_cephfss()
+        sleep(5)
+        self.run_ceph_cmd('fs', 'dump')
+        self.fs1.getinfo()
+        self.fs2.getinfo()
+        self._reauthorize_client()
+        self._remount_both_cephfss()
+        # check that content of both CephFSs is unaffected by this failure.
+        self.run_rw_tests()
+        self._check_fs_name_on_fs_pools((self.fs1, self.fs2))
+        self.assertEqual(self._are_fsnames_and_fscids_together(), True)
+    def test_swap_when_both_fss_are_absent(self):
+        '''
+        Test that "ceph fs swap <fs1name> <fs1id> <fs2name> <fs2id>" fails
+        when there are no CephFSs on the cluster by the name "<fs1name>" and
+        "<fs2name>".
+        '''
+        self._bring_both_cephfss_down()
+        self._refuse_clients_for_both_cephfss()
+        sleep(5)
+        self.run_ceph_cmd('fs', 'dump')
+        absent_cephfs1 = 'random_fsname_65'
+        absent_cephfs2 = 'random_fsname_66'
+        msg = (f"Neither file system '{absent_cephfs1}' nor file system "
+               f"'{absent_cephfs2}' exists on this Ceph cluster")
+        self.negtest_ceph_cmd(
+            args=(f'fs swap {absent_cephfs1} 123 {absent_cephfs2} 1234 '
+                   '--swap-fscids=no --yes_i_really_mean_it'),
+            retval=errno.ENOENT, errmsgs=msg)
+        self._bring_both_cephfss_up()
+        self._accept_clients_for_both_cephfss()
+        sleep(5)
+        self.run_ceph_cmd('fs', 'dump')
+        self.fs1.getinfo()
+        self.fs2.getinfo()
+        self._reauthorize_client()
+        self._remount_both_cephfss()
+        # check that content of both CephFSs is unaffected by this failure.
+        self.run_rw_tests()
+        self._check_fs_name_on_fs_pools((self.fs1, self.fs2))
+        # neither FS name nor FSIDs were swapped, so both must be together
+        self.assertEqual(self._are_fsnames_and_fscids_together(), True)
+    ###################################################
+    #
+    # Tests for "fs swap" when either FSCID is wrong.
+    #
+    ###################################################
+    def test_swap_when_fs1_id_is_wrong(self):
+        '''
+        Test that "ceph fs swap <fs1name> <fs1id> <fs2name> <fs2id>" fails
+        when "<fs1id>" is not the FSCID of the CephFS named "<fs1nmae>".
+        '''
+        self._bring_both_cephfss_down()
+        self._refuse_clients_for_both_cephfss()
+        sleep(5)
+        self.run_ceph_cmd('fs', 'dump')
+        msg = (f"FSCID provided for '{self.fs1.name}' is incorrect.")
+        self.negtest_ceph_cmd(
+            args=(f'fs swap {self.fs1.name} 123 {self.fs2.name} '
+                  f'{self.fs2.id} --swap-fscids=no --yes_i_really_mean_it'),
+            retval=errno.EINVAL, errmsgs=msg)
+        self._bring_both_cephfss_up()
+        self._accept_clients_for_both_cephfss()
+        sleep(5)
+        self.run_ceph_cmd('fs', 'dump')
+        self.fs1.getinfo()
+        self.fs2.getinfo()
+        self._reauthorize_client()
+        self._remount_both_cephfss()
+        # check that content of both CephFSs is unaffected by this failure.
+        self.run_rw_tests()
+        self._check_fs_name_on_fs_pools((self.fs1, self.fs2))
+        # neither FS name nor FSIDs were swapped, so both must be together
+        self.assertEqual(self._are_fsnames_and_fscids_together(), True)
+    def test_swap_when_fs2_id_is_wrong(self):
+        '''
+        Test that "ceph fs swap <fs1name> <fs1id> <fs2name> <fs2id>" fails
+        when "<fs2id>" is not the FSCID of the CephFS named "<fs2nmae>".
+        '''
+        self._bring_both_cephfss_down()
+        self._bring_both_cephfss_down()
+        self._refuse_clients_for_both_cephfss()
+        sleep(5)
+        self.run_ceph_cmd('fs', 'dump')
+        msg = (f"FSCID provided for '{self.fs2.name}' is incorrect.")
+        self.negtest_ceph_cmd(
+            args=(f'fs swap {self.fs1.name} {self.fs1.id} {self.fs2.name} '
+                  f'123 --swap-fscids=no --yes_i_really_mean_it'),
+            retval=errno.EINVAL, errmsgs=msg)
+        self._bring_both_cephfss_up()
+        self._accept_clients_for_both_cephfss()
+        sleep(5)
+        self.run_ceph_cmd('fs', 'dump')
+        self.fs1.getinfo()
+        self.fs2.getinfo()
+        self._reauthorize_client()
+        self._remount_both_cephfss()
+        # check that content of both CephFSs is unaffected by this failure.
+        self.run_rw_tests()
+        self._check_fs_name_on_fs_pools((self.fs1, self.fs2))
+        self.assertEqual(self._are_fsnames_and_fscids_together(), True)
+    def test_swap_when_both_fscids_are_wrong(self):
+        '''
+        Test that "ceph fs swap <fs1name> <fs1id> <fs2name> <fs2id>" fails
+        when "<fs1id>" and "<fs2id>", respectively, are not the FSCIDs of the
+        CephFSs named "<fs1name>" and "<fs2nmae>".
+        '''
+        self._bring_both_cephfss_down()
+        self._refuse_clients_for_both_cephfss()
+        sleep(5)
+        self.run_ceph_cmd('fs', 'dump')
+        msg = ('FSCIDs provided for both the CephFSs is incorrect.')
+        self.negtest_ceph_cmd(
+            args=(f'fs swap {self.fs1.name} 123 {self.fs2.name} 1234 '
+                  f'--swap-fscids=no --yes_i_really_mean_it'),
+            retval=errno.EINVAL, errmsgs=msg)
+        self._bring_both_cephfss_up()
+        self._accept_clients_for_both_cephfss()
+        sleep(5)
+        self.run_ceph_cmd('fs', 'dump')
+        self.fs1.getinfo()
+        self.fs2.getinfo()
+        self._reauthorize_client()
+        self._remount_both_cephfss()
+        # check that content of both CephFSs is unaffected by this failure.
+        self.run_rw_tests()
+        self._check_fs_name_on_fs_pools((self.fs1, self.fs2))
+        # neither FS name nor FSIDs were swapped, so both must be together
+        self.assertEqual(self._are_fsnames_and_fscids_together(), True)
+    def test_swap_when_user_swaps_fscids_in_cmd_args(self):
+        '''
+        Test that "ceph fs swap" fails and prints relevant error message when
+        FSCIDs are exchange while writing the command. That is user write the
+        command as -
+        "ceph fs swap <fs1name> <fs2id> <fs2name> <fs1id>"
+        instead of writing -
+        "ceph fs swap <fs1name> <fs1id> <fs2name> <fs2id>"
+        '''
+        self._bring_both_cephfss_down()
+        self._refuse_clients_for_both_cephfss()
+        sleep(5)
+        self.run_ceph_cmd('fs', 'dump')
+        msg = ('FSCIDs provided in command arguments are swapped; perhaps '
+               '`ceph fs swap` has been run before.')
+        proc = self.run_ceph_cmd(
+            args=(f'fs swap {self.fs1.name} {self.fs2.id} {self.fs2.name} '
+                  f'{self.fs1.id} --swap-fscids=no --yes_i_really_mean_it'),
+            stderr=StringIO())
+        self.assertIn(msg.lower(), proc.stderr.getvalue().lower())
+        self._bring_both_cephfss_up()
+        self._accept_clients_for_both_cephfss()
+        sleep(5)
+        self.run_ceph_cmd('fs', 'dump')
+        self.fs1.getinfo()
+        self.fs2.getinfo()
+        self._reauthorize_client()
+        self._remount_both_cephfss()
+        # check that content of both CephFSs is unaffected by this failure.
+        self.run_rw_tests()
+        self._check_fs_name_on_fs_pools((self.fs1, self.fs2))
+        # neither FS name nor FSIDs were swapped, so both must be together
+        self.assertEqual(self._are_fsnames_and_fscids_together(), True)
+    #####################################################
+    #
+    # Tests for "fs swap" when mirroring is enabled on FS
+    #
+    #####################################################
+    def test_swap_when_mirroring_enabled_for_1st_FS(self):
+        '''
+        Test that "ceph fs swap <fs1name> <fs1id> <fs2name> <fs2id>" fails
+        when mirroring is enabled for the CephFS named "<fs1name>".
+        '''
+        self.run_ceph_cmd(f'fs mirror enable {self.fs1.name}')
+        self._bring_both_cephfss_down()
+        self._refuse_clients_for_both_cephfss()
+        sleep(5)
+        self.run_ceph_cmd('fs', 'dump')
+        msg = (f"Mirroring is enabled on file system '{self.fs1.name}'. "
+                "Disable mirroring on the file system after ensuring it's "
+                "OK to do so, and then re-try swapping.")
+        self.negtest_ceph_cmd(
+            args=(f'fs swap {self.fs1.name} {self.fs1.id} {self.fs2.name} '
+                  f'{self.fs2.id} --swap-fscids=no --yes_i_really_mean_it'),
+            retval=errno.EPERM, errmsgs=msg)
+        self._bring_both_cephfss_up()
+        self._accept_clients_for_both_cephfss()
+        sleep(5)
+        self.run_ceph_cmd('fs', 'dump')
+        self.run_ceph_cmd(f'fs mirror disable {self.fs1.name}')
+        self.fs1.getinfo()
+        self.fs2.getinfo()
+        self._reauthorize_client()
+        self._remount_both_cephfss()
+        # check that content of both CephFSs is unaffected by this failure.
+        self.run_rw_tests()
+        self._check_fs_name_on_fs_pools((self.fs1, self.fs2))
+        # neither FS name nor FSIDs were swapped, so both must be together
+        self.assertEqual(self._are_fsnames_and_fscids_together(), True)
+    def test_swap_when_mirroring_enabled_for_2nd_FS(self):
+        '''
+        Test that "ceph fs swap <fs1name> <fs1id> <fs2name> <fs2id>" fails
+        when mirroring is enabled for the CephFS named "<fs2name>".
+        '''
+        self.run_ceph_cmd(f'fs mirror enable {self.fs2.name}')
+        self._bring_both_cephfss_down()
+        self._refuse_clients_for_both_cephfss()
+        sleep(5)
+        self.run_ceph_cmd('fs', 'dump')
+        msg = (f"Mirroring is enabled on file system '{self.fs2.name}'. "
+                "Disable mirroring on the file system after ensuring it's "
+                "OK to do so, and then re-try swapping.")
+        self.negtest_ceph_cmd(
+            args=(f'fs swap {self.fs1.name} {self.fs1.id} {self.fs2.name} '
+                  f'{self.fs2.id} --swap-fscids=no --yes_i_really_mean_it'),
+            retval=errno.EPERM, errmsgs=msg)
+        self._bring_both_cephfss_up()
+        self._accept_clients_for_both_cephfss()
+        sleep(5)
+        self.run_ceph_cmd('fs', 'dump')
+        self.run_ceph_cmd(f'fs mirror disable {self.fs2.name}')
+        self.fs1.getinfo()
+        self.fs2.getinfo()
+        self._reauthorize_client()
+        self._remount_both_cephfss()
+        # check that content of both CephFSs is unaffected by this failure.
+        self.run_rw_tests()
+        self._check_fs_name_on_fs_pools((self.fs1, self.fs2))
+        # neither FS name nor FSIDs were swapped, so both must be together
+        self.assertEqual(self._are_fsnames_and_fscids_together(), True)
+    def test_swap_when_mirroring_enabled_for_both_FSs(self):
+        '''
+        Test that "ceph fs swap <fs1name> <fs1id> <fs2name> <fs2id>" fails
+        when mirroring is enabled for both the CephFSs.
+        '''
+        self.run_ceph_cmd(f'fs mirror enable {self.fs1.name}')
+        self.run_ceph_cmd(f'fs mirror enable {self.fs2.name}')
+        self._bring_both_cephfss_down()
+        self._refuse_clients_for_both_cephfss()
+        sleep(5)
+        self.run_ceph_cmd('fs', 'dump')
+        msg = (f"Mirroring is enabled on file systems '{self.fs1.name}' and "
+               f"'{self.fs2.name}'. Disable mirroring on both the file "
+                "systems after ensuring it's OK to do so, and then re-try "
+                "swapping.")
+        self.negtest_ceph_cmd(
+            args=(f'fs swap {self.fs1.name} {self.fs1.id} {self.fs2.name} '
+                  f'{self.fs2.id} --swap-fscids=no --yes_i_really_mean_it'),
+            retval=errno.EPERM, errmsgs=msg)
+        self._bring_both_cephfss_up()
+        self._accept_clients_for_both_cephfss()
+        sleep(5)
+        self.run_ceph_cmd('fs', 'dump')
+        self.run_ceph_cmd(f'fs mirror disable {self.fs1.name}')
+        self.run_ceph_cmd(f'fs mirror disable {self.fs2.name}')
+        self.fs1.getinfo()
+        self.fs2.getinfo()
+        self._reauthorize_client()
+        self._remount_both_cephfss()
+        # check that content of both CephFSs is unaffected by this failure.
+        self.run_rw_tests()
+        self._check_fs_name_on_fs_pools((self.fs1, self.fs2))
+        # neither FS name nor FSIDs were swapped, so both must be together
+        self.assertEqual(self._are_fsnames_and_fscids_together(), True)
+    ##########################################################
+    #
+    # Tests for "fs swap" when either FS is not down/failed.
+    #
+    #########################################################
+    def test_swap_when_fs1_is_online(self):
+        '''
+        Test that "ceph fs swap <fs1name> <fs1id> <fs2name> <fs2id>" when
+        CephFS named "<fs1name>" is online (i.e. is not failed).
+        '''
+        self.run_ceph_cmd(f'fs fail {self.fs2.name}')
+        self._refuse_clients_for_both_cephfss()
+        sleep(5)
+        self.run_ceph_cmd('fs', 'dump')
+        msg = (f"CephFS '{self.fs1.name}' is not offline. Before swapping "
+                "CephFS names, both CephFSs should be marked as failed."
+                "See `ceph fs fail`.")
+        self.negtest_ceph_cmd(
+            args=(f'fs swap {self.fs1.name} {self.fs1.id} {self.fs2.name} '
+                  f'{self.fs2.id} --swap-fscids=no --yes_i_really_mean_it'),
+            retval=errno.EPERM, errmsgs=msg)
+        self.run_ceph_cmd(f'fs set {self.fs2.name} joinable true')
+        self._accept_clients_for_both_cephfss()
+        sleep(5)
+        self.run_ceph_cmd('fs', 'dump')
+        self.fs1.getinfo()
+        self.fs2.getinfo()
+        self._reauthorize_client()
+        self._remount_both_cephfss()
+        # check that content of both CephFSs is unaffected by this failure.
+        self.run_rw_tests()
+        self._check_fs_name_on_fs_pools((self.fs1, self.fs2))
+        # neither FS name nor FSIDs were swapped, so both must be together
+        self.assertEqual(self._are_fsnames_and_fscids_together(), True)
+    def test_swap_when_fs2_is_not_down(self):
+        '''
+        Test that "ceph fs swap <fs1name> <fs1id> <fs2name> <fs2id>" when
+        CephFS named "<fs2name>" is online (i.e. is not failed).
+        '''
+        self.run_ceph_cmd(f'fs fail {self.fs1.name}')
+        self._refuse_clients_for_both_cephfss()
+        sleep(5)
+        self.run_ceph_cmd('fs', 'dump')
+        msg = (f"CephFS '{self.fs2.name}' is not offline. Before swapping "
+                "CephFS names, both CephFSs should be marked as failed. "
+                "See `ceph fs fail`.")
+        self.negtest_ceph_cmd(
+            args=(f'fs swap {self.fs1.name} {self.fs1.id} {self.fs2.name} '
+                  f'{self.fs2.id} --swap-fscids=no --yes_i_really_mean_it'),
+            retval=errno.EPERM, errmsgs=msg)
+        self.run_ceph_cmd(f'fs set {self.fs1.name} joinable true')
+        self._accept_clients_for_both_cephfss()
+        sleep(5)
+        self.run_ceph_cmd('fs', 'dump')
+        self.fs1.getinfo()
+        self.fs2.getinfo()
+        self._reauthorize_client()
+        self._remount_both_cephfss()
+        # check that content of both CephFSs is unaffected by this failure.
+        self.run_rw_tests()
+        self._check_fs_name_on_fs_pools((self.fs1, self.fs2))
+        # neither FS name nor FSIDs were swapped, so both must be together
+        self.assertEqual(self._are_fsnames_and_fscids_together(), True)
+    def test_swap_when_both_FSs_are_not_down(self):
+        '''
+        Test that "ceph fs swap <fs1name> <fs1id> <fs2name> <fs2id>" when
+        both the CephFSs are online (i.e. is not failed).
+        '''
+        self._refuse_clients_for_both_cephfss()
+        sleep(5)
+        self.run_ceph_cmd('fs', 'dump')
+        msg = (f"CephFSs '{self.fs1.name}' and '{self.fs2.name}' are not "
+                "offline. Before swapping CephFS names, both CephFSs should "
+                "be marked as failed. See `ceph fs fail`.")
+        self.negtest_ceph_cmd(
+            args=(f'fs swap {self.fs1.name} {self.fs1.id} {self.fs2.name} '
+                  f'{self.fs2.id} --swap-fscids=no --yes_i_really_mean_it'),
+            retval=errno.EPERM, errmsgs=msg)
+        self._accept_clients_for_both_cephfss()
+        sleep(5)
+        self.run_ceph_cmd('fs', 'dump')
+        self.fs1.getinfo()
+        self.fs2.getinfo()
+        self._reauthorize_client()
+        self._remount_both_cephfss()
+        # check that content of both CephFSs is unaffected by this failure.
+        self.run_rw_tests()
+        self._check_fs_name_on_fs_pools((self.fs1, self.fs2))
+        # neither FS name nor FSIDs were swapped, so both must be together
+        self.assertEqual(self._are_fsnames_and_fscids_together(), True)
+    ##################################################
+    #
+    # Tests for "fs swap" when either FS is offline.
+    #
+    ################################################
+    def test_swap_when_FS1_doesnt_refuse_clients(self):
+        '''
+        Test that the command "ceph fs swap" command fails when
+        "refuse_client_session" is not set for the first of the two of FSs .
+        '''
+        self._bring_both_cephfss_down()
+        self.run_ceph_cmd(f'fs set {self.fs2.name} refuse_client_session true')
+        sleep(5)
+        self.run_ceph_cmd('fs', 'dump')
+        msg = (f"CephFS '{self.fs1.name}' doesn't refuse clients. Before "
+                "swapping CephFS names, flag 'refuse_client_session' must "
+                "be set. See `ceph fs set`.")
+        self.negtest_ceph_cmd(
+            args=(f'fs swap {self.fs1.name} {self.fs1.id} {self.fs2.name} '
+                  f'{self.fs2.id} --swap-fscids=no --yes_i_really_mean_it'),
+            retval=errno.EPERM, errmsgs=msg)
+        self._bring_both_cephfss_up()
+        self.run_ceph_cmd(f'fs set {self.fs2.name} refuse_client_session '
+                           'false')
+        sleep(5)
+        self.run_ceph_cmd('fs', 'dump')
+        self.fs1.getinfo()
+        self.fs2.getinfo()
+        self._reauthorize_client()
+        self._remount_both_cephfss()
+        # check that content of both CephFSs is unaffected by this failure.
+        self.run_rw_tests()
+        self._check_fs_name_on_fs_pools((self.fs1, self.fs2))
+        # neither FS name nor FSIDs were swapped, so both must be together
+        self.assertEqual(self._are_fsnames_and_fscids_together(), True)
+    def test_swap_when_FS2_doesnt_refuse_clients(self):
+        '''
+        Test that the command "ceph fs swap" command fails when
+        "refuse_client_session" is not set for the second of the two of FSs .
+        '''
+        self._bring_both_cephfss_down()
+        self.run_ceph_cmd(f'fs set {self.fs1.name} refuse_client_session true')
+        sleep(5)
+        self.run_ceph_cmd('fs', 'dump')
+        msg = (f"CephFS '{self.fs2.name}' doesn't refuse clients. Before "
+                "swapping CephFS names, flag 'refuse_client_session' must "
+                "be set. See `ceph fs set`.")
+        self.negtest_ceph_cmd(
+            args=(f'fs swap {self.fs1.name} {self.fs1.id} {self.fs2.name} '
+                  f'{self.fs2.id} --swap-fscids=no --yes_i_really_mean_it'),
+            retval=errno.EPERM, errmsgs=msg)
+        self._bring_both_cephfss_up()
+        self.run_ceph_cmd(f'fs set {self.fs1.name} refuse_client_session '
+                           'false')
+        sleep(5)
+        self.run_ceph_cmd('fs', 'dump')
+        self.fs1.getinfo()
+        self.fs2.getinfo()
+        self._reauthorize_client()
+        self._remount_both_cephfss()
+        # check that content of both CephFSs is unaffected by this failure.
+        self.run_rw_tests()
+        self._check_fs_name_on_fs_pools((self.fs1, self.fs2))
+        # neither FS name nor FSIDs were swapped, so both must be together
+        self.assertEqual(self._are_fsnames_and_fscids_together(), True)
+    def test_swap_when_both_FSs_do_not_refuse_clients(self):
+        '''
+        Test that the command "ceph fs swap" command fails when
+        "refuse_client_session" is not set for both the CephFSs.
+        '''
+        self.run_ceph_cmd('fs', 'dump')
+        self._bring_both_cephfss_down()
+        sleep(5)
+        msg = (f"CephFSs '{self.fs1.name}' and '{self.fs2.name}' do not "
+                "refuse clients. Before swapping CephFS names, flag "
+                "'refuse_client_session' must be set. See `ceph fs set`.")
+        self.negtest_ceph_cmd(
+            args=(f'fs swap {self.fs1.name} {self.fs1.id} {self.fs2.name} '
+                  f'{self.fs2.id} --swap-fscids=no --yes_i_really_mean_it'),
+            retval=errno.EPERM, errmsgs=msg)
+        self._bring_both_cephfss_up()
+        self.run_ceph_cmd('fs', 'dump')
+        sleep(5)
+        self.fs1.getinfo()
+        self.fs2.getinfo()
+        self._reauthorize_client()
+        self._remount_both_cephfss()
+        # check that content of both CephFSs is unaffected by this failure.
+        self.run_rw_tests()
+        self._check_fs_name_on_fs_pools((self.fs1, self.fs2))
+        # neither FS name nor FSIDs were swapped, so both must be together
+        self.assertEqual(self._are_fsnames_and_fscids_together(), True)
 class TestDump(CephFSTestCase):
diff --git a/src/mds/FSMap.cc b/src/mds/FSMap.cc
index 55b87a43b12..a862b0236e0 100644
--- a/src/mds/FSMap.cc
+++ b/src/mds/FSMap.cc
@@ -1219,3 +1219,20 @@ void FSMap::erase_filesystem(fs_cluster_id_t fscid)
+void FSMap::swap_fscids(fs_cluster_id_t fscid1, fs_cluster_id_t fscid2)
+  auto fs1 = std::move(filesystems.at(fscid1));
+  filesystems[fscid1] = std::move(filesystems.at(fscid2));
+  filesystems[fscid2] = std::move(fs1);
+  auto set_fs1_fscid = [fscid1](auto&& fs) {
+    fs->set_fscid(fscid1);
+  };
+  modify_filesystem(fscid1, std::move(set_fs1_fscid));
+  auto set_fs2_fscid = [fscid2](auto&& fs) {
+    fs->set_fscid(fscid2);
+  };
+  modify_filesystem(fscid2, std::move(set_fs2_fscid));
diff --git a/src/mds/FSMap.h b/src/mds/FSMap.h
index f57a4177aa0..85d53ef8336 100644
--- a/src/mds/FSMap.h
+++ b/src/mds/FSMap.h
@@ -236,9 +236,41 @@ public:
     return false;
+  const auto& get_mirror_info() const
+  {
+    return mirror_info;
+  }
+  auto& get_mirror_info()
+  {
+    return mirror_info;
+  }
+  const auto& get_mds_map() const
+  {
+    return mds_map;
+  }
+  auto& get_mds_map()
+  {
+    return mds_map;
+  }
+  auto get_fscid() const
+  {
+    return fscid;
+  }
   fs_cluster_id_t fscid = FS_CLUSTER_ID_NONE;
   MDSMap mds_map;
   MirrorInfo mirror_info;
+  friend class FSMap;
+  void set_fscid(fs_cluster_id_t new_fscid) {
+    fscid = new_fscid;
+  }
@@ -455,6 +487,11 @@ public:
     fs->mds_map.epoch = epoch;
+  /* This is method is written for the option of "ceph fs swap" commmand
+   * that intiates swap of FSCIDs.
+   */
+  void swap_fscids(fs_cluster_id_t fscid1, fs_cluster_id_t fscid2);
    * Apply a mutation to the mds_info_t structure for a particular
    * daemon (identified by GID), and make appropriate updates to epochs.
diff --git a/src/mon/FSCommands.cc b/src/mon/FSCommands.cc
index 05d28cc4c1d..4e19d4a4658 100644
--- a/src/mon/FSCommands.cc
+++ b/src/mon/FSCommands.cc
@@ -1292,6 +1292,207 @@ private:
   Paxos *m_paxos;
+class SwapFilesystemHandler : public FileSystemCommandHandler
+  public:
+  explicit SwapFilesystemHandler(Paxos *paxos)
+    : FileSystemCommandHandler("fs swap"), m_paxos(paxos)
+  {
+  }
+  int handle(Monitor *mon, FSMap& fsmap, MonOpRequestRef op,
+	     const cmdmap_t& cmdmap, std::ostream &ss) override
+  {
+    ceph_assert(m_paxos->is_plugged());
+    // Check for confirmation flag
+    bool confirmation_flag = false;
+    cmd_getval(cmdmap, "yes_i_really_mean_it", confirmation_flag);
+    if (!confirmation_flag) {
+      ss << "This is a potentially disruptive operation, client\'s cephx "
+	"credentials may need to be reauthorized to access the file systems "
+	"and its pools. Add --yes-i-really-mean-it if you are sure you wish "
+	"to continue.";
+      return -EPERM;
+    }
+    string fs1_name, fs2_name;
+    int64_t fs1_id = FS_CLUSTER_ID_NONE;
+    int64_t fs2_id = FS_CLUSTER_ID_NONE;
+    string swap_fscids_flag;
+    cmd_getval(cmdmap, "fs1_name", fs1_name);
+    cmd_getval(cmdmap, "fs2_name", fs2_name);
+    cmd_getval(cmdmap, "fs1_id", fs1_id);
+    cmd_getval(cmdmap, "fs2_id", fs2_id);
+    cmd_getval(cmdmap, "swap_fscids", swap_fscids_flag);
+    auto fs1p = fsmap.get_filesystem(fs1_name);
+    auto fs2p = fsmap.get_filesystem(fs2_name);
+    // Check that CephFSs exists for both given names.
+    if (fs1p == nullptr || fs2p == nullptr) {
+      if (fs1p == nullptr && fs2p != nullptr) {
+	ss << "File system '" << fs1_name << "' doesn\'t exist on this "
+	      "Ceph cluster.";
+	return -ENOENT;
+      } else if (fs1p != nullptr && fs2p == nullptr) {
+	ss << "File system '" << fs2_name << "' doesn\'t exist on this "
+	      "Ceph cluster.";
+	return -ENOENT;
+      } else {
+	ss << "Neither file system '" << fs1_name << "' nor file "
+	      "system '" << fs2_name << "' exists on this Ceph cluster.";
+	return -ENOENT;
+      }
+    }
+    // Check that FSCID provided for both CephFSs is correct.
+    if (fs1_id != fs1p->get_fscid() || fs2_id != fs2p->get_fscid()) {
+      if (fs1_id != fs1p->get_fscid() && fs2_id == fs2p->get_fscid()) {
+	ss << "FSCID provided for '" << fs1_name << "' is incorrect.";
+	return -EINVAL;
+      } else if (fs1_id == fs1p->get_fscid() && fs2_id != fs2p->get_fscid()) {
+	ss << "FSCID provided for '" << fs2_name << "' is incorrect.";
+	return -EINVAL;
+      } else if (fs1_id != fs1p->get_fscid() && fs2_id != fs2p->get_fscid()) {
+	if (fs1_id == fs2p->get_fscid() && fs2_id == fs1p->get_fscid()) {
+	  ss << "FSCIDs provided in command arguments are swapped; perhaps "
+	     << "`ceph fs swap` has been run before.";
+	  return 0;
+	} else {
+	ss << "FSCIDs provided for both the CephFSs is incorrect.";
+	return -EINVAL;
+	}
+      }
+    }
+    // Check that CephFS mirroring for both CephFSs is disabled.
+    if (fs1p->get_mirror_info().mirrored || fs2p->get_mirror_info().mirrored) {
+      if (fs1p->get_mirror_info().mirrored &&
+	  !fs2p->get_mirror_info().mirrored) {
+	ss << "Mirroring is enabled on file system '"<< fs1_name << "'. "
+	   << "Disable mirroring on the file system after ensuring it's OK "
+	   << "to do so, and then re-try swapping.";
+	return -EPERM;
+      } else if (!fs1p->get_mirror_info().mirrored &&
+		 fs2p->get_mirror_info().mirrored) {
+	ss << "Mirroring is enabled on file system '"<< fs2_name << "'. "
+	   << "Disable mirroring on the file system after ensuring it's OK "
+	   << "to do so, and then re-try swapping.";
+	return -EPERM;
+      } else {
+	ss << "Mirroring is enabled on file systems '" << fs1_name << "' "
+	   << "and '" << fs2_name << "'. Disable mirroring on both the "
+	   << "file systems after ensuring it's OK to do so, and then re-try "
+	   << "swapping.";
+	return -EPERM;
+      }
+    }
+    if (!mon->osdmon()->is_writeable()) {
+      // not allowed to write yet, so retry when we can
+      mon->osdmon()->wait_for_writeable(
+	op, new PaxosService::C_RetryMessage(mon->mdsmon(), op));
+      return -EAGAIN;
+    }
+    // Check that both CephFS have been marked as down, IOW has no MDS
+    // associated with it.
+    if (fs1p->get_mds_map().get_num_up_mds() > 0 ||
+        fs2p->get_mds_map().get_num_up_mds() > 0) {
+      if (fs1p->get_mds_map().get_num_up_mds() > 0 &&
+          fs2p->get_mds_map().get_num_up_mds() == 0) {
+	ss << "CephFS '" << fs1_name << "' is not offline. Before swapping "
+	   << "CephFS names, both CephFSs should be marked as failed. See "
+	   << "`ceph fs fail`.";
+	return -EPERM;
+      } else if (fs1p->get_mds_map().get_num_up_mds() == 0 &&
+		 fs2p->get_mds_map().get_num_up_mds() > 0) {
+	ss << "CephFS '" << fs2_name << "' is not offline. Before swapping "
+	   << "CephFS names, both CephFSs should be marked as failed. See "
+	   << "`ceph fs fail`.";
+	return -EPERM;
+      } else {
+	ss << "CephFSs '" << fs1_name << "' and '" << fs2_name << "' "
+	   << "are not offline. Before swapping CephFS names, both CephFSs "
+	   << "should be marked as failed. See `ceph fs fail`.";
+	return -EPERM;
+      }
+    }
+    // Check that refuse_client_session is set.
+    if (!fs1p->get_mds_map().test_flag(CEPH_MDSMAP_REFUSE_CLIENT_SESSION) ||
+	!fs2p->get_mds_map().test_flag(CEPH_MDSMAP_REFUSE_CLIENT_SESSION)) {
+      if (!fs1p->get_mds_map().test_flag(CEPH_MDSMAP_REFUSE_CLIENT_SESSION) &&
+          fs2p->get_mds_map().test_flag(CEPH_MDSMAP_REFUSE_CLIENT_SESSION)) {
+	ss << "CephFS '" << fs1_name << "' doesn't refuse clients. Before "
+	   << "swapping CephFS names, flag 'refuse_client_session' must be "
+	    << "set. See `ceph fs set`.";
+	return -EPERM;
+      } else if (
+          fs1p->get_mds_map().test_flag(CEPH_MDSMAP_REFUSE_CLIENT_SESSION) &&
+	  !fs2p->get_mds_map().test_flag(CEPH_MDSMAP_REFUSE_CLIENT_SESSION)) {
+	ss << "CephFS '" << fs2_name << "' doesn't refuse clients. Before "
+	   << "swapping CephFS names, flag 'refuse_client_session' must be "
+	    << "set. See `ceph fs set`.";
+	return -EPERM;
+      } else if (
+          !fs1p->get_mds_map().test_flag(CEPH_MDSMAP_REFUSE_CLIENT_SESSION) &&
+	  !fs2p->get_mds_map().test_flag(CEPH_MDSMAP_REFUSE_CLIENT_SESSION)) {
+	ss << "CephFSs '" << fs1_name << "' and '" << fs2_name << "' do not "
+	   << "refuse clients. Before swapping CephFS names, flag "
+	   << "'refuse_client_session' must be set. See `ceph fs set`.";
+	return -EPERM;
+      }
+    }
+    // Finally, the swap begins.
+    // Swap CephFS names on OSD pool application tag
+    for (const auto p : fs1p->get_mds_map().get_data_pools()) {
+      mon->osdmon()->do_application_enable(p,
+					   pg_pool_t::APPLICATION_NAME_CEPHFS,
+					   "data", fs2_name, true);
+    }
+    mon->osdmon()->do_application_enable(fs1p->get_mds_map().get_metadata_pool(),
+					 pg_pool_t::APPLICATION_NAME_CEPHFS,
+					 "metadata", fs2_name, true);
+    for (const auto p : fs2p->get_mds_map().get_data_pools()) {
+      mon->osdmon()->do_application_enable(p,
+					   pg_pool_t::APPLICATION_NAME_CEPHFS,
+					   "data", fs1_name, true);
+    }
+    mon->osdmon()->do_application_enable(fs2p->get_mds_map().get_metadata_pool(),
+					 pg_pool_t::APPLICATION_NAME_CEPHFS,
+					 "metadata", fs1_name, true);
+    mon->osdmon()->propose_pending();
+    // Now swap CephFS names and, optionally, FSCIDs.
+    auto renamefunc1 = [fs2_name](auto&& fs) {
+      fs->get_mds_map().set_fs_name(fs2_name);
+    };
+    fsmap.modify_filesystem(fs1_id, std::move(renamefunc1));
+    auto renamefunc2 = [fs1_name](auto&& fs) {
+      fs->get_mds_map().set_fs_name(fs1_name);
+    };
+    fsmap.modify_filesystem(fs2_id, std::move(renamefunc2));
+    if (swap_fscids_flag == "yes") {
+      fsmap.swap_fscids(fs1_id, fs2_id);
+    }
+    ss << "File system names ";
+    if (swap_fscids_flag == "yes") {
+      ss << "and FSCIDs ";
+    }
+    ss << " have been swapped; cephx credentials may need an upgrade.";
+    return 0;
+  }
+  Paxos *m_paxos;
 class RemoveDataPoolHandler : public FileSystemCommandHandler
@@ -1607,6 +1808,7 @@ FileSystemCommandHandler::load(Paxos *paxos)
+  handlers.push_back(std::make_shared<SwapFilesystemHandler>(paxos));
   handlers.push_back(std::make_shared<AliasHandler<SetDefaultHandler> >(
@@ -1745,7 +1947,8 @@ int FileSystemCommandHandler::is_op_allowed(
     if (fs == nullptr) {
       auto prefix = get_prefix();
       /* let "fs rm" and "fs rename" handle idempotent cases where file systems do not exist */
-      if (!(prefix == "fs rm" || prefix == "fs rename") && fsmap.get_filesystem(fs_name) == nullptr) {
+      if (!(prefix == "fs rm" || prefix == "fs rename" || prefix == "fs swap") &&
+	  fsmap.get_filesystem(fs_name) == nullptr) {
         ss << "Filesystem not found: '" << fs_name << "'";
         return -ENOENT;
diff --git a/src/mon/MonCommands.h b/src/mon/MonCommands.h
index 58e30f64ac3..cb412c1494c 100644
--- a/src/mon/MonCommands.h
+++ b/src/mon/MonCommands.h
@@ -459,6 +459,14 @@ COMMAND("fs rename "
 	"name=new_fs_name,type=CephString,goodchars=" FS_NAME_GOODCHARS
 	" name=yes_i_really_mean_it,type=CephBool,req=false",
 	"rename a ceph file system", "mds", "rw")
+COMMAND("fs swap "
+	"name=fs1_name,type=CephString "
+	"name=fs1_id,type=CephInt,range=0 "
+	"name=fs2_name,type=CephString "
+	"name=fs2_id,type=CephInt,range=0 "
+	"name=swap_fscids,type=CephChoices,strings=yes|no,req=true "
+	"name=yes_i_really_mean_it,type=CephBool,req=false",
+	"swap ceph file system names", "mds", "rw")
  * Monmap commands