Source code for portalocker.portalocker

import os

import typing

from . import constants
from . import exceptions


# Alias for readability. Due to import recursion issues we cannot do:
# from .constants import LockFlags
LockFlags = constants.LockFlags


if os.name == 'nt':  # pragma: no cover
    import msvcrt
    import pywintypes
    import win32con
    import win32file
    import winerror

    __overlapped = pywintypes.OVERLAPPED()


    def lock(file_: typing.IO, flags: LockFlags):
        mode = 0
        if flags & LockFlags.NON_BLOCKING:
            mode |= win32con.LOCKFILE_FAIL_IMMEDIATELY

        if flags & LockFlags.EXCLUSIVE:
            mode |= win32con.LOCKFILE_EXCLUSIVE_LOCK

        # Save the old position so we can go back to that position but
        # still lock from the beginning of the file
        savepos = file_.tell()
        if savepos:
            file_.seek(0)

        os_fh = msvcrt.get_osfhandle(file_.fileno())
        try:
            win32file.LockFileEx(os_fh, mode, 0, -0x10000, __overlapped)
        except pywintypes.error as exc_value:
            # error: (33, 'LockFileEx', 'The process cannot access the file
            # because another process has locked a portion of the file.')
            if exc_value.winerror == winerror.ERROR_LOCK_VIOLATION:
                raise exceptions.AlreadyLocked(
                    exceptions.LockException.LOCK_FAILED,
                    exc_value.strerror,
                    fh=file_
                )
            else:
                # Q:  Are there exceptions/codes we should be dealing with
                # here?
                raise
        finally:
            if savepos:
                file_.seek(savepos)


    def unlock(file_: typing.IO):
        try:
            savepos = file_.tell()
            if savepos:
                file_.seek(0)

            os_fh = msvcrt.get_osfhandle(file_.fileno())
            try:
                win32file.UnlockFileEx(
                    os_fh, 0, -0x10000, __overlapped
                )
            except pywintypes.error as exc:
                if exc.winerror == winerror.ERROR_NOT_LOCKED:
                    # error: (158, 'UnlockFileEx',
                    #         'The segment is already unlocked.')
                    # To match the 'posix' implementation, silently
                    # ignore this error
                    pass
                else:
                    # Q:  Are there exceptions/codes we should be
                    # dealing with here?
                    raise
            finally:
                if savepos:
                    file_.seek(savepos)
        except IOError as exc:
            raise exceptions.LockException(
                exceptions.LockException.LOCK_FAILED, exc.strerror,
                fh=file_
            )

elif os.name == 'posix':  # pragma: no cover
    import fcntl


[docs] def lock(file_: typing.IO, flags: LockFlags): locking_exceptions = IOError, try: # pragma: no cover locking_exceptions += BlockingIOError, # type: ignore except NameError: # pragma: no cover pass # Locking with NON_BLOCKING without EXCLUSIVE or SHARED enabled results # in an error if ((flags & LockFlags.NON_BLOCKING) and not flags & (LockFlags.SHARED | LockFlags.EXCLUSIVE)): raise RuntimeError('When locking in non-blocking mode the SHARED ' 'or EXCLUSIVE flag must be specified as well') try: fcntl.flock(file_, flags) except locking_exceptions as exc_value: # The exception code varies on different systems so we'll catch # every IO error raise exceptions.LockException(exc_value, fh=file_)
[docs] def unlock(file_: typing.IO, ): fcntl.flock(file_.fileno(), LockFlags.UNBLOCK)
else: # pragma: no cover raise RuntimeError('PortaLocker only defined for nt and posix platforms')