Source code for aetcd.client

import asyncio
import functools
import inspect
import typing

from . import exceptions
from . import leases
from . import locks
from . import members
from . import rpc
from . import rtypes
from . import transactions
from . import utils
from . import watcher


def _handle_errors(f):
    if inspect.iscoroutinefunction(f):
        async def handler(*args, **kwargs):
            try:
                return await f(*args, **kwargs)
            except rpc.AioRpcError as e:
                exceptions._handle_exception(e)
    elif inspect.isasyncgenfunction(f):
        async def handler(*args, **kwargs):
            try:
                async for data in f(*args, **kwargs):
                    yield data
            except rpc.AioRpcError as e:
                exceptions._handle_exception(e)
    else:
        raise RuntimeError(
            f'provided function {f.__name__!r} is neither a coroutine nor an async generator')

    return functools.wraps(f)(handler)


def _ensure_connected(f):
    if inspect.iscoroutinefunction(f):
        async def handler(self, *args, **kwargs):
            await self.connect()
            return await f(self, *args, **kwargs)
    elif inspect.isasyncgenfunction(f):
        async def handler(self, *args, **kwargs):
            await self.connect()
            async for data in f(self, *args, **kwargs):
                yield data
    else:
        raise RuntimeError(
            f'provided function {f.__name__!r} is neither a coroutine nor an async generator')

    return functools.wraps(f)(handler)


class Transactions:
    def __init__(self):
        self.value = transactions.Value
        self.version = transactions.Version
        self.create = transactions.Create
        self.mod = transactions.Mod

        self.put = transactions.Put
        self.get = transactions.Get
        self.delete = transactions.Delete
        self.txn = transactions.Txn


class Status:
    def __init__(self, version, db_size, leader, raft_index, raft_term):
        self.version = version
        self.db_size = db_size
        self.leader = leader
        self.raft_index = raft_index
        self.raft_term = raft_term


[docs]class Alarm: """A cluster member alarm. :param alarm_type: Type of the alarm. :param member_id: Cluster member ID. """ def __init__(self, alarm_type, member_id): self.alarm_type = alarm_type self.member_id = member_id
[docs]class Client: """Client provides and manages a client session. The client can be used as an async context manager. :param str host: etcd host address, as IP address or a domain name. :param int port: etcd port number to connect to. :param str username: The name of the user used for authentication. :param str password: Password to be used for authentication. :param int timeout: Connection timeout in seconds. :param dict options: Options provided to the underlying gRPC channel. :param int connect_wait_timeout: Connecting wait timeout, since connection could be initiated from multiple coroutines. :return: A :class:`~aetcd.client.Client` instance. """ def __init__( self, host: str = 'localhost', port: int = 2379, username: typing.Optional[str] = None, password: typing.Optional[str] = None, timeout: typing.Optional[int] = None, options: typing.Optional[typing.Dict[str, typing.Any]] = None, connect_wait_timeout: int = 3, ): self._host = host self._port = port self._username = username self._password = password self._timeout = timeout self._options = options or {} self._connect_wait_timeout = connect_wait_timeout self._connected = asyncio.Event() self._is_connecting = False cred_params = (self._username, self._password) if any(cred_params) and None in cred_params: raise Exception( 'if using authentication credentials both username and password ' 'must be provided') self._init_channel_attrs() self.transactions = Transactions() def _init_channel_attrs(self): # These attributes will be assigned during opening of GRPC channel self.channel = None self.metadata = None self.auth_stub = None self.kvstub = None self.clusterstub = None self.leasestub = None self.maintenancestub = None self._watcher = None
[docs] @_handle_errors async def connect(self) -> None: """Establish a connection to an etcd.""" if self._connected.is_set(): return if self._is_connecting: # Another task is establishing a connection, just wait await asyncio.wait_for( self._connected.wait(), self._connect_wait_timeout, ) return try: self._is_connecting = True target = f'{self._host}:{self._port}' self.channel = rpc.insecure_channel(target, options=self._options.items()) cred_params = [c is not None for c in (self._username, self._password)] if all(cred_params): self.auth_stub = rpc.AuthStub(self.channel) auth_request = rpc.AuthenticateRequest( name=self._username, password=self._password, ) resp = await self.auth_stub.Authenticate(auth_request, timeout=self._timeout) self.metadata = (('token', resp.token),) self.kvstub = rpc.KVStub(self.channel) self.clusterstub = rpc.ClusterStub(self.channel) self.leasestub = rpc.LeaseStub(self.channel) self.maintenancestub = rpc.MaintenanceStub(self.channel) # Initialize a watcher self._watcher = watcher.Watcher( rpc.WatchStub(self.channel), timeout=self._timeout, metadata=self.metadata, ) self._connected.set() finally: self._is_connecting = False
[docs] async def close(self) -> None: """Close established connection and frees allocated resources. It could be called while other operation is being executed. """ if not self._connected.is_set() and self._is_connecting: # Wait for the previous request to complete await asyncio.wait_for( self._connected.wait(), self._connect_wait_timeout, ) if self.channel: # Shutdown the watcher if self._watcher is not None: await self._watcher.shutdown() # Close the underlying RPC channel if self.channel: # Check it again since it could be modified by a concurrent task await self.channel.close() self._init_channel_attrs() self._connected.clear()
async def __aenter__(self): await self.connect() return self async def __aexit__(self, *args): await self.close()
[docs] @_handle_errors @_ensure_connected async def get( self, key: bytes, serializable: bool = False, ) -> typing.Optional[rtypes.Get]: """Get a single key from the key-value store. :param bytes key: Key in etcd to get. :param bool serializable: Whether to allow serializable reads. This can result in stale reads. :return: A instance of :class:`~aetcd.rtypes.Get` or ``None``, if the the key was not found. Usage example: .. code-block:: python import aetcd client = aetcd.Client() await client.get(b'key') """ range_request = self._build_get_range_request( key, serializable=serializable, ) range_response = await self.kvstub.Range( range_request, timeout=self._timeout, metadata=self.metadata, ) if range_response.count < 1: return None return rtypes.Get(range_response.header, range_response.kvs.pop())
[docs] @_handle_errors @_ensure_connected async def get_prefix( self, key_prefix: bytes, sort_order: typing.Optional[str] = None, sort_target: str = 'key', keys_only: bool = False, ) -> rtypes.GetRange: """Get a range of keys with a prefix from the key-value store. :param bytes key_prefix: Key prefix to get. :return: An instance of :class:`~aetcd.rtypes.GetRange`. """ range_request = self._build_get_range_request( key=key_prefix, range_end=utils.prefix_range_end(key_prefix), sort_order=sort_order, sort_target=sort_target, keys_only=keys_only, ) range_response = await self.kvstub.Range( range_request, timeout=self._timeout, metadata=self.metadata, ) return rtypes.GetRange( range_response.header, range_response.kvs, range_response.more, range_response.count, )
[docs] @_handle_errors @_ensure_connected async def get_range( self, range_start: bytes, range_end: bytes, sort_order: typing.Optional[str] = None, sort_target: str = 'key', ) -> rtypes.GetRange: """Get a range of keys from the key-value store. :param bytes range_start: First key in range. :param bytes range_end: Last key in range. :return: An instance of :class:`~aetcd.rtypes.GetRange`. """ range_request = self._build_get_range_request( key=range_start, range_end=range_end, sort_order=sort_order, sort_target=sort_target, ) range_response = await self.kvstub.Range( range_request, timeout=self._timeout, metadata=self.metadata, ) return rtypes.GetRange( range_response.header, range_response.kvs, range_response.more, range_response.count, )
[docs] @_handle_errors @_ensure_connected async def get_all( self, sort_order=None, sort_target='key', keys_only=False, ) -> rtypes.GetRange: """Get all keys from the key-value store. :return: An instance of :class:`~aetcd.rtypes.GetRange`. """ range_request = self._build_get_range_request( key=b'\0', range_end=b'\0', sort_order=sort_order, sort_target=sort_target, keys_only=keys_only, ) range_response = await self.kvstub.Range( range_request, timeout=self._timeout, metadata=self.metadata, ) return rtypes.GetRange( range_response.header, range_response.kvs, range_response.more, range_response.count, )
[docs] @_handle_errors @_ensure_connected async def put( self, key: bytes, value: bytes, lease: typing.Optional[typing.Union[int, leases.Lease]] = None, prev_kv: bool = False, ) -> rtypes.Put: """Put the given key into the key-value store. :param bytes key: Key in etcd to set. :param bytes value: Value to set key to. :param lease: Lease to associate with this key. :type lease: either :class:`~aetcd.leases.Lease`, or ``int`` (ID of a lease), or ``None`` :param bool prev_kv: Whether to return the previous key-value pair. :return: A instance of :class:`~aetcd.rtypes.Put`. Usage example: .. code-block:: python import aetcd client = aetcd.Client() await client.put(b'key', b'value') """ put_request = self._build_put_request( key, value, lease=lease, prev_kv=prev_kv, ) put_response = await self.kvstub.Put( put_request, timeout=self._timeout, metadata=self.metadata, ) return rtypes.Put( put_response.header, put_response.prev_kv if prev_kv else None, )
[docs] @_handle_errors @_ensure_connected async def delete( self, key: bytes, prev_kv: bool = False, ) -> typing.Optional[rtypes.Delete]: """Delete a single key from the key-value store. :param bytes key: Key in etcd to delete. :param bool prev_kv: Whether to return the deleted key-value pair. :return: A instance of :class:`~aetcd.rtypes.Delete` or ``None``, if the the key was not found. Usage example: .. code-block:: python import aetcd client = aetcd.Client() await client.put(b'key', b'value') await client.delete(b'key') """ delete_request = self._build_delete_request(key, prev_kv=prev_kv) delete_response = await self.kvstub.DeleteRange( delete_request, timeout=self._timeout, metadata=self.metadata, ) if delete_response.deleted < 1: return None return rtypes.Delete( delete_response.header, delete_response.deleted, delete_response.prev_kvs.pop() if prev_kv else None, )
[docs] @_handle_errors @_ensure_connected async def delete_prefix( self, key_prefix: bytes, prev_kv: bool = False, ) -> rtypes.DeleteRange: """Delete a range of keys with a prefix from the key-value store. :param bytes key_prefix: Key prefix to delete. :param bool prev_kv: Whether to return deleted key-value pairs. :return: An instance of :class:`~aetcd.rtypes.DeleteRange`. """ delete_request = self._build_delete_request( key_prefix, range_end=utils.prefix_range_end(key_prefix), prev_kv=prev_kv, ) delete_response = await self.kvstub.DeleteRange( delete_request, timeout=self._timeout, metadata=self.metadata, ) return rtypes.DeleteRange( delete_response.header, delete_response.deleted, delete_response.prev_kvs, )
[docs] @_handle_errors @_ensure_connected async def delete_range( self, range_start: bytes, range_end: bytes, prev_kv: bool = False, ) -> rtypes.DeleteRange: """Delete a range of keys from the key-value store. :param bytes range_start: First key in range. :param bytes range_end: Last key in range. :param bool prev_kv: Whether to return deleted key-value pairs. :return: An instance of :class:`~aetcd.rtypes.DeleteRange`. """ delete_request = self._build_delete_request( range_start, range_end=range_end, prev_kv=prev_kv, ) delete_response = await self.kvstub.DeleteRange( delete_request, timeout=self._timeout, metadata=self.metadata, ) return rtypes.DeleteRange( delete_response.header, delete_response.deleted, delete_response.prev_kvs, )
[docs] async def replace( self, key: bytes, initial_value: bytes, new_value: bytes, lease: typing.Optional[typing.Union[int, leases.Lease]] = None, ): """Atomically replace the value of a key with a new value. This compares the current value of a key, then replaces it with a new value if it is equal to a specified value. This operation takes place in a transaction. :param key: Key in etcd to replace. :param bytes initial_value: Old value to replace. :param bytes new_value: New value of the key :param lease: Lease to associate with this key. :type lease: either :class:`~aetcd.leases.Lease`, or ``int`` (ID of a lease), or ``None`` :return: A status of transaction, ``True`` if the replace was successful, ``False`` otherwise. """ status, _ = await self.transaction( compare=[ self.transactions.value(key) == initial_value, ], success=[ self.transactions.put(key, new_value, lease=lease), ], failure=[], ) return status
[docs] @_handle_errors @_ensure_connected async def watch( self, key: bytes, range_end: typing.Optional[bytes] = None, start_revision: typing.Optional[int] = None, progress_notify: bool = False, kind: typing.Optional[rtypes.EventKind] = None, prev_kv: bool = False, watch_id: typing.Optional[int] = None, fragment: bool = False, ): """Watch a key. :param bytes key: Key to watch. :param bytes range_end: End of the range ``[key, range_end)`` to watch. If ``range_end`` is not given, only the ``key`` argument is watched. If ``range_end`` is equal to ``x00``, all keys greater than or equal to the ``key`` argument are watched. If the ``range_end`` is one bit larger than the given ``key``, then all keys with the prefix (the given ``key``) will be watched. :param int start_revision: Revision to watch from (inclusive). :param bool progress_notify: If set, the server will periodically send a response with no events to the new watcher if there are no recent events. :param aetcd.rtypes.EventKind kind: Filter the events by :class:`~aetcd.rtypes.EventKind`, at server side before it sends back to the watcher. :param bool prev_kv: If set, created watcher gets the previous key-value before the event happend. If the previous key-value is already compacted, nothing will be returned. :param int watch_id: If provided and non-zero, it will be assigned as ID to this watcher. :param bool fragment: Enable splitting large revisions into multiple watch responses. :return: A instance of :class:`~aetcd.rtypes.Watch`. Usage example: .. code-block:: python async for event in await client.watch(b'key'): print(event) """ events = asyncio.Queue() async def response_callback(response): await events.put(response) watcher_callback = await self._watcher.add_callback( key, response_callback, range_end=range_end, start_revision=start_revision, progress_notify=progress_notify, kind=kind, prev_kv=prev_kv, watch_id=watch_id, fragment=fragment, ) canceled = asyncio.Event() async def cancel(): canceled.set() await events.put(None) await self._watcher.cancel(watcher_callback.watch_id) @_handle_errors async def iterator(): while not canceled.is_set(): event = await events.get() if event is None: canceled.set() if isinstance(event, Exception): canceled.set() raise event if not canceled.is_set(): yield event return rtypes.Watch( iterator, cancel, watcher_callback.watch_id, )
[docs] async def watch_prefix( self, key_prefix: bytes, range_end: typing.Optional[bytes] = None, start_revision: typing.Optional[int] = None, progress_notify: bool = False, kind: typing.Optional[rtypes.EventKind] = None, prev_kv: bool = False, watch_id: typing.Optional[int] = None, fragment: bool = False, ): """Watch a range of keys with a prefix. :param bytes key_prefix: Key prefix to watch. :param bytes range_end: End of the range ``[key, range_end)`` to watch. If ``range_end`` is not given, only the ``key`` argument is watched. If ``range_end`` is equal to ``x00``, all keys greater than or equal to the ``key`` argument are watched. If the ``range_end`` is one bit larger than the given ``key``, then all keys with the prefix (the given ``key``) will be watched. :param int start_revision: Revision to watch from (inclusive). :param bool progress_notify: If set, the server will periodically send a response with no events to the new watcher if there are no recent events. :param aetcd.rtypes.EventKind kind: Filter the events by :class:`~aetcd.rtypes.EventKind`, at server side before it sends back to the watcher. :param bool prev_kv: If set, created watcher gets the previous key-value before the event happend. If the previous key-value is already compacted, nothing will be returned. :param int watch_id: If provided and non-zero, it will be assigned as ID to this watcher. :param bool fragment: Enable splitting large revisions into multiple watch responses. :return: A instance of :class:`~aetcd.rtypes.Watch`. """ return await self.watch( key_prefix, range_end=utils.prefix_range_end(key_prefix), start_revision=start_revision, progress_notify=progress_notify, kind=kind, prev_kv=prev_kv, watch_id=watch_id, fragment=fragment, )
[docs] @_handle_errors @_ensure_connected async def watch_once( self, key: bytes, timeout: typing.Optional[int] = None, range_end: typing.Optional[bytes] = None, start_revision: typing.Optional[int] = None, progress_notify: bool = False, kind: typing.Optional[rtypes.EventKind] = None, prev_kv: bool = False, watch_id: typing.Optional[int] = None, fragment: bool = False, ): """Watch a key and stops after the first event. If the timeout was specified and event didn't arrived method will raise ``WatchTimeoutError`` exception. :param bytes key: Key to watch. :param bytes range_end: End of the range ``[key, range_end)`` to watch. If ``range_end`` is not given, only the ``key`` argument is watched. If ``range_end`` is equal to ``x00``, all keys greater than or equal to the ``key`` argument are watched. If the ``range_end`` is one bit larger than the given ``key``, then all keys with the prefix (the given ``key``) will be watched. :param int start_revision: Revision to watch from (inclusive). :param bool progress_notify: If set, the server will periodically send a response with no events to the new watcher if there are no recent events. :param aetcd.rtypes.EventKind kind: Filter the events by :class:`~aetcd.rtypes.EventKind`, at server side before it sends back to the watcher. :param bool prev_kv: If set, created watcher gets the previous key-value before the event happend. If the previous key-value is already compacted, nothing will be returned. :param int watch_id: If provided and non-zero, it will be assigned as ID to this watcher. :param bool fragment: Enable splitting large revisions into multiple watch responses. :return: An instance of :class:`~aetcd.rtypes.Event`. """ event_queue = asyncio.Queue() watcher_callback = await self._watcher.add_callback( key, event_queue.put, range_end=range_end, start_revision=start_revision, progress_notify=progress_notify, kind=kind, prev_kv=prev_kv, watch_id=watch_id, fragment=fragment, ) try: return await asyncio.wait_for(event_queue.get(), timeout) except asyncio.TimeoutError: raise exceptions.WatchTimeoutError finally: await self._watcher.cancel(watcher_callback.watch_id)
[docs] async def watch_prefix_once( self, key_prefix: bytes, timeout: typing.Optional[int] = None, range_end: typing.Optional[bytes] = None, start_revision: typing.Optional[int] = None, progress_notify: bool = False, kind: typing.Optional[rtypes.EventKind] = None, prev_kv: bool = False, watch_id: typing.Optional[int] = None, fragment: bool = False, ): """Watch a range of keys with a prefix and stops after the first event. If the timeout was specified and event didn't arrived method will raise ``WatchTimeoutError`` exception. :param bytes key_prefix: Key prefix to watch. :param bytes range_end: End of the range ``[key, range_end)`` to watch. If ``range_end`` is not given, only the ``key`` argument is watched. If ``range_end`` is equal to ``x00``, all keys greater than or equal to the ``key`` argument are watched. If the ``range_end`` is one bit larger than the given ``key``, then all keys with the prefix (the given ``key``) will be watched. :param int start_revision: Revision to watch from (inclusive). :param bool progress_notify: If set, the server will periodically send a response with no events to the new watcher if there are no recent events. :param aetcd.rtypes.EventKind kind: Filter the events by :class:`~aetcd.rtypes.EventKind`, at server side before it sends back to the watcher. :param bool prev_kv: If set, created watcher gets the previous key-value before the event happend. If the previous key-value is already compacted, nothing will be returned. :param int watch_id: If provided and non-zero, it will be assigned as ID to this watcher. :param bool fragment: Enable splitting large revisions into multiple watch responses. :return: An instance of :class:`~aetcd.rtypes.Event`. """ return await self.watch_once( key_prefix, timeout=timeout, range_end=utils.prefix_range_end(key_prefix), start_revision=start_revision, progress_notify=progress_notify, kind=kind, prev_kv=prev_kv, watch_id=watch_id, fragment=fragment, )
[docs] @_handle_errors @_ensure_connected async def transaction(self, compare, success=None, failure=None): """Perform a transaction. :param compare: A list of comparisons to make :param success: A list of operations to perform if all the comparisons are true. :param failure: A list of operations to perform if any of the comparisons are false. :return: A tuple of (operation status, responses). Usage example: .. code-block:: python await client.transaction( compare=[ client.transactions.value(b'key') == b'value', client.transactions.version(b'key') > 0, ], success=[ client.transactions.put(b'key', b'success'), ], failure=[ client.transactions.put(b'key', b'failure'), ] ) """ compare = [c.build_message() for c in compare] success_ops = self._ops_to_requests(success) failure_ops = self._ops_to_requests(failure) transaction_request = rpc.TxnRequest( compare=compare, success=success_ops, failure=failure_ops, ) txn_response = await self.kvstub.Txn( transaction_request, timeout=self._timeout, metadata=self.metadata, ) responses = [] for response in txn_response.responses: response_type = response.WhichOneof('response') if response_type in ['response_put', 'response_delete_range', 'response_txn']: responses.append(response) elif response_type == 'response_range': range_kvs = [] for kv in response.response_range.kvs: range_kvs.append( ( kv.value, rtypes.Get(txn_response.header, kv), ), ) responses.append(range_kvs) return txn_response.succeeded, responses
[docs] @_handle_errors @_ensure_connected async def lease(self, ttl, lease_id=None): """Create a new lease. All keys attached to this lease will be expired and deleted if the lease expires. A lease can be sent keep alive messages to refresh the ttl. :param int ttl: Requested time to live. :param lease_id: Requested ID for the lease. :return: A new lease, an instance of :class:`~aetcd.leases.Lease`. """ lease_grant_request = rpc.LeaseGrantRequest(TTL=ttl, ID=lease_id) lease_grant_response = await self.leasestub.LeaseGrant( lease_grant_request, timeout=self._timeout, metadata=self.metadata, ) return leases.Lease( lease_id=lease_grant_response.ID, ttl=lease_grant_response.TTL, etcd_client=self, )
[docs] @_handle_errors @_ensure_connected async def revoke_lease(self, lease_id): """Revoke a lease. :param lease_id: ID of the lease to revoke. """ lease_revoke_request = rpc.LeaseRevokeRequest(ID=lease_id) await self.leasestub.LeaseRevoke( lease_revoke_request, timeout=self._timeout, metadata=self.metadata, )
@_handle_errors @_ensure_connected async def refresh_lease(self, lease_id): async for reply in self.leasestub.LeaseKeepAlive( [rpc.LeaseKeepAliveRequest(ID=lease_id)], timeout=self._timeout, metadata=self.metadata, ): return reply @_handle_errors @_ensure_connected async def get_lease_info(self, lease_id, *, keys=True): ttl_request = rpc.LeaseTimeToLiveRequest( ID=lease_id, keys=keys, ) return await self.leasestub.LeaseTimeToLive( ttl_request, timeout=self._timeout, metadata=self.metadata, )
[docs] def lock(self, key: bytes, ttl: int = 60): """Create a new lock. :param bytes key: The key under which the lock will be stored. :param int ttl: Length of time for the lock to live for in seconds. The lock will be released after this time elapses, unless refreshed. :return: A new lock, an instance of :class:`~aetcd.locks.Lock`. """ return locks.Lock(self, key, ttl)
[docs] @_handle_errors @_ensure_connected async def status(self): """Get the status of the responding member.""" status_request = rpc.StatusRequest() status_response = await self.maintenancestub.Status( status_request, timeout=self._timeout, metadata=self.metadata, ) async for m in self.members(): if m.id == status_response.leader: leader = m break else: # raise exception? leader = None return Status( status_response.version, status_response.dbSize, leader, status_response.raftIndex, status_response.raftTerm, )
[docs] @_handle_errors @_ensure_connected async def add_member(self, urls): """Add a member into the cluster. :return: A new member, an instance of :class:`~aetcd.members.Member`. """ member_add_request = rpc.MemberAddRequest(peerURLs=urls) member_add_response = await self.clusterstub.MemberAdd( member_add_request, timeout=self._timeout, metadata=self.metadata, ) member = member_add_response.member return members.Member( member.ID, member.name, member.peerURLs, member.clientURLs, etcd_client=self, )
[docs] @_handle_errors @_ensure_connected async def remove_member(self, member_id): """Remove an existing member from the cluster. :param member_id: ID of the member to remove. """ member_rm_request = rpc.MemberRemoveRequest(ID=member_id) await self.clusterstub.MemberRemove( member_rm_request, timeout=self._timeout, metadata=self.metadata, )
[docs] @_handle_errors @_ensure_connected async def update_member(self, member_id, peer_urls): """Update the configuration of an existing member in the cluster. :param member_id: ID of the member to update. :param peer_urls: New list of peer URLs the member will use to communicate with the cluster. """ member_update_request = rpc.MemberUpdateRequest( ID=member_id, peerURLs=peer_urls, ) await self.clusterstub.MemberUpdate( member_update_request, timeout=self._timeout, metadata=self.metadata, )
[docs] @_ensure_connected async def members(self): """List of all members associated with the cluster. :return: A sequence of :class:`~aetcd.members.Member`. """ member_list_request = rpc.MemberListRequest() member_list_response = await self.clusterstub.MemberList( member_list_request, timeout=self._timeout, metadata=self.metadata, ) for member in member_list_response.members: yield members.Member( member.ID, member.name, member.peerURLs, member.clientURLs, etcd_client=self, )
[docs] @_handle_errors @_ensure_connected async def compact(self, revision, physical=False): """Compact the event history in etcd up to a given revision. All superseded keys with a revision less than the compaction revision will be removed. :param revision: Revision for the compaction operation. :param physical: If set to ``True``, the request will wait until the compaction is physically applied to the local database such that compacted entries are totally removed from the backend database. """ compact_request = rpc.CompactionRequest( revision=revision, physical=physical, ) await self.kvstub.Compact( compact_request, timeout=self._timeout, metadata=self.metadata, )
[docs] @_handle_errors @_ensure_connected async def defragment(self): """Defragment a member's backend database to recover storage space.""" defrag_request = rpc.DefragmentRequest() await self.maintenancestub.Defragment( defrag_request, timeout=self._timeout, metadata=self.metadata, )
[docs] @_handle_errors @_ensure_connected async def hash(self): """Return the hash of the local KV state. :return: KV state hash. """ hash_request = rpc.HashRequest() return (await self.maintenancestub.Hash(hash_request)).hash
[docs] @_handle_errors @_ensure_connected async def create_alarm(self, member_id=0): """Create an alarm. If no member id is given, the alarm is activated for all the members of the cluster. Only the `no space` alarm can be raised. :param member_id: The cluster member ID to create an alarm to. If 0, the alarm is created for all the members of the cluster. :return: A sequence of :class:`~aetcd.client.Alarm`. """ alarm_request = self._build_alarm_request('activate', member_id, 'no space') alarm_response = await self.maintenancestub.Alarm( alarm_request, timeout=self._timeout, metadata=self.metadata, ) return [ Alarm(alarm.alarm, alarm.memberID) for alarm in alarm_response.alarms ]
[docs] @_handle_errors @_ensure_connected async def list_alarms(self, member_id=0, alarm_type='none'): """List the activated alarms. :param member_id: The cluster member ID. :param alarm_type: The cluster member ID to create an alarm to. If 0, the alarm is created for all the members of the cluster. :return: A sequence of :class:`~aetcd.client.Alarm`. """ alarm_request = self._build_alarm_request('get', member_id, alarm_type) alarm_response = await self.maintenancestub.Alarm( alarm_request, timeout=self._timeout, metadata=self.metadata, ) for alarm in alarm_response.alarms: yield Alarm(alarm.alarm, alarm.memberID)
[docs] @_handle_errors @_ensure_connected async def disarm_alarm(self, member_id=0): """Cancel an alarm. :param member_id: The cluster member id to cancel an alarm. If 0, the alarm is canceled for all the members of the cluster. :return: A sequence of :class:`~aetcd.client.Alarm`. """ alarm_request = self._build_alarm_request('deactivate', member_id, 'no space') alarm_response = await self.maintenancestub.Alarm( alarm_request, timeout=self._timeout, metadata=self.metadata, ) return [ Alarm(alarm.alarm, alarm.memberID) for alarm in alarm_response.alarms ]
[docs] @_handle_errors @_ensure_connected async def snapshot(self, file_obj): """Take a snapshot of the database. :param file_obj: A file-like object to write the database contents in. """ snapshot_request = rpc.SnapshotRequest() snapshot_responses = self.maintenancestub.Snapshot( snapshot_request, timeout=self._timeout, metadata=self.metadata, ) async for response in snapshot_responses: file_obj.write(response.blob)
@staticmethod def _build_get_range_request( key: bytes, range_end: typing.Optional[bytes] = None, limit: typing.Optional[int] = None, revision: typing.Optional[int] = None, sort_order: typing.Optional[str] = None, sort_target: str = 'key', serializable: bool = False, keys_only: bool = False, count_only: typing.Optional[int] = None, min_mod_revision: typing.Optional[int] = None, max_mod_revision: typing.Optional[int] = None, min_create_revision: typing.Optional[int] = None, max_create_revision: typing.Optional[int] = None, ) -> rpc.RangeRequest: # TODO: Add missing request parameters: limit, revision, count_only, # mid_mod_revision, max_mod_revision, min_create_revision, max_create_revision range_request = rpc.RangeRequest() range_request.key = key if range_end is not None: range_request.range_end = range_end if sort_order is None: range_request.sort_order = rpc.RangeRequest.NONE elif sort_order == 'ascend': range_request.sort_order = rpc.RangeRequest.ASCEND elif sort_order == 'descend': range_request.sort_order = rpc.RangeRequest.DESCEND else: raise ValueError(f'unknown sort order: {sort_order!r}') if sort_target is None or sort_target == 'key': range_request.sort_target = rpc.RangeRequest.KEY elif sort_target == 'version': range_request.sort_target = rpc.RangeRequest.VERSION elif sort_target == 'create': range_request.sort_target = rpc.RangeRequest.CREATE elif sort_target == 'mod': range_request.sort_target = rpc.RangeRequest.MOD elif sort_target == 'value': range_request.sort_target = rpc.RangeRequest.VALUE else: raise ValueError('sort_target must be one of "key", ' '"version", "create", "mod" or "value"') range_request.serializable = serializable range_request.keys_only = keys_only return range_request @staticmethod def _build_put_request( key: bytes, value: bytes, lease: typing.Optional[typing.Union[int, leases.Lease]] = None, prev_kv: bool = False, ignore_value: bool = False, ignore_lease: bool = False, ): # TODO: Add missing request parameters: ignore_value, ignore_lease put_request = rpc.PutRequest() put_request.key = key put_request.value = value put_request.lease = utils.lease_to_id(lease) put_request.prev_kv = prev_kv return put_request @staticmethod def _build_delete_request( key: bytes, range_end: typing.Optional[bytes] = None, prev_kv: bool = False, ): delete_request = rpc.DeleteRangeRequest() delete_request.key = key if range_end is not None: delete_request.range_end = range_end delete_request.prev_kv = prev_kv return delete_request def _ops_to_requests(self, ops): """Return a list of gRPC requests. Returns list from an input list of etcd3.transactions.{Put, Get, Delete, Txn} objects. """ request_ops = [] for op in ops: if isinstance(op, transactions.Put): request = self._build_put_request(op.key, op.value, op.lease, op.prev_kv) request_op = rpc.RequestOp(request_put=request) request_ops.append(request_op) elif isinstance(op, transactions.Get): request = self._build_get_range_request(op.key, op.range_end) request_op = rpc.RequestOp(request_range=request) request_ops.append(request_op) elif isinstance(op, transactions.Delete): request = self._build_delete_request(op.key, op.range_end, op.prev_kv) request_op = rpc.RequestOp(request_delete_range=request) request_ops.append(request_op) elif isinstance(op, transactions.Txn): compare = [c.build_message() for c in op.compare] success_ops = self._ops_to_requests(op.success) failure_ops = self._ops_to_requests(op.failure) request = rpc.TxnRequest( compare=compare, success=success_ops, failure=failure_ops, ) request_op = rpc.RequestOp(request_txn=request) request_ops.append(request_op) else: raise Exception(f'unknown request class {op.__class__!r}') return request_ops @staticmethod def _build_alarm_request(alarm_action, member_id, alarm_type): alarm_request = rpc.AlarmRequest() if alarm_action == 'get': alarm_request.action = rpc.AlarmRequest.GET elif alarm_action == 'activate': alarm_request.action = rpc.AlarmRequest.ACTIVATE elif alarm_action == 'deactivate': alarm_request.action = rpc.AlarmRequest.DEACTIVATE else: raise ValueError(f'unknown alarm action: {alarm_action!r}') alarm_request.memberID = member_id if alarm_type == 'none': alarm_request.alarm = rpc.NONE elif alarm_type == 'no space': alarm_request.alarm = rpc.NOSPACE else: raise ValueError(f'unknown alarm type: {alarm_type!r}') return alarm_request