Source code for aetcd3.locks

import uuid

import tenacity


# from etcd3 import exceptions


lock_prefix = '/locks/'


[docs]class Lock(object): """ A distributed lock. This can be used as a context manager, with the lock being acquired and released as you would expect: .. code-block:: python etcd = etcd3.client() # create a lock that expires after 20 seconds with etcd.lock('toot', ttl=20) as lock: # do something that requires the lock print(lock.is_acquired()) # refresh the timeout on the lease lock.refresh() :param name: name of the lock :type name: string or bytes :param ttl: length of time for the lock to live for in seconds. The lock will be released after this time elapses, unless refreshed :type ttl: int """ def __init__(self, name, ttl=60, etcd_client=None): self.name = name self.ttl = ttl if etcd_client is not None: self.etcd_client = etcd_client self.key = lock_prefix + self.name self.lease = None # store uuid as bytes, since it avoids having to decode each time we # need to compare self.uuid = uuid.uuid1().bytes
[docs] async def acquire(self, timeout=10): """Acquire the lock. :params timeout: Maximum time to wait before returning. `None` means forever, any other value equal or greater than 0 is the number of seconds. :returns: True if the lock has been acquired, False otherwise. """ stop = ( tenacity.stop_never if timeout is None else tenacity.stop_after_delay(timeout) ) def wait(retry_state): # if timeout is None: # remaining_timeout = None # else: # remaining_timeout = max(timeout - retry_state.start_time, 0) # TODO(jd): Wait for a DELETE event to happen: that'd mean the lock # has been released, rather than retrying on PUT events too # try: # await self.etcd_client.watch_once(self.key, # remaining_timeout) # except exceptions.WatchTimedOut: # pass return 0 @tenacity.retry(retry=tenacity.retry_never, stop=stop, wait=wait) async def _acquire(): # TODO: save the created revision so we can check it later to make # sure we still have the lock self.lease = await self.etcd_client.lease(self.ttl) success, _ = await self.etcd_client.transaction( compare=[ self.etcd_client.transactions.create(self.key) == 0, ], success=[ self.etcd_client.transactions.put( self.key, self.uuid, lease=self.lease, ), ], failure=[ self.etcd_client.transactions.get(self.key), ], ) if success is True: return True self.lease = None raise tenacity.TryAgain try: return await _acquire() except tenacity.RetryError: return False
[docs] async def release(self): """Release the lock.""" success, _ = await self.etcd_client.transaction( compare=[ self.etcd_client.transactions.value(self.key) == self.uuid, ], success=[self.etcd_client.transactions.delete(self.key)], failure=[], ) return success
[docs] async def refresh(self): """Refresh the time to live on this lock.""" if self.lease is not None: return await self.lease.refresh() else: raise ValueError('No lease associated with this lock - have you ' 'acquired the lock yet?')
[docs] async def is_acquired(self): """Check if this lock is currently acquired.""" uuid, _ = await self.etcd_client.get(self.key) if uuid is None: return False return uuid == self.uuid
async def __aenter__(self): await self.acquire() return self async def __aexit__(self, exception_type, exception_value, traceback): await self.release()