Source code for portalocker_tests.tests

from __future__ import print_function
from __future__ import with_statement

import os
import dataclasses
import multiprocessing
import time
import typing

import pytest
import portalocker
from portalocker import utils
from portalocker import LockFlags


def test_exceptions(tmpfile):
    """Check that a second exclusive non-blocking lock raises LockException.

    The same file is opened twice; the first handle takes the lock and the
    attempt to lock the second handle is expected to fail.
    """
    # Exclusive + non-blocking, so the second attempt fails immediately
    # instead of waiting for the first lock to go away.
    lock_flags = portalocker.LOCK_EX | portalocker.LOCK_NB

    # The context manager closes both handles even when an assertion fails,
    # so a failing run does not leave open (and locked) files behind.
    with open(tmpfile, 'a') as a, open(tmpfile, 'a') as b:
        # First lock file a
        portalocker.lock(a, lock_flags)

        # Now see if we can lock file b
        with pytest.raises(portalocker.LockException):
            portalocker.lock(b, lock_flags)
def test_utils_base():
    """Check that ``utils.LockBase`` can be subclassed with an empty body."""

    # Only the class definition is exercised; no instance is created.
    class Test(utils.LockBase):
        pass
def test_with_timeout(tmpfile):
    """Expect ``AlreadyLocked`` when locking a held file with fail_when_locked."""
    message_before = 'writing some stuff to my cache...'
    message_after = 'writing more stuff to my cache...'

    with pytest.raises(portalocker.AlreadyLocked):
        with portalocker.Lock(tmpfile, timeout=0.1) as fh:
            print(message_before, file=fh)

            # Second lock on the same file while the first is still held.
            second = portalocker.Lock(
                tmpfile,
                timeout=0.1,
                mode='wb',
                fail_when_locked=True,
            )
            with second:
                pass

            print(message_after, file=fh)
def test_without_timeout(tmpfile):
    """Expect ``LockException`` when re-locking a held file with timeout=None."""
    message_before = 'writing some stuff to my cache...'
    message_after = 'writing more stuff to my cache...'

    with pytest.raises(portalocker.LockException):
        with portalocker.Lock(tmpfile, timeout=None) as fh:
            print(message_before, file=fh)

            # Second lock on the same file while the first is still held.
            second = portalocker.Lock(tmpfile, timeout=None, mode='w')
            with second:
                pass

            print(message_after, file=fh)
def test_without_fail(tmpfile):
    """Expect ``LockException`` from ``acquire`` with fail_when_locked=False."""
    with pytest.raises(portalocker.LockException):
        with portalocker.Lock(tmpfile, timeout=0.1) as fh:
            print('writing some stuff to my cache...', file=fh)

            # A separate Lock object for the same file, acquired explicitly
            # while the outer lock is still held.
            second = portalocker.Lock(tmpfile, timeout=0.1)
            second.acquire(check_interval=0.05, fail_when_locked=False)
def test_simple(tmpfile):
    """Lock a file exclusively, write past its content and verify the content.

    The file is seeded with 13 characters; the write happens at offset 13, so
    the first 13 characters must still read back unchanged.
    """
    with open(tmpfile, 'w') as fh:
        fh.write('spam and eggs')

    # ``with`` + ``finally`` guarantee unlock and close even when the
    # assertion below fails, so no locked handle leaks into other tests.
    with open(tmpfile, 'r+') as fh:
        portalocker.lock(fh, portalocker.LOCK_EX)
        try:
            fh.seek(13)
            fh.write('foo')

            # Make sure we didn't overwrite the original text
            fh.seek(0)
            assert fh.read(13) == 'spam and eggs'
        finally:
            portalocker.unlock(fh)
