| # Copyright 2016 The Chromium Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| from __future__ import absolute_import |
| from __future__ import division |
| from __future__ import print_function |
| |
| import multiprocessing |
| import os |
| import tempfile |
| import time |
| import unittest |
| |
| from six.moves import range # pylint: disable=redefined-builtin |
| |
| from py_utils import lock |
| |
| |
def _AppendTextToFile(file_name):
  """Appends 'Start', 10000 '*' characters and 'End' to |file_name|.

  The text is written after taking an exclusive lock on the opened file. The
  lock is not released explicitly here; presumably it is dropped when the file
  is closed at the end of the `with` block (confirm against py_utils.lock).

  Args:
    file_name: Path of the file to append to.
  """
  pieces = ['Start'] + ['*'] * 10000 + ['End']
  with open(file_name, 'a') as out_file:
    lock.AcquireFileLock(out_file, lock.LOCK_EX)
    # Hold the lock for 100 ms before writing, to increase the chance that
    # another process tries to acquire the lock on this file at the same time.
    time.sleep(0.1)
    # One write() call per piece, in order.
    for piece in pieces:
      out_file.write(piece)
| |
| |
def _ReadFileWithSharedLockBlockingThenWrite(read_file, write_file):
  """Appends the content of |read_file| to |write_file| under file locks.

  |read_file| is read after acquiring a shared lock on it (without LOCK_NB);
  |write_file| is appended to after acquiring an exclusive lock on it.

  Args:
    read_file: Path of the file whose content is read.
    write_file: Path of the file the content is appended to.
  """
  with open(read_file, 'r') as source:
    lock.AcquireFileLock(source, lock.LOCK_SH)
    data = source.read()
    # The source file stays open while the destination is written.
    with open(write_file, 'a') as destination:
      lock.AcquireFileLock(destination, lock.LOCK_EX)
      destination.write(data)
| |
| |
def _ReadFileWithExclusiveLockNonBlocking(target_file, status_file):
  """Tries a non-blocking exclusive lock and records the outcome.

  Opens |target_file| and requests LOCK_EX | LOCK_NB on it. |status_file| is
  then overwritten with 'LockException raised' if lock.LockException was
  raised, or 'LockException was not raised' otherwise, so that a parent
  process can inspect what happened.

  Args:
    target_file: Path of the file to try to lock.
    status_file: Path of the file that receives the outcome text.
  """
  with open(target_file, 'r') as target:
    try:
      lock.AcquireFileLock(target, lock.LOCK_EX | lock.LOCK_NB)
    except lock.LockException:
      outcome = 'LockException raised'
    else:
      outcome = 'LockException was not raised'
    # Report while the target file is still open, as the caller only reads the
    # status file after this process has exited.
    with open(status_file, 'w') as status:
      status.write(outcome)
| |
| |
class FileLockTest(unittest.TestCase):
  """Cross-process tests for the file locking helpers in py_utils.lock.

  Each test gets a fresh, empty temporary file (self.temp_file_path). Child
  processes started with multiprocessing contend for locks on that file and
  report back through a second temporary file.
  """

  def setUp(self):
    # delete=False plus an immediate close leaves an empty file on disk that
    # the test and its child processes can reopen by path.
    tf = tempfile.NamedTemporaryFile(delete=False)
    tf.close()
    self.temp_file_path = tf.name

  def tearDown(self):
    os.remove(self.temp_file_path)

  def testExclusiveLock(self):
    """Ten processes appending under LOCK_EX must not interleave output."""
    processes = []
    for _ in range(10):
      p = multiprocessing.Process(
          target=_AppendTextToFile, args=(self.temp_file_path,))
      p.start()
      processes.append(p)
    for p in processes:
      p.join()

    # If the file lock works as expected, there should be 10 atomic writes of
    # 'Start***...***End' to the file in some order, which leads to the final
    # file content below.
    expected_file_content = ('Start' + '*' * 10000 + 'End') * 10
    with open(self.temp_file_path, 'r') as f:
      # Use assertTrue instead of assertEqual since the strings are big, hence
      # assertEqual's assertion failure would contain huge strings.
      self.assertTrue(expected_file_content == f.read())

  def testSharedLock(self):
    """Shared locks held by several processes must not block each other."""
    tf = tempfile.NamedTemporaryFile(delete=False)
    tf.close()
    temp_write_file = tf.name
    try:
      with open(self.temp_file_path, 'w') as f:
        f.write('0123456789')
      with open(self.temp_file_path, 'r') as f:
        # First, acquire a shared lock on temp_file_path.
        lock.AcquireFileLock(f, lock.LOCK_SH)

        processes = []
        # Create 10 processes that also try to acquire a shared lock on
        # temp_file_path, then append its content to temp_write_file.
        for _ in range(10):
          p = multiprocessing.Process(
              target=_ReadFileWithSharedLockBlockingThenWrite,
              args=(self.temp_file_path, temp_write_file))
          p.start()
          processes.append(p)
        for p in processes:
          p.join()

      # temp_write_file should contain 10 copies of temp_file_path's content.
      with open(temp_write_file, 'r') as f:
        self.assertEqual('0123456789'*10, f.read())
    finally:
      os.remove(temp_write_file)

  def testNonBlockingLockAcquiring(self):
    """LOCK_NB on a file locked elsewhere is expected to raise LockException."""
    tf = tempfile.NamedTemporaryFile(delete=False)
    tf.close()
    temp_status_file = tf.name
    try:
      with open(self.temp_file_path, 'w') as f:
        lock.AcquireFileLock(f, lock.LOCK_EX)
        # The child runs to completion while this process still holds the
        # exclusive lock.
        p = multiprocessing.Process(
            target=_ReadFileWithExclusiveLockNonBlocking,
            args=(self.temp_file_path, temp_status_file))
        p.start()
        p.join()
      with open(temp_status_file, 'r') as f:
        self.assertEqual('LockException raised', f.read())
    finally:
      os.remove(temp_status_file)

  def testUnlockBeforeClosingFile(self):
    """ReleaseFileLock must free the lock while the file is still open."""
    tf = tempfile.NamedTemporaryFile(delete=False)
    tf.close()
    temp_status_file = tf.name
    try:
      with open(self.temp_file_path, 'r') as f:
        lock.AcquireFileLock(f, lock.LOCK_SH)
        lock.ReleaseFileLock(f)
        # The file is still open here, but the lock has been released, so the
        # child's exclusive non-blocking request is expected to succeed.
        p = multiprocessing.Process(
            target=_ReadFileWithExclusiveLockNonBlocking,
            args=(self.temp_file_path, temp_status_file))
        p.start()
        p.join()
      with open(temp_status_file, 'r') as f:
        self.assertEqual('LockException was not raised', f.read())
    finally:
      os.remove(temp_status_file)

  def testContextualLock(self):
    """lock.FileLock holds the lock inside its block and frees it on exit."""
    tf = tempfile.NamedTemporaryFile(delete=False)
    tf.close()
    temp_status_file = tf.name
    try:
      with open(self.temp_file_path, 'r') as f:
        with lock.FileLock(f, lock.LOCK_EX):
          # Within this block, locking self.temp_file_path from another
          # process should raise an exception.
          p = multiprocessing.Process(
              target=_ReadFileWithExclusiveLockNonBlocking,
              args=(self.temp_file_path, temp_status_file))
          p.start()
          p.join()
          # Use a distinct name so the locked file handle |f| is not shadowed.
          with open(temp_status_file, 'r') as status:
            self.assertEqual('LockException raised', status.read())

        # Locking self.temp_file_path here should not raise an exception: the
        # file is still open but the FileLock block has exited.
        p = multiprocessing.Process(
            target=_ReadFileWithExclusiveLockNonBlocking,
            args=(self.temp_file_path, temp_status_file))
        p.start()
        p.join()
        with open(temp_status_file, 'r') as status:
          self.assertEqual('LockException was not raised', status.read())
    finally:
      os.remove(temp_status_file)