import asyncio
import uuid
from . import exceptions
from . import rtypes
[docs]class Lock:
"""A distributed lock.
This can be used as a context manager, with the lock being acquired and
released as you would expect:
.. code-block:: python
client = aetcd.Client()
# Create a lock that expires after 20 seconds
with client.lock(b'key', ttl=20) as lock:
# do something that requires the lock
print(lock.is_acquired())
# refresh the timeout on the lease
lock.refresh()
:param aetcd.client.Client client:
An instance of :class:`~aetcd.client.Client`
:param bytes key:
The key under which the lock will be stored.
:param int ttl:
Length of time for the lock to live for in seconds. The lock
will be released after this time elapses, unless refreshed.
"""
def __init__(self, client, key: bytes, ttl: int):
self._client = client
self.key = key
self.ttl = ttl
self.lease = None
# Store uuid as bytes, since it avoids having to decode each time we
# need to compare
self.uuid = uuid.uuid1().bytes
async def _try_acquire(self):
self.lease = await self._client.lease(self.ttl)
success, metadata = await self._client.transaction(
compare=[
self._client.transactions.create(self.key) == 0,
],
success=[
self._client.transactions.put(
self.key,
self.uuid,
lease=self.lease,
),
],
failure=[
self._client.transactions.get(self.key),
],
)
if success is True:
self.revision = metadata[0].response_put.header.revision
return True
self.revision = metadata[0][0][1].mod_revision
self.lease = None
return False
async def _wait_for_delete_event(self, timeout: int):
try:
await self._client.watch_once(
self.key,
timeout=timeout,
start_revision=self.revision + 1,
kind=rtypes.EventKind.DELETE,
)
except exceptions.WatchTimeoutError:
return
[docs] async def acquire(self, timeout: int = 10):
"""Acquire the lock.
:param int timeout:
Maximum time to wait before returning. ``None`` means
forever, any other value equal or greater than 0 is
the number of seconds.
:return:
``True`` if the lock has been acquired, ``False`` otherwise.
"""
loop = asyncio.get_running_loop()
deadline = None
if timeout is not None:
deadline = loop.time() + timeout
while True:
if await self._try_acquire():
return True
if deadline is not None:
remaining_timeout = max(deadline - loop.time(), 0)
if remaining_timeout == 0:
return False
else:
remaining_timeout = None
await self._wait_for_delete_event(remaining_timeout)
[docs] async def release(self):
"""Release the lock."""
success, _ = await self._client.transaction(
compare=[
self._client.transactions.value(self.key) == self.uuid,
],
success=[self._client.transactions.delete(self.key)],
failure=[],
)
return success
[docs] async def refresh(self):
"""Refresh the time to live on this lock."""
if self.lease is not None:
return await self.lease.refresh()
raise ValueError(f'no lease associated with this lock: {self.key!r}')
[docs] async def is_acquired(self):
"""Check if this lock is currently acquired."""
result = await self._client.get(self.key)
if result is None:
return False
return result.value == self.uuid
async def __aenter__(self):
await self.acquire()
return self
async def __aexit__(self, exception_type, exception_value, traceback):
await self.release()