def test_truncate(tmpfile):
    """Check that mode 'a+' keeps the file content and mode 'w+' empties it."""
    original_text = 'spam and eggs'

    with open(tmpfile, 'w') as seed:
        seed.write(original_text)

    # Append mode: the existing text must still be readable from the start.
    with portalocker.Lock(tmpfile, mode='a+') as appended:
        appended.seek(0)
        assert appended.read(13) == 'spam and eggs'

    # Write mode: nothing should be left to read.
    with portalocker.Lock(tmpfile, mode='w+') as truncated:
        assert truncated.read() == ''
def test_class(tmpfile):
    """Exercise ``Lock`` as a context manager with a competing second lock."""
    first = portalocker.Lock(tmpfile)
    second = portalocker.Lock(tmpfile, fail_when_locked=False, timeout=0.01)

    with first:
        # Acquire again while already inside the context manager.
        first.acquire()

        # The competing lock is expected to fail while ``first`` is held.
        with pytest.raises(portalocker.LockException):
            with second:
                pass

    # Once ``first`` has been left, the competing lock is entered normally.
    with second:
        pass
def test_acquire_release(tmpfile):
    """Exercise explicit ``Lock.acquire`` / ``Lock.release`` calls.

    A second ``Lock`` on the same file is expected to raise
    ``LockException`` while the first one is held.
    """
    lock = portalocker.Lock(tmpfile)
    lock2 = portalocker.Lock(tmpfile, fail_when_locked=False)

    lock.acquire()  # acquire lock when nobody is using it
    with pytest.raises(portalocker.LockException):
        # another party should not be able to acquire the lock
        lock2.acquire(timeout=0.01)

    # re-acquire a held lock is a no-op
    # NOTE(review): kept outside the ``pytest.raises`` block on purpose; any
    # statement placed after the raising call inside that block would never
    # execute, so the re-acquire would not be tested at all.
    lock.acquire()

    lock.release()  # release the lock
    lock.release()  # second release does nothing
def test_rlock_acquire_release_count(tmpfile):
    """Check that an ``RLock`` handle closes only after the matching release.

    Two acquires are followed by two releases; the handle returned by the
    first acquire must stay open until the second release.
    """
    rlock = portalocker.RLock(tmpfile)

    # Acquire twice; the handle from the first acquire stays open throughout.
    handle = rlock.acquire()
    assert not handle.closed
    rlock.acquire()
    assert not handle.closed

    # First release: still open. Second release: closed.
    rlock.release()
    assert not handle.closed
    rlock.release()
    assert handle.closed
def test_rlock_acquire_release(tmpfile):
    """Exercise ``RLock`` acquire/release with a competing second ``RLock``."""
    holder = portalocker.RLock(tmpfile)
    competitor = portalocker.RLock(tmpfile, fail_when_locked=False)

    # Take the lock while nobody else is using it.
    holder.acquire()

    # Another party should not be able to acquire the lock.
    with pytest.raises(portalocker.LockException):
        competitor.acquire(timeout=0.01)

    # Acquire a second time on the object that already holds the lock.
    holder.acquire()

    # Release once for each acquire above.
    holder.release()
    holder.release()
def test_release_unacquired(tmpfile):
    """Expect ``LockException`` when releasing an ``RLock`` never acquired."""
    never_acquired = portalocker.RLock(tmpfile)

    with pytest.raises(portalocker.LockException):
        never_acquired.release()
def test_exlusive(tmpfile):
    """Check that an exclusive lock blocks further exclusive locks on the file.

    While the first handle holds an exclusive non-blocking lock, locking a
    second handle (opened for reading or for writing) must raise
    ``LockException``.
    """
    exclusive = portalocker.LOCK_EX | portalocker.LOCK_NB

    with open(tmpfile, 'w') as fh:
        fh.write('spam and eggs')

    # ``with`` + ``finally`` guarantee unlock and close even when one of the
    # expectations below fails, so the locked handle cannot leak.
    with open(tmpfile, 'r') as fh:
        portalocker.lock(fh, exclusive)
        try:
            # Make sure we can't read the locked file
            with pytest.raises(portalocker.LockException):
                with open(tmpfile, 'r') as fh2:
                    portalocker.lock(fh2, exclusive)
                    fh2.read()

            # Make sure we can't write the locked file
            with pytest.raises(portalocker.LockException):
                with open(tmpfile, 'w+') as fh2:
                    portalocker.lock(fh2, exclusive)
                    fh2.write('surprise and fear')
        finally:
            # Make sure we can explicitly unlock the file
            portalocker.unlock(fh)
