From c8ac160ef0c8649c43656bb8e7b5aac7a8edf34c Mon Sep 17 00:00:00 2001 From: buhtz Date: Sat, 22 Oct 2022 16:41:12 +0200 Subject: [PATCH] Test backing up permissions in a "SSH local" profile. Introducing class `test_snapshots.py::TestSshPermissions` testing `snapshots.py::Snapshot.backupPermissions()` in a "SSH local" environment. The case was untested before. Especially it is needed to support the fix of #1247 (rsync incompatibility). I slightly improved the method `Snapshots.backupPermissions()` because before it was silent and didn't care about if the used `rsync` call succeeded or not. No the return code of `rsync` is returned and can be used in a unittest. The "helper" functions in `test_snapshots.py` are refactored and improved a bit. Now they are used by this new `TestSshPermissions` and the before introduced `TestSshRemoveSnapshot` tests. --- common/mount.py | 13 +- common/snapshots.py | 29 ++- common/test/generic.py | 3 + common/test/test_snapshots.py | 407 +++++++++++++++++++++++++++------- 4 files changed, 358 insertions(+), 94 deletions(-) diff --git a/common/mount.py b/common/mount.py index 1fd0603f6..eeeac76f8 100644 --- a/common/mount.py +++ b/common/mount.py @@ -883,22 +883,29 @@ def setSymlink(self, profile_id = None, hash_id = None, tmp_mount = None): """ if not self.symlink: return + if profile_id is None: profile_id = self.profile_id + if hash_id is None: hash_id = self.hash_id + if tmp_mount is None: tmp_mount = self.tmp_mount - dst = self.config.snapshotsPath(profile_id = profile_id, - mode = self.mode, - tmp_mount = tmp_mount) + + dst = self.config.snapshotsPath(profile_id=profile_id, + mode=self.mode, + tmp_mount=tmp_mount) mountpoint = self.mountpoint(hash_id) + if self.symlink_subfolder is None: src = mountpoint else: src = os.path.join(mountpoint, self.symlink_subfolder) + if os.path.exists(dst): os.remove(dst) + os.symlink(src, dst) def removeSymlink(self, profile_id = None, tmp_mount = None): diff --git a/common/snapshots.py b/common/snapshots.py index 1ca885bf6..f56137a43 100644 --- a/common/snapshots.py +++ b/common/snapshots.py @@ -937,11 +937,15 @@ def backupPermissions(self, sid): Args: sid (SID): snapshot that should be scanned + + Returns: + int: Return code of rsync. """ logger.info('Save permissions', self) self.setTakeSnapshotMessage(0, _('Saving permissions...')) fileInfoDict = FileInfoDict() + if self.config.snapshotsMode() == 'ssh_encfs': decode = encfstools.Decode(self.config, False) else: @@ -953,19 +957,30 @@ def backupPermissions(self, sid): rsync = ['rsync', '--dry-run', '-r', '--out-format=%n'] rsync.extend(tools.rsyncSshArgs(self.config)) - rsync.append(self.rsyncRemotePath(sid.pathBackup(use_mode = ['ssh', 'ssh_encfs'])) + os.sep) + rsync.append( + self.rsyncRemotePath( + sid.pathBackup( + use_mode=['ssh', 'ssh_encfs'] + ) + ) + os.sep + ) + with TemporaryDirectory() as d: + rsync.append(d + os.sep) + proc = tools.Execute(rsync, - callback = self.backupPermissionsCallback, - user_data = (fileInfoDict, decode), - parent = self, - conv_str = False, - join_stderr = False) - proc.run() + callback=self.backupPermissionsCallback, + user_data=(fileInfoDict, decode), + parent=self, + conv_str=False, + join_stderr=False) + rc = proc.run() sid.fileInfo = fileInfoDict + return rc + def backupPermissionsCallback(self, line, user_data): """ Rsync callback for :py:func:`Snapshots.backupPermissions`. diff --git a/common/test/generic.py b/common/test/generic.py index 2ef7d0953..c4bbc6d81 100644 --- a/common/test/generic.py +++ b/common/test/generic.py @@ -64,6 +64,8 @@ except ConnectionRefusedError: sshdPortAvailable = False +SKIP_SSH_TEST_MESSAGE = 'Skip as this test requires a local ssh server, ' \ + 'public and private keys installed' LOCAL_SSH = all((tools.processExists('sshd'), os.path.isfile(PRIV_KEY_FILE), KEY_IN_AUTH, @@ -73,6 +75,7 @@ ON_RTD = os.environ.get('READTHEDOCS', 'None').lower() == 'true' + class TestCase(unittest.TestCase): """Base class for Back In Time unit- and integration testing. diff --git a/common/test/test_snapshots.py b/common/test/test_snapshots.py index ec5514744..b7048c0f8 100644 --- a/common/test/test_snapshots.py +++ b/common/test/test_snapshots.py @@ -166,10 +166,13 @@ def test_groupName_invalid(self): def test_rsyncRemotePath(self): self.assertEqual(self.sn.rsyncRemotePath('/foo'), '/foo') + # "quote" is ignored because the "mode" isn't ssh or ssh_encfs self.assertEqual(self.sn.rsyncRemotePath('/foo', quote = '\\\"'), '/foo') self.assertEqual(self.sn.rsyncRemotePath('/foo', use_mode = ['local']), '/foo') + + # The same as above. self.assertEqual(self.sn.rsyncRemotePath('/foo', use_mode = ['local'], quote = '\\\"'), '/foo') @@ -547,6 +550,7 @@ def test_backupPermissions(self): include = self.cfg.include()[0][0] with TemporaryDirectory(dir = include) as tmp: + file_path = os.path.join(tmp, 'foo') with open(file_path, 'wt') as f: f.write('bar') @@ -743,9 +747,9 @@ def test_delete_pardir_readonly(self): class TestRemoveSnapshot(generic.SnapshotsWithSidTestCase): """Integration test about removing a snapshot. """ - # TODO: add test with SSH def test_remove(self): + self.assertTrue(self.sid.exists()) self.sn.remove(self.sid) self.assertFalse(self.sid.exists()) @@ -760,7 +764,7 @@ def test_remove_read_only(self): self.assertFalse(self.sid.exists()) -@unittest.skipIf(not generic.LOCAL_SSH, 'Skip as this test requires a local ssh server, public and private keys installed') +@unittest.skipIf(not generic.LOCAL_SSH, generic.SKIP_SSH_TEST_MESSAGE) class TestSshSnapshots(generic.SSHTestCase): def setUp(self): super(TestSshSnapshots, self).setUp() @@ -771,98 +775,336 @@ def test_statFreeSpaceSsh(self): self.assertIsInstance(self.sn.statFreeSpaceSsh(), int) -@unittest.skipIf(not generic.LOCAL_SSH, 'Skip as this test requires a local ssh server, public and private keys installed') -class TestSshRemoveSnapshots(unittest.TestCase): +def _rand_string(self, max_length=10, min_length=1): + """Create a string with random uppercase characters and digits and + a random length between `min_length` and `max_length`. + + Args: + max_length (int): Max length of the string (default: 10). + min_length (int): Min string length (default: 1) + + Returns: + (string): The created random string. """ + return ''.join(random.choices( + string.ascii_uppercase+string.digits, + k=random.randint(min_length, max_length) + )) + + +def _create_selfdestructing_path(test_case, path): + """Create a path that removes itself after the test. + + Args: + test_case (unittest.TestCase): Test instance used to call + `addCleanup()`. + path (str, pathlib.Path): Path or its name to create. + + Returns: + (pathlib.Path): The created path object. """ + p = pathlib.Path(path) + p.mkdir() - def setUp(self): - self._init_logging_and_config() + test_case.addCleanup(lambda: shutil.rmtree(p)) - def _init_logging_and_config(self): - # Initialize logging - logger.APP_NAME = 'BIT_unittest' - logger.openlog() - logger.DEBUG = '-v' in sys.argv + return p - # Path to config file (in "common/test/config") - self.cfgFile = pathlib.Path(__file__).parent / 'config' - self.cfgFile = str(self.cfgFile) # WORKAROUND - # config instance - self.sharePath = TemporaryDirectory() - self.cfg = config.Config(self.cfgFile, self.sharePath.name) +def _init_basic_config(data_dir_prefix='DATADIR', data_dir_suffix=''): + """Prepare configuration for a test. - # mock notifyplugin to suppress notifications - patcher = patch('notifyplugin.NotifyPlugin.message') - self.mockNotifyPlugin = patcher.start() + Args: + data_dir_prefix (str): Prefix of the temporary data directory. + data_dir_suffix (str): Suffix of the temporary data directory. - self.cfg.PLUGIN_MANAGER.load() + Returns: + config.Config: The configuriation object. - def _init_ssh_snapshot_profile(self, - path_prefix='BIT', - path_suffix=''): - # configure a SSH snapshot profile - self.cfg.setSnapshotsMode('ssh') - self.cfg.setSshHost('localhost') - self.cfg.setSshPrivateKeyFile(generic.PRIV_KEY_FILE) + It is a helper function to setup an environment for integration and unit + tests. - # use a TemporaryDirectory for remote snapshot path - # e.g. /tmp/tmp_mzi0qqo/foo - self.tmpDir = TemporaryDirectory(prefix=path_prefix, - suffix=path_suffix) - self.remotePath = os.path.join(self.tmpDir.name, 'foo') + What it does: - # set remote snapshot path to config - self.cfg.setSshSnapshotsPath(self.remotePath) + - Prepare the logging. + - Read the config (for tests) and instanticate a + :py:class:`config.Config` object. + - Creates a (temporary) "data" directory as specified by + `XDG_DATA_HOME`. + - Loading the plugin manager. - # Create a snapshot profile paths - # e.g. /tmp/tmp_mzi0qqo/foo/backintime/test-host/test-user/1 - self.snapshotPath = self.cfg.sshSnapshotsFullPath() - os.makedirs(self.snapshotPath) + The "data" directory is a ``TemporaryDirectory()`` instance. It is alive + as long as the returned configuraton objects is alive because it is + attached to it as a member. + """ - # use a tmp-file for flock because test_flockExclusive would deadlock - # otherwise if a regular snapshot is running in background - snapshots.Snapshots.GLOBAL_FLOCK = generic.TMP_FLOCK.name + # Initialize logging + logger.APP_NAME = 'BIT_unittest' + logger.openlog() + logger.DEBUG = '-v' in sys.argv - # The snapshot instance - self.sn = snapshots.Snapshots(self.cfg) + # Path to config file (in "common/test/config") + config_path = pathlib.Path(__file__).parent / 'config' - def _init_concrete_snapshot(self, sid_name='20151219-010324-123'): - # +++ Create a concrete snapshot (SID) - self.sid = snapshots.SID(sid_name, self.cfg) + # data path + # e.g. /tmp/l9pxsfh2 + # Used for + # e.g. /tmp/lpxsfh2/.local/share/backintime/mnt/83B5CA1B/mountpoin + data_dir = TemporaryDirectory(prefix=data_dir_prefix, + suffix=data_dir_suffix) - # e.g. /tmp/tmpq8cbewug/foo/backintime/test-host/test-user/1/ - # 20151219-010324-123/ - # backup - self.remoteSIDBackupPath \ - = pathlib.Path(self.snapshotPath) / self.sid.sid / 'backup' + # BUHTZ 2022-10-19 Because of unusual importing of "config.Config" we + # can not use the variable name "config". + # Fix this in the future when migrated to source layout. + # The "Config" class should be imported explicte or should be named + # with its full package path. e.g. "backintime.config.Config()". - self.remoteSIDBackupPath.mkdir(parents=True) + # config instance + cfg = config.Config( + config_path=str(config_path), + data_path=data_dir.name + ) - # e.g. /tmp/tmp5eo0y_fh/foo/backintime/test-host/ - # test-user/1/20151219-010324-123/backup - sid_path = pathlib.Path(self.remoteSIDBackupPath) - sid_path = sid_path / 'tmp' / self._rand_string(10) - sid_path = str(sid_path) + # keep the (temporary) data dir alive + cfg._tmp_data_dir = data_dir - generic.create_test_files(sid_path) + # ? + cfg.PLUGIN_MANAGER.load() - def _rand_string(self, max_length=10, min_length=1): - """Create a string with random uppercase characters and digits and - a random length between `min_length` and `max_length`. + return cfg - Args: - max_length (int): Max length of the string (default: 10). - min_length (int): Min string length (default: 1) - Returns: - (string): The created random string. - """ - return ''.join(random.choices( - string.ascii_uppercase+string.digits, - k=random.randint(min_length, max_length) - )) +def _init_source_path(cfg, + source_dir_prefix='SOURCEDIR', + source_dir_suffix=''): + """Prepare the (backup) source directory but keep it empty. + + Args: + cfg (config.Config): The configuration instance. + source_dir_prefix (str): Prefix of the (temporary) source directory. + source_dir_suffix (str): Suffix of the (temporary) source directory. + + It is a helper function to setup an environment for integration and unit + tests. The term "source directory" means the directory that is to be + backed up. + + What it does: + - Create a (temporary) directory as backup source. + - Add it as an include folder to the profile configuration. + """ + source_dir = TemporaryDirectory(prefix=source_dir_prefix, + suffix=source_dir_suffix) + + # set it as include folder + cfg.setInclude([(source_dir.name, 0)]) + + # keep the (temporary) dir alive + cfg._tmp_source_dir = source_dir + + +def _init_ssh_profile(cfg, + destination_dir_prefix='DESTINATIONDIRparent', + destination_dir_suffix=''): + """Setup a "SSH local" snapshots profile and it's snapshots folder. + + Args: + cfg (config.Config): The configuration instance. + destination_dir_prefix (str): Prefix of the (temporary) destination + directory. + destination_dir_suffix (str): Suffix of the (temporary) destination + directory. + + Returns: + snapshots.Snapshots: The instance representing the "SSH local" profile. + + It is a helper function to setup an environment for integration and unit + tests. The term "snapshots folder" is synonmy with the "destination + directory" means the directory where the backed up files are stored. + + What it does: + - Modify the configuration to a "SSH local" profile. + - Create a (temporary) directory as the parent(!) of the snapshots + folder. + - Create the snapshots folder (aka "backup destination") with name + `foo` in it. + - Return a :py:class:`snapshots.Snapshots` instance. + + """ + # configure a SSH snapshot profile + cfg.setSnapshotsMode('ssh') + cfg.setSshHost('localhost') + cfg.setSshPrivateKeyFile(generic.PRIV_KEY_FILE) + + # use a TemporaryDirectory for remote snapshot path + # e.g. /tmp/tmp_mzi0qqo/foo + remote_dir_parent = TemporaryDirectory(prefix=destination_dir_prefix, + suffix=destination_dir_suffix) + remote_path = pathlib.Path(remote_dir_parent.name) / 'foo' + + # keep dir alive + cfg._tmp_remote_dir_parent = remote_dir_parent + + # set remote snapshot path to config + cfg.setSshSnapshotsPath(str(remote_path)) + + # Create a full snapshot profile paths + # e.g. /tmp/tmp_mzi0qqo/foo/backintime/test-host/test-user/1 + snapshots_path = pathlib.Path(cfg.sshSnapshotsFullPath()) + + snapshots_path.mkdir(parents=True) + + # use a tmp-file for flock because test_flockExclusive would deadlock + # otherwise if a regular snapshot is running in background + snapshots.Snapshots.GLOBAL_FLOCK = generic.TMP_FLOCK.name + + # The snapshot instance + return snapshots.Snapshots(cfg) + + +def _init_concrete_snapshot(cfg, sid_name='20151219-010324-123'): + """Create a SID instance which is a snapshot on a specific timepoint. + + Args: + cfg (config.Config): The configuration instance. + sid_name (str): The name of the snapshot. + + Returns: + snapshots.SID: The SID instance. + + It is a helper function to setup an environment or integration and unit + tests. The term "SID" means a snapshot that was taken on a specific point + in time. It could be described as the resulting files of one run of a + backup job. + + What that function does: + - Create the SID instance. + - Create the the full directory tree for that SID. + - Create files and folders (via + :py:func:`generic.create_test_files()`) in it. + """ + # +++ Create a concrete snapshot (SID) + sid = snapshots.SID(sid_name, cfg) + + # e.g. /tmp/DESTINATIONDIRparent89h5l0f9/foo/backintime/ \ + # test-host/test-user/1/20151219-010324-123/backup + sid_path = pathlib.Path(cfg.sshSnapshotsFullPath()) / sid.sid / 'backup' + + # The source path should be reflected in the destination snapshot. + # It means the folder structure (including the parent folders) of the + # backup source are mirroed into the backup destination. + # e.g. /tmp/tmpq8cbewug/foo/backintime/test-host/test-user/1/ + # 20151219-010324-123/backup/tmp/xyz + sid_path = sid_path / cfg.include()[0][0][1:] + sid_path.mkdir(parents=True) + + generic.create_test_files(str(sid_path)) + + return sid + + +def _init_mounting(cfg): + """Handle the mounting for integration and unittesting. + + Args: + cfg (config.Config): The configuration instance. + + Returns: + mount.Mount: The mount object. + + Development note (BUHTZ 2022-10-22): I didn't understand all details here. + But it seems to be neccessary. + + Unmounting is not done automatically! It is recommended to use + ``unittest.TestCase.addCleanup()`` for that:: + + class MyTest(unittest.TestCase): + def test_mytest(self): + # cfg = ... + mount_obj = _init_mounting(cfg) + self.addCleanup(lambda: mount_obj.umount(cfg.current_hash_id)) + """ + # mount + mount_obj = mount.Mount(cfg=cfg) + + hash_id = mount_obj.mount() # e.g. FA3E732E + + # ? + cfg.setCurrentHashId(hash_id) + + return mount_obj + + +@unittest.skipIf(not generic.LOCAL_SSH, generic.SKIP_SSH_TEST_MESSAGE) +class TestSshPermissions(unittest.TestCase): + """Testing to backup the file permissions in a "SSH local" + snapshot profile. + """ + + def test_backupPermissions(self): + """Backup file permissions in SSH backup mode.""" + + # --- prepare environment --- + + # config instance + cfg = _init_basic_config() + # snapshots profile + snapshot = _init_ssh_profile(cfg) + # backup source directory + _init_source_path(cfg) + # simulate a taken snapshot + sid = _init_concrete_snapshot(cfg) + + # BUHTZ 2022-10-21: The mounting is needed but I didn't understand + # all details yet. + # mount + mount_obj = _init_mounting(cfg) + # ...unmount when test finished + self.addCleanup(lambda: mount_obj.umount(cfg.current_hash_id)) + + # --- prepare the backup source --- + + # Does the concrete snapshot exists? + self.assertTrue(sid.exists()) + + # The backup source path + # e.g. /tmp/e2uij3y + source_path = pathlib.Path(cfg.include()[0][0]) + # ...exists? + self.assertTrue(source_path.exists()) + + # create the test files in the backup source directory + generic.create_test_files(str(source_path)) + + # --- Do the job to test. --- + + # backup permissions of files/folders in the backup source + rc = snapshot.backupPermissions(sid) + + self.assertEqual(rc, 0, 'rsync return code') + + # --- Validate the job. --- + + # resulted permissions + fileInfo = sid.fileInfo + + # source path in the fileInfo present? + self.assertIn(str(source_path).encode(), fileInfo) + + # expected fiel where the permissions are stored in + # e.g. /tmp/BITa6ekd80lTEST/foo/backintime/test-host/test-user/1 + infoFilePath = pathlib.Path(cfg.snapshotsFullPath()) + # ...'/20151219-010324-123/fileinfo.bz2' + infoFilePath = infoFilePath / str(sid.sid) / 'fileinfo.bz2' + + # Does it exists as a file? + self.assertTrue(infoFilePath.exists()) + self.assertTrue(infoFilePath.is_file()) + + +@unittest.skipIf(not generic.LOCAL_SSH, generic.SKIP_SSH_TEST_MESSAGE) +class TestSshRemoveSnapshots(unittest.TestCase): + """Testing to remove snapshots(SID) in a "SSH local" snapshot profile. + """ def test_remove(self): """Remove concrete snapshot. @@ -879,23 +1121,20 @@ def test_remove_with_blank(self): def _generic_test_remove(self, path_suffix): """ """ - # init - self._init_ssh_snapshot_profile(path_suffix=path_suffix) - self._init_concrete_snapshot() + cfg = _init_basic_config() + snapshot = _init_ssh_profile(cfg) + sid = _init_concrete_snapshot(cfg) # mount - mount_obj = mount.Mount(cfg=self.cfg) - hash_id = mount_obj.mount() - self.cfg.setCurrentHashId(hash_id) + mount_obj = _init_mounting(cfg) + # ...unmount when test finished + self.addCleanup(lambda: mount_obj.umount(cfg.current_hash_id)) # Does the snapshot exists? - self.assertTrue(self.sid.exists()) + self.assertTrue(sid.exists()) # Remove it - self.assertTrue(self.sn.remove(self.sid)) + self.assertTrue(snapshot.remove(sid)) # Shouldn't exist anymore. - self.assertFalse(self.sid.exists()) - - # - mount_obj.umount(self.cfg.current_hash_id) + self.assertFalse(sid.exists())