| # Copyright 2015 The Chromium Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import os |
| import shutil |
| import stat |
| import sys |
| import tempfile |
| import unittest |
| import uuid |
| import zipfile |
| |
| import mock |
| |
| from dependency_manager import dependency_manager_util |
| from dependency_manager import exceptions |
| |
| |
| class DependencyManagerUtilTest(unittest.TestCase): |
| # This class intentionally uses actual file I/O to test real system behavior. |
| |
| def setUp(self): |
| self.tmp_dir = os.path.abspath(tempfile.mkdtemp(prefix='telemetry')) |
| self.sub_dir = os.path.join(self.tmp_dir, 'sub_dir') |
| os.mkdir(self.sub_dir) |
| |
| self.read_only_path = (os.path.join(self.tmp_dir, 'read_only')) |
| with open(self.read_only_path, 'w+') as read_file: |
| read_file.write('Read-only file') |
| os.chmod(self.read_only_path, stat.S_IRUSR) |
| |
| self.writable_path = (os.path.join(self.tmp_dir, 'writable')) |
| with open(self.writable_path, 'w+') as writable_file: |
| writable_file.write('Writable file') |
| os.chmod(self.writable_path, stat.S_IRUSR | stat.S_IWUSR) |
| |
| self.executable_path = (os.path.join(self.tmp_dir, 'executable')) |
| with open(self.executable_path, 'w+') as executable_file: |
| executable_file.write('Executable file') |
| os.chmod(self.executable_path, stat.S_IRWXU) |
| |
| self.sub_read_only_path = (os.path.join(self.sub_dir, 'read_only')) |
| with open(self.sub_read_only_path, 'w+') as read_file: |
| read_file.write('Read-only sub file') |
| os.chmod(self.sub_read_only_path, stat.S_IRUSR) |
| |
| self.sub_writable_path = (os.path.join(self.sub_dir, 'writable')) |
| with open(self.sub_writable_path, 'w+') as writable_file: |
| writable_file.write('Writable sub file') |
| os.chmod(self.sub_writable_path, stat.S_IRUSR | stat.S_IWUSR) |
| |
| self.sub_executable_path = (os.path.join(self.sub_dir, 'executable')) |
| with open(self.sub_executable_path, 'w+') as executable_file: |
| executable_file.write('Executable sub file') |
| os.chmod(self.sub_executable_path, stat.S_IRWXU) |
| |
| self.AssertExpectedDirFiles(self.tmp_dir) |
| self.archive_path = self.CreateZipArchiveFromDir(self.tmp_dir) |
| |
| def tearDown(self): |
| if os.path.isdir(self.tmp_dir): |
| dependency_manager_util.RemoveDir(self.tmp_dir) |
| if os.path.isfile(self.archive_path): |
| os.remove(self.archive_path) |
| |
| def AssertExpectedDirFiles(self, top_dir): |
| sub_dir = os.path.join(top_dir, 'sub_dir') |
| read_only_path = (os.path.join(top_dir, 'read_only')) |
| writable_path = (os.path.join(top_dir, 'writable')) |
| executable_path = (os.path.join(top_dir, 'executable')) |
| sub_read_only_path = (os.path.join(sub_dir, 'read_only')) |
| sub_writable_path = (os.path.join(sub_dir, 'writable')) |
| sub_executable_path = (os.path.join(sub_dir, 'executable')) |
| # assert contents as expected |
| self.assertTrue(os.path.isdir(top_dir)) |
| self.assertTrue(os.path.isdir(sub_dir)) |
| self.assertTrue(os.path.isfile(read_only_path)) |
| self.assertTrue(os.path.isfile(writable_path)) |
| self.assertTrue(os.path.isfile(executable_path)) |
| self.assertTrue(os.path.isfile(sub_read_only_path)) |
| self.assertTrue(os.path.isfile(sub_writable_path)) |
| self.assertTrue(os.path.isfile(sub_executable_path)) |
| |
| # assert permissions as expected |
| self.assertTrue( |
| stat.S_IRUSR & stat.S_IMODE(os.stat(read_only_path).st_mode)) |
| self.assertTrue( |
| stat.S_IRUSR & stat.S_IMODE(os.stat(sub_read_only_path).st_mode)) |
| self.assertTrue( |
| stat.S_IRUSR & stat.S_IMODE(os.stat(writable_path).st_mode)) |
| self.assertTrue( |
| stat.S_IWUSR & stat.S_IMODE(os.stat(writable_path).st_mode)) |
| self.assertTrue( |
| stat.S_IRUSR & stat.S_IMODE(os.stat(sub_writable_path).st_mode)) |
| self.assertTrue( |
| stat.S_IWUSR & stat.S_IMODE(os.stat(sub_writable_path).st_mode)) |
| if not sys.platform.startswith('win'): |
| self.assertEqual( |
| stat.S_IRWXU, |
| stat.S_IRWXU & stat.S_IMODE(os.stat(executable_path).st_mode)) |
| self.assertEqual( |
| stat.S_IRWXU, |
| stat.S_IRWXU & stat.S_IMODE(os.stat(sub_executable_path).st_mode)) |
| |
| def CreateZipArchiveFromDir(self, dir_path): |
| try: |
| base_path = os.path.join(tempfile.gettempdir(), str(uuid.uuid4())) |
| archive_path = shutil.make_archive(base_path, 'zip', dir_path) |
| self.assertTrue(os.path.exists(archive_path)) |
| self.assertTrue(zipfile.is_zipfile(archive_path)) |
| except: |
| if os.path.isfile(archive_path): |
| os.remove(archive_path) |
| raise |
| return archive_path |
| |
| def testRemoveDirWithSubDir(self): |
| dependency_manager_util.RemoveDir(self.tmp_dir) |
| |
| self.assertFalse(os.path.exists(self.tmp_dir)) |
| self.assertFalse(os.path.exists(self.sub_dir)) |
| self.assertFalse(os.path.exists(self.read_only_path)) |
| self.assertFalse(os.path.exists(self.writable_path)) |
| self.assertFalse(os.path.isfile(self.executable_path)) |
| self.assertFalse(os.path.exists(self.sub_read_only_path)) |
| self.assertFalse(os.path.exists(self.sub_writable_path)) |
| self.assertFalse(os.path.isfile(self.sub_executable_path)) |
| |
| def testUnzipFile(self): |
| self.AssertExpectedDirFiles(self.tmp_dir) |
| unzip_path = os.path.join(tempfile.gettempdir(), str(uuid.uuid4())) |
| dependency_manager_util.UnzipArchive(self.archive_path, unzip_path) |
| self.AssertExpectedDirFiles(unzip_path) |
| self.AssertExpectedDirFiles(self.tmp_dir) |
| dependency_manager_util.RemoveDir(unzip_path) |
| |
| def testUnzipFileContainingLongPath(self): |
| try: |
| dir_path = self.tmp_dir |
| if sys.platform.startswith('win'): |
| dir_path = u'\\\\?\\' + dir_path |
| |
| archive_suffix = '' |
| # 260 is the Windows API path length limit. |
| while len(archive_suffix) < 260: |
| archive_suffix = os.path.join(archive_suffix, 'really') |
| contents_dir_path = os.path.join(dir_path, archive_suffix) |
| os.makedirs(contents_dir_path) |
| filename = os.path.join(contents_dir_path, 'longpath.txt') |
| open(filename, 'a').close() |
| |
| base_path = os.path.join(tempfile.gettempdir(), str(uuid.uuid4())) |
| archive_path = shutil.make_archive(base_path, 'zip', dir_path) |
| self.assertTrue(os.path.exists(archive_path)) |
| self.assertTrue(zipfile.is_zipfile(archive_path)) |
| except: |
| if os.path.isfile(archive_path): |
| os.remove(archive_path) |
| raise |
| |
| unzip_path = os.path.join(tempfile.gettempdir(), str(uuid.uuid4())) |
| dependency_manager_util.UnzipArchive(archive_path, unzip_path) |
| dependency_manager_util.RemoveDir(unzip_path) |
| |
| def testUnzipFileFailure(self): |
| # zipfile is not used on MacOS. See crbug.com/700097. |
| if sys.platform.startswith('darwin'): |
| return |
| unzip_path = os.path.join(tempfile.gettempdir(), str(uuid.uuid4())) |
| self.assertFalse(os.path.exists(unzip_path)) |
| with mock.patch( |
| 'dependency_manager.dependency_manager_util.zipfile.ZipFile.extractall' # pylint: disable=line-too-long |
| ) as zipfile_mock: |
| zipfile_mock.side_effect = IOError |
| self.assertRaises( |
| IOError, dependency_manager_util.UnzipArchive, self.archive_path, |
| unzip_path) |
| self.AssertExpectedDirFiles(self.tmp_dir) |
| self.assertFalse(os.path.exists(unzip_path)) |
| |
| def testVerifySafeArchivePasses(self): |
| with zipfile.ZipFile(self.archive_path) as archive: |
| dependency_manager_util.VerifySafeArchive(archive) |
| |
| def testVerifySafeArchiveFailsOnRelativePathWithPardir(self): |
| tmp_file = tempfile.NamedTemporaryFile(delete=False) |
| tmp_file_name = tmp_file.name |
| tmp_file.write('Bad file!') |
| tmp_file.close() |
| with zipfile.ZipFile(self.archive_path, 'w') as archive: |
| archive.write(tmp_file_name, '../../foo') |
| self.assertRaises( |
| exceptions.ArchiveError, dependency_manager_util.VerifySafeArchive, |
| archive) |
| |