def test_shared(tmpfile):
    """Check that shared locks coexist but block an exclusive lock.

    With a shared lock held, a second shared lock on another handle must
    succeed and allow reading, while an exclusive lock must raise
    ``LockException``.
    """
    shared = portalocker.LOCK_SH | portalocker.LOCK_NB

    with open(tmpfile, 'w') as fh:
        fh.write('spam and eggs')

    # ``with`` + ``finally`` guarantee unlock and close even when one of the
    # expectations below fails, so the locked handle cannot leak.
    with open(tmpfile, 'r') as f:
        portalocker.lock(f, shared)
        try:
            # Make sure we can read the locked file
            with open(tmpfile, 'r') as fh2:
                portalocker.lock(fh2, shared)
                assert fh2.read() == 'spam and eggs'

            # Make sure we can't write the locked file
            with pytest.raises(portalocker.LockException):
                with open(tmpfile, 'w+') as fh2:
                    portalocker.lock(
                        fh2, portalocker.LOCK_EX | portalocker.LOCK_NB
                    )
                    fh2.write('surprise and fear')
        finally:
            # Make sure we can explicitly unlock the file
            portalocker.unlock(f)
def test_blocking_timeout(tmpfile):
    """Expect a ``UserWarning`` when a timeout is used without NON_BLOCKING.

    Covers both the context-manager path and an explicit ``acquire`` call.
    """
    blocking_shared = LockFlags.SHARED

    # Timeout given to the constructor, lock taken via the context manager.
    with pytest.warns(UserWarning):
        with portalocker.Lock(tmpfile, timeout=5, flags=blocking_shared):
            pass

    # Timeout given directly to ``acquire``.
    explicit = portalocker.Lock(tmpfile, flags=blocking_shared)
    with pytest.warns(UserWarning):
        explicit.acquire(timeout=5)
@pytest.mark.skipif(
    os.name == 'nt',
    reason='Windows uses an entirely different lockmechanism',
)
def test_nonblocking(tmpfile):
    """Expect ``RuntimeError`` when locking with only the NON_BLOCKING flag."""
    with open(tmpfile, 'w') as fh, pytest.raises(RuntimeError):
        portalocker.lock(fh, LockFlags.NON_BLOCKING)
def shared_lock(filename, **kwargs):
    """Hold a shared non-blocking lock on *filename* briefly; return True.

    Extra keyword arguments are accepted and ignored.
    """
    flags = LockFlags.SHARED | LockFlags.NON_BLOCKING
    locker = portalocker.Lock(
        filename,
        timeout=0.1,
        fail_when_locked=False,
        flags=flags,
    )
    with locker:
        # Keep the lock held for a moment before returning.
        time.sleep(0.2)
        return True
def shared_lock_fail(filename, **kwargs):
    """Hold a shared non-blocking lock with fail_when_locked=True; return True.

    Extra keyword arguments are accepted and ignored.
    """
    flags = LockFlags.SHARED | LockFlags.NON_BLOCKING
    locker = portalocker.Lock(
        filename,
        timeout=0.1,
        fail_when_locked=True,
        flags=flags,
    )
    with locker:
        # Keep the lock held for a moment before returning.
        time.sleep(0.2)
        return True
