# Source code for generic_connection_pool.common

import dataclasses as dc
import logging
import time
from collections import OrderedDict
from enum import IntEnum
from typing import Dict, Generic, Hashable, Optional, Tuple, TypeVar

from . import exceptions
from .rankmap import RankMap

# package-level logger: it is named after the containing package (``__package__``), not after this module
logger = logging.getLogger(__package__)

# type of an endpoint identifier; it is bound to ``Hashable``
EndpointT = TypeVar('EndpointT', bound=Hashable)
# type of a connection object; it is bound to ``Hashable`` (connections are used as dictionary keys below)
ConnectionT = TypeVar('ConnectionT', bound=Hashable)


class Timer:
    """
    Countdown timer based on the monotonic clock. The countdown starts when the instance is created.

    :param timeout: timer timeout in seconds, `None` means the timer never times out
    """

    def __init__(self, timeout: Optional[float]):
        self._started_at = time.monotonic()
        self._timeout = timeout

    @property
    def elapsed(self) -> float:
        """
        Returns number of seconds passed since the timer creation.
        """

        now = time.monotonic()

        return now - self._started_at

    @property
    def remains(self) -> Optional[float]:
        """
        Returns number of seconds left until the timeout (never negative)
        or `None` if the timer has no timeout.
        """

        timeout = self._timeout
        if timeout is None:
            return None

        left = timeout - self.elapsed

        # clamp at zero: an expired timer has nothing left
        return left if left > 0.0 else 0.0

    @property
    def timedout(self) -> bool:
        """
        Returns `True` if more time than the timeout has elapsed.
        A timer without a timeout never times out.
        """

        return self._timeout is not None and self.elapsed > self._timeout


@dc.dataclass
class ConnectionInfo(Generic[EndpointT, ConnectionT]):
    """
    Bookkeeping record of a single pooled connection.

    All timestamps are taken from the monotonic clock (``time.monotonic``).
    """

    # endpoint the connection is related to
    endpoint: EndpointT
    # the connection object itself
    conn: ConnectionT
    # last access time; set when the record is created unless passed explicitly
    accessed_at: float = dc.field(default_factory=time.monotonic)
    # creation time; set when the record is created unless passed explicitly
    created_at: float = dc.field(default_factory=time.monotonic)
    # acquire counter, starts from zero
    acquires: int = 0

    @property
    def lifetime(self) -> float:
        """
        Returns number of seconds passed since ``created_at``.
        """

        now = time.monotonic()

        return now - self.created_at

    @property
    def idletime(self) -> float:
        """
        Returns number of seconds passed since ``accessed_at``.
        """

        now = time.monotonic()

        return now - self.accessed_at


class BaseEndpointPool(Generic[EndpointT, ConnectionT]):
    """
    Base endpoint pool. Contains information about connections related to a particular endpoint.

    Free connections are stored either in the main pool or in the extra pool; acquired connections
    and reserved (to be established) slots are tracked separately. All of them count towards the pool size.
    """

    def __init__(self, max_pool_size: int, max_extra_size: int):
        """
        :param max_pool_size: connection pool max size
        :param max_extra_size: extra connection pool max size
        """

        self._max_pool_size = max_pool_size
        self._max_extra_size = max_extra_size

        # connections from this pool are kept open until their lifetime expires;
        # they are acquired using FIFO (round-robin) strategy
        self._pool: OrderedDict[ConnectionT, ConnectionInfo[EndpointT, ConnectionT]] = OrderedDict()
        # connections from this pool are closed after idle-time expires; they are acquired using LIFO strategy
        # to recycle extra connections as soon as possible since they are recycled based on last access time
        self._extra: OrderedDict[ConnectionT, ConnectionInfo[EndpointT, ConnectionT]] = OrderedDict()
        # acquired connections
        self._acquired: Dict[ConnectionT, ConnectionInfo[EndpointT, ConnectionT]] = dict()
        # number of reserved connections (to be established)
        self._reserved = 0

    @property
    def max_size(self) -> int:
        """
        Returns pool max size (main pool size plus extra pool size).
        """

        return self._max_pool_size + self._max_extra_size

    def _has_available_slot(self) -> bool:
        """
        Returns `True` if the pool has available connection slots otherwise `False`.
        """

        return self._size() < self.max_size

    def _is_overflowed(self) -> bool:
        """
        Returns `True` if the pool has extra connections (its size exceeds the main pool max size)
        otherwise `False`.
        """

        return self._size() > self._max_pool_size

    def _size(self) -> int:
        """
        Returns the total number of connections: free (main and extra), acquired and reserved.
        """

        return len(self._pool) + len(self._extra) + len(self._acquired) + self._reserved

    def _get_size(self, acquired: Optional[bool] = None) -> int:
        """
        Returns the number of connections.

        :param acquired: if `True` - return the number of acquired connections
                         if `False` - return the number of free connections
                         if `None` - return all connections number (including reserved)

        :return: number of connections
        """

        if acquired is None:
            result = self._size()
        elif acquired is True:
            result = len(self._acquired)
        elif acquired is False:
            result = len(self._pool) + len(self._extra)
        else:
            raise AssertionError("unreachable")

        return result

    def _reserve(self) -> bool:
        """
        Reserve one slot in the pool.

        :return: `True` if a slot has been reserved, `False` if the pool has no available slots
        """

        if not self._has_available_slot():
            return False

        self._reserved += 1

        return True

    def _unreserve(self) -> None:
        """
        Un-reserve a pool slot.
        """

        assert self._reserved != 0
        self._reserved -= 1

    def _acquire(self) -> Tuple[Optional[ConnectionInfo[EndpointT, ConnectionT]], bool]:
        """
        Acquires a connection from the pool.

        The main pool is tried first (its oldest entry is taken); if it is empty the most recently
        added entry of the extra pool is taken. The acquired connection gets its acquire counter
        incremented and its access time refreshed.

        :return: connection information (`None` if there are no free connections)
                 and a flag whether the connection has been taken from the extra pool
        """

        if self._pool:
            extra = False
            conn, conn_info = self._pool.popitem(last=False)
        elif self._extra:
            extra = True
            conn, conn_info = self._extra.popitem(last=True)
        else:
            return None, False

        self._acquired[conn] = conn_info

        conn_info.acquires += 1
        conn_info.accessed_at = time.monotonic()

        return conn_info, extra

    def _release(self, conn: ConnectionT) -> Tuple[ConnectionInfo[EndpointT, ConnectionT], bool]:
        """
        Releases a connection.

        :param conn: connection to be released
        :return: connection information and a flag whether the connection has been put to the extra pool
        """

        assert conn in self._acquired, "connection is not acquired"

        conn_info = self._acquired.pop(conn)
        conn_info.accessed_at = time.monotonic()

        extra = self._store_free(conn, conn_info)

        return conn_info, extra

    def _detach(self, conn: ConnectionT, acquired: bool = False) -> ConnectionInfo[EndpointT, ConnectionT]:
        """
        Detaches off a connection.

        :param conn: connection to be detached
        :param acquired: is the connection acquired
        :return: information of the detached connection
        :raises KeyError: if the connection is not found in the expected collection
        """

        if acquired:
            conn_info = self._acquired.pop(conn)
        else:
            if conn in self._pool:
                conn_info = self._pool.pop(conn)
            else:
                conn_info = self._extra.pop(conn)

        return conn_info

    def _attach(self, conn_info: ConnectionInfo[EndpointT, ConnectionT], acquired: bool = False) -> None:
        """
        Attaches a connection to the pool.

        :param conn_info: connection information to be attached
        :param acquired: acquire the connection
        :raises exceptions.ConnectionPoolIsFull: if the pool has no available slots
        """

        if not self._has_available_slot():
            raise exceptions.ConnectionPoolIsFull

        if acquired:
            self._acquired[conn_info.conn] = conn_info
        else:
            self._store_free(conn_info.conn, conn_info)

    def _store_free(self, conn: ConnectionT, conn_info: ConnectionInfo[EndpointT, ConnectionT]) -> bool:
        """
        Stores a free (not acquired) connection either in the main pool or in the extra pool.
        Single placement rule shared by release and attach.

        :param conn: key the connection is stored under
        :param conn_info: connection information to be stored
        :return: `True` if the connection has been put to the extra pool, `False` if to the main pool
        """

        free_pool_slots = self._max_pool_size - len(self._pool)
        # the connection goes to the extra pool when the number of currently acquired connections
        # is at least the number of free main pool slots
        if len(self._acquired) >= free_pool_slots:
            self._extra[conn] = conn_info
            return True
        elif len(self._pool) < self._max_pool_size:
            self._pool[conn] = conn_info
            return False
        else:
            # the previous branch failing implies free_pool_slots > 0, i.e. the main pool is not full
            raise AssertionError("unreachable")


KeyType = TypeVar('KeyType', bound=Hashable)


@dc.dataclass(frozen=True, order=True)
class Event(Generic[KeyType]):
    """
    Connection pool event.

    Ordering comparisons use ``timestamp`` only, whereas equality and hashing use ``key`` only,
    so two events with the same key are equal even if their timestamps differ.

    :param timestamp: event raise time
    :param key: event key (must be equal for the same event)
    """

    timestamp: float = dc.field(hash=False, compare=True)
    key: KeyType = dc.field(hash=True, compare=False)

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Event):
            # let Python try the reflected comparison instead of forcing `False`;
            # the final result for unrelated types is still "not equal"
            return NotImplemented

        return self.key == other.key