def exclusive_lock(filename, **kwargs):
    """Hold an exclusive non-blocking lock on *filename* briefly; return True.

    Extra keyword arguments are accepted and ignored.
    """
    flags = LockFlags.EXCLUSIVE | LockFlags.NON_BLOCKING
    locker = portalocker.Lock(
        filename,
        timeout=0.1,
        fail_when_locked=False,
        flags=flags,
    )
    with locker:
        # Keep the lock held for a moment before returning.
        time.sleep(0.2)
        return True
@dataclasses.dataclass(order=True)
class LockResult:
    """Outcome of a lock attempt; all fields stay ``None`` when nothing failed."""

    # Class of the exception that was caught, if any.
    exception_class: typing.Optional[typing.Type[Exception]] = None
    # ``str()`` of the caught exception, if any.
    exception_message: typing.Optional[str] = None
    # ``repr()`` of the caught exception, if any.
    exception_repr: typing.Optional[str] = None
def lock(
    filename: str, fail_when_locked: bool, flags: LockFlags
) -> LockResult:
    """Try to lock *filename* briefly and report the outcome as a LockResult.

    Returns an empty ``LockResult`` on success. If any exception is raised,
    it is caught and its class, ``str()`` and ``repr()`` are returned instead.
    """
    # Per the original author's note, the exception objects themselves cannot
    # be safely pickled for the trip back through multiprocessing, so only
    # the class and string forms are returned.
    try:
        locker = portalocker.Lock(
            filename,
            timeout=0.1,
            fail_when_locked=fail_when_locked,
            flags=flags,
        )
        with locker:
            # Keep the lock held for a moment before reporting success.
            time.sleep(0.2)
            return LockResult()
    except Exception as exception:
        return LockResult(
            exception_class=type(exception),
            exception_message=str(exception),
            exception_repr=repr(exception),
        )
@pytest.mark.parametrize('fail_when_locked', [True, False])
def test_shared_processes(tmpfile, fail_when_locked):
    """Two worker processes taking a shared lock must both succeed."""
    flags = LockFlags.SHARED | LockFlags.NON_BLOCKING
    # Argument tuple for ``lock``: filename, fail_when_locked, flags
    call_args = (tmpfile, fail_when_locked, flags)
    success = LockResult()

    with multiprocessing.Pool(processes=2) as pool:
        pending = pool.starmap_async(lock, [call_args, call_args])

        for outcome in pending.get(timeout=3):
            assert outcome == success
@pytest.mark.parametrize('fail_when_locked', [True, False])
def test_exclusive_processes(tmpfile, fail_when_locked):
    """With two processes locking exclusively, at least one must succeed.

    The exception class reported by the other one must be a
    ``LockException`` subclass.
    """
    flags = LockFlags.EXCLUSIVE | LockFlags.NON_BLOCKING
    # Argument tuple for ``lock``: filename, fail_when_locked, flags
    call_args = (tmpfile, fail_when_locked, flags)

    with multiprocessing.Pool(processes=2) as pool:
        pending = pool.starmap_async(lock, [call_args, call_args])
        a, b = pending.get(timeout=3)

        # At least one of the two workers reported no exception.
        assert not a.exception_class or not b.exception_class

        reported = a.exception_class or b.exception_class
        assert issubclass(reported, portalocker.LockException)
@pytest.mark.skipif(
    os.name == 'nt',
    reason='Locking on Windows requires a file object',
)
def test_lock_fileno(tmpfile):
    """Check that ``portalocker.lock`` accepts a raw file descriptor.

    One handle is locked through the file object, the other through its
    ``fileno()``; both use shared locks.
    """
    # Lock shared non-blocking
    flags = LockFlags.SHARED | LockFlags.NON_BLOCKING

    # The context manager closes both handles even if a lock call raises,
    # so a failing run does not leave open (and locked) files behind.
    with open(tmpfile, 'a') as a, open(tmpfile, 'a') as b:
        # First lock file a
        portalocker.lock(a, flags)

        # Now see if we can lock using fileno()
        portalocker.lock(b.fileno(), flags)