class BaseEventQueue(Generic[KeyType]):
    """
    Connection pool event queue.

    Events are stored in a ``RankMap`` under their keys.
    """

    def __init__(self) -> None:
        self._queue: RankMap[KeyType, Event[KeyType]] = RankMap()

    def _insert(self, timestamp: float, key: KeyType) -> None:
        """
        Adds a new event to the queue (using ``RankMap.insert_or_replace``).

        :param timestamp: event raise time
        :param key: event key
        """

        self._queue.insert_or_replace(key, Event(timestamp, key))

    def _remove(self, key: KeyType) -> None:
        """
        Remove an event from the queue.

        :param key: key of the event to be removed
        """

        self._queue.remove(key)

    def _clear(self) -> None:
        """
        Clears the queue.
        """

        self._queue.clear()

    def _try_get_next_event(self) -> Tuple[Optional[KeyType], Optional[float]]:
        """
        Tries to get the next event from the queue.
        If the queue is empty returns `None` and `None`.
        If the first event has not occurred returns `None` and backoff timeout.
        If the first event has occurred returns its key and zero backoff.

        :return: event data, backoff timeout
        """

        # NOTE(review): only `RankMap.top()` is called and nothing is removed here -
        # confirm that `top()` does not remove the event from the queue
        event = self._queue.top()
        if event is None:
            return None, None

        # the clock is read once so that the "has occurred" check and the backoff are computed
        # against the same instant; with two separate reads an event becoming due in between
        # would be reported as not occurred with a negative backoff
        now = time.monotonic()
        if event.timestamp <= now:
            return event.key, 0.0

        return None, event.timestamp - now

    def _top(self) -> Optional[KeyType]:
        """
        Returns the key of the first event regardless whether the event occurred or not.

        :return: event key or `None` if the queue has no events
        """

        # NOTE(review): relies on `RankMap.top()` only - confirm it does not remove the event
        event = self._queue.top()
        if event is None:
            return None

        return event.key


class EventType(IntEnum):
    """
    Connection pool event types.
    """

    # NOTE(review): presumably the connection ``max_lifetime`` expiration event - confirm against the pool code
    LIFETIME = 1
    # NOTE(review): presumably the connection ``idle_timeout`` expiration event - confirm against the pool code
    IDLETIME = 2


class BaseConnectionPool(Generic[EndpointT, ConnectionT]):
    """
    Base connection pool. Stores the pool configuration and the current pool size counter.

    :param idle_timeout: inactivity time (in seconds) after which an extra connection will be disposed
                         (a connection considered as extra if the number of endpoint connection exceeds
                         ``min_idle``).
    :param max_lifetime: number of seconds after which any connection will be disposed.
    :param min_idle: minimum number of connections the pool tries to hold for each endpoint. Connections that
                     exceed that number will be considered as extra and will be disposed after
                     ``idle_timeout`` of inactivity.
    :param max_size: maximum number of endpoint connections.
    :param total_max_size: maximum number of all connections in the pool.
    """

    def __init__(
        self,
        *,
        idle_timeout: float = 60.0,
        max_lifetime: float = 3600.0,
        min_idle: int = 1,
        max_size: int = 10,
        total_max_size: int = 100,
    ):
        # configuration sanity checks: each limit must fit into the wider one
        assert min_idle <= max_size, "min_idle can't be greater than max_size"
        assert max_size <= total_max_size, "max_size can't be greater than total_max_size"
        assert idle_timeout <= max_lifetime, "idle_timeout can't be greater than max_lifetime"

        self._idle_timeout = idle_timeout
        self._max_lifetime = max_lifetime
        self._min_idle = min_idle
        self._max_size = max_size
        self._total_max_size = total_max_size

        # current pool size counter, starts from zero; it is not modified anywhere in this class
        self._pool_size = 0

    def get_size(self) -> int:
        """
        Returns the current value of the pool size counter.
        """

        return self._pool_size

    @property
    def idle_timeout(self) -> float:
        """
        Returns the configured idle timeout (in seconds).
        """

        return self._idle_timeout

    @property
    def max_lifetime(self) -> float:
        """
        Returns the configured connection max lifetime (in seconds).
        """

        return self._max_lifetime

    @property
    def min_idle(self) -> int:
        """
        Returns the configured minimum number of connections held for each endpoint.
        """

        return self._min_idle

    @property
    def max_size(self) -> int:
        """
        Returns the configured maximum number of endpoint connections.
        """

        return self._max_size

    @property
    def total_max_size(self) -> int:
        """
        Returns the configured maximum number of all connections in the pool.
        """

        return self._total_max_size

    @property
    def is_full(self) -> bool:
        """
        Returns `True` if the pool size counter has reached ``total_max_size`` otherwise `False`.
        """

        return self._pool_size >= self._total_max_size