Quickstart#

Runtime#

generic-connection-pool supports synchronous and asynchronous connection pools. To instantiate one, create a ConnectionPool from the generic_connection_pool.threading module for the threading runtime, or from generic_connection_pool.asyncio for the asynchronous runtime:

import socket
from ipaddress import IPv4Address
from typing import Tuple

from generic_connection_pool.contrib.socket import TcpSocketConnectionManager
from generic_connection_pool.threading import ConnectionPool

# Pool key and connection types: each pooled connection is a tcp socket
# identified by its (address, port) endpoint.
Port = int
Endpoint = Tuple[IPv4Address, Port]
Connection = socket.socket

pool = ConnectionPool[Endpoint, Connection](
    TcpSocketConnectionManager(),
    idle_timeout=30.0,  # presumably seconds a connection may stay idle — see ConnectionPool docs
    max_lifetime=600.0,  # presumably maximum connection age in seconds before recycling
    min_idle=3,
    max_size=20,  # per-endpoint connection limit
    total_max_size=100,  # whole-pool connection limit
    background_collector=True,  # collect expired connections in the background
)

Connection manager#

Connection pool implements logic common to any connection. Logic specific to a particular connection type is implemented by a connection manager. It defines the connection lifecycle (how to create or dispose of a connection, and how to check its aliveness). For more information see BaseConnectionManager in the threading module or BaseConnectionManager in the asyncio module:

IpAddress = IPv4Address
Port = int
TcpEndpoint = Tuple[IpAddress, Port]

class TcpSocketConnectionManager(BaseConnectionManager[TcpEndpoint, socket.socket]):
    """Connection manager that opens and closes plain tcp sockets."""

    def create(self, endpoint: TcpEndpoint, timeout: Optional[float] = None) -> socket.socket:
        """Open a tcp connection to the endpoint and return the connected socket."""
        addr, port = endpoint

        sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
        sock.connect((str(addr), port))

        return sock

    def dispose(self, endpoint: TcpEndpoint, conn: socket.socket, timeout: Optional[float] = None) -> None:
        """Shut down and close the socket."""
        try:
            # shutdown() raises OSError if the peer has already closed the
            # connection; that must not prevent the socket from being closed
            conn.shutdown(socket.SHUT_RDWR)
        except OSError:
            pass

        conn.close()

You can use one of the predefined connection managers or implement your own. To learn how to implement a custom connection manager, see the Custom connection manager section.

Connection acquiring#

After the pool is instantiated a connection can be acquired using acquire() and released using release(). To get rid of boilerplate code the connection pool supports automated connection acquiring using connection() returning a context manager:

# connection() acquires a connection on entering the block and releases it
# back to the pool on exit (pool, addr and port are defined as above)
with pool.connection(endpoint=(addr, port), timeout=5.0) as sock:
    sock.sendall(...)
    response = sock.recv(...)

Connection aliveness checks#

Connection manager provides the api for connection aliveness checks. To implement that override method check_aliveness(). The method must return True if connection is alive and False otherwise:

# endpoints are database DSN-like names; connections are psycopg2 connections
DbEndpoint = str
Connection = psycopg2.extensions.connection

class DbConnectionManager(BaseConnectionManager[DbEndpoint, Connection]):
    ...

    def check_aliveness(self, endpoint: DbEndpoint, conn: Connection, timeout: Optional[float] = None) -> bool:
        """Probe the connection with a trivial query; False means it is dead."""
        try:
            with conn.cursor() as cur:
                cur.execute("SELECT 1;")
                cur.fetchone()
        except (psycopg2.Error, OSError):
            # any driver or socket level error means the connection is unusable
            return False

        return True

Custom connection manager#

To implement a custom connection manager you must override two methods: create() and dispose(). The other methods are optional.

The following example illustrates how to implement a custom connection manager for redis.

import socket
from ipaddress import IPv4Address
from typing import Optional, Tuple

from generic_connection_pool.threading import BaseConnectionManager, ConnectionPool

Port = int
Endpoint = Tuple[IPv4Address, Port]
Connection = socket.socket


class RedisConnectionManager(BaseConnectionManager[Endpoint, socket.socket]):
    """Connection manager that opens authenticated redis connections."""

    def __init__(self, username: str, password: str):
        self._username = username
        self._password = password

    def create(self, endpoint: Endpoint, timeout: Optional[float] = None) -> socket.socket:
        """Connect to the endpoint, authenticate, and return the socket.

        Raises RuntimeError if the server rejects the credentials.
        """
        addr, port = endpoint

        sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
        sock.connect((str(addr), port))

        try:
            if (resp := self.cmd(sock, f'AUTH {self._username} {self._password}')) != '+OK':
                raise RuntimeError(f"authentication failed: {resp}")
        except Exception:
            # don't leak the socket when authentication (or the round-trip) fails
            sock.close()
            raise

        return sock

    def cmd(self, sock: socket.socket, cmd: str) -> str:
        """Send a single inline command and return the raw reply.

        NOTE(review): a single recv() may return a partial reply on a slow
        link — acceptable for an example, not for production code.
        """
        sock.sendall(f'{cmd}\r\n'.encode())

        response = sock.recv(1024)
        return response.rstrip(b'\r\n').decode()

    def dispose(self, endpoint: Endpoint, conn: socket.socket, timeout: Optional[float] = None) -> None:
        """Shut down and close the socket; tolerate an already-dead peer."""
        try:
            conn.shutdown(socket.SHUT_RDWR)
        except OSError:
            pass

        conn.close()

    def check_aliveness(self, endpoint: Endpoint, conn: socket.socket, timeout: Optional[float] = None) -> bool:
        """Ping the server; False if the round-trip fails or the reply is wrong."""
        try:
            if self.cmd(conn, 'ping') != '+PONG':
                return False
        except OSError:
            return False

        return True


# pool of authenticated redis connections keyed by (address, port)
redis_pool = ConnectionPool[Endpoint, Connection](
    RedisConnectionManager('', 'secret'),  # empty username, password 'secret'
    idle_timeout=30.0,
    max_lifetime=600.0,
    min_idle=3,
    max_size=20,
    total_max_size=100,
    background_collector=True,
)


def command(addr: IPv4Address, port: int, cmd: str) -> None:
    """Run a single redis command through the pool and print the raw reply."""
    payload = cmd.encode() + b'\r\n'

    with redis_pool.connection(endpoint=(addr, port), timeout=5.0) as conn:
        conn.sendall(payload)
        reply = conn.recv(1024)
        print(reply.decode())


try:
    command(IPv4Address('127.0.0.1'), 6379, 'CLIENT ID')  # tcp connection opened
    command(IPv4Address('127.0.0.1'), 6379, 'CLIENT INFO')  # tcp connection reused
finally:
    redis_pool.close()  # dispose all pooled connections

Connection manager allows you to define methods to be called on connection acquire, release, or when a connection is determined to be dead. That helps to log pool actions or collect metrics. The following example illustrates how to collect pool metrics and export them to prometheus.

import ssl
import time
from ssl import SSLSocket
from typing import Any, Dict, Tuple

import prometheus_client as prom

from generic_connection_pool.contrib.socket import SslSocketConnectionManager
from generic_connection_pool.threading import ConnectionPool

Hostname = str
Port = int
Endpoint = Tuple[Hostname, Port]

# prometheus metrics, labelled by the target hostname
acquire_latency_hist = prom.Histogram('acquire_latency', 'Connections acquire latency', labelnames=['hostname'])
acquire_total = prom.Counter('acquire_total', 'Connections acquire count', labelnames=['hostname'])
dead_conn_total = prom.Counter('dead_conn_total', 'Dead connections count', labelnames=['hostname'])


class ObservableConnectionManager(SslSocketConnectionManager):
    """Ssl connection manager that exports pool metrics to prometheus."""

    def __init__(self, *args: Any, **kwargs: Any):
        super().__init__(*args, **kwargs)
        # acquire timestamps keyed by connection: written on acquire,
        # consumed on release to measure how long the connection was held
        self._acquires: Dict[SSLSocket, float] = {}

    def on_acquire(self, endpoint: Endpoint, conn: SSLSocket) -> None:
        hostname, _ = endpoint

        acquire_total.labels(hostname).inc()
        self._acquires[conn] = time.time()

    def on_release(self, endpoint: Endpoint, conn: SSLSocket) -> None:
        hostname, _ = endpoint

        # pop defensively: a release without a matching acquire record
        # must not raise KeyError inside the pool machinery
        acquired_at = self._acquires.pop(conn, None)
        if acquired_at is not None:
            acquire_latency_hist.labels(hostname).observe(time.time() - acquired_at)

    def on_connection_dead(self, endpoint: Endpoint, conn: SSLSocket) -> None:
        hostname, _ = endpoint

        dead_conn_total.labels(hostname).inc()
        # drop any stale timestamp so the dict doesn't grow with dead connections
        self._acquires.pop(conn, None)


# only the manager is passed; the pool uses its default settings
http_pool = ConnectionPool[Endpoint, SSLSocket](
    ObservableConnectionManager(ssl.create_default_context()),
)

Examples#

TCP connection pool#

The following example illustrates how to use a synchronous tcp connection pool.

import socket
from ipaddress import IPv4Address, IPv6Address
from typing import Tuple, Union

from generic_connection_pool.contrib.socket import TcpSocketConnectionManager
from generic_connection_pool.threading import ConnectionPool

Port = int
IpAddress = Union[IPv4Address, IPv6Address]  # both ip versions are valid pool keys
Endpoint = Tuple[IpAddress, Port]
Connection = socket.socket


# plain tcp pool; see the Quickstart section for the meaning of the parameters
redis_pool = ConnectionPool[Endpoint, Connection](
    TcpSocketConnectionManager(),
    idle_timeout=30.0,
    max_lifetime=600.0,
    min_idle=3,
    max_size=20,
    total_max_size=100,
    background_collector=True,
)


def command(addr: IpAddress, port: int, cmd: str) -> None:
    """Send a newline-terminated command over a pooled tcp socket and print the reply."""
    payload = cmd.encode() + b'\n'

    with redis_pool.connection(endpoint=(addr, port), timeout=5.0) as conn:
        conn.sendall(payload)
        reply = conn.recv(1024)
        print(reply.decode())


try:
    command(IPv4Address('127.0.0.1'), 6379, 'CLIENT ID')  # tcp connection opened
    command(IPv4Address('127.0.0.1'), 6379, 'INFO')  # tcp connection reused
finally:
    redis_pool.close()  # dispose all pooled connections

SSL connection pool#

The library also provides an ssl based connection manager. The following example illustrates how to create an ssl connection pool.

import ssl
import urllib.parse
from http.client import HTTPResponse
from typing import Tuple

from generic_connection_pool.contrib.socket import SslSocketConnectionManager
from generic_connection_pool.threading import ConnectionPool

Hostname = str
Port = int
Endpoint = Tuple[Hostname, Port]
Connection = ssl.SSLSocket


# ssl pool: the manager wraps each socket using the given ssl context
http_pool = ConnectionPool[Endpoint, Connection](
    SslSocketConnectionManager(ssl.create_default_context()),
    idle_timeout=30.0,
    max_lifetime=600.0,
    min_idle=3,
    max_size=20,
    total_max_size=100,
    background_collector=True,
)


def fetch(url: str, timeout: float = 5.0) -> None:
    """Fetch the url over a pooled ssl connection and print the status and body.

    Raises ValueError if the url has no hostname.
    """
    url = urllib.parse.urlsplit(url)
    if url.hostname is None:
        raise ValueError

    # Parenthesized: the conditional expression binds looser than `or`, so the
    # unparenthesized `url.port or 443 if ... else 80` evaluated as
    # `(url.port or 443) if ... else 80` and discarded an explicit port for
    # non-https urls. Fall back to the scheme default only when no port is given.
    port = url.port or (443 if url.scheme == 'https' else 80)

    with http_pool.connection(endpoint=(url.hostname, port), timeout=timeout) as sock:
        # request line, Host header and a single blank line terminating the
        # header section (the original sent a stray extra CRLF); an empty
        # path falls back to '/'
        request = (
            'GET {path} HTTP/1.1\r\n'
            'Host: {host}\r\n'
            '\r\n'
        ).format(host=url.hostname, path=url.path or '/')

        sock.write(request.encode())

        response = HTTPResponse(sock)
        response.begin()
        status, body = response.getcode(), response.read(response.length)

        print(status)
        print(body)


try:
    fetch('https://en.wikipedia.org/wiki/HTTP')  # http connection opened
    fetch('https://en.wikipedia.org/wiki/Python_(programming_language)')  # http connection reused
finally:
    http_pool.close()  # dispose all pooled connections

Asynchronous connection pool#

Asynchronous connection pool api looks pretty much the same:

import asyncio
import urllib.parse
from typing import Tuple

from generic_connection_pool.asyncio import ConnectionPool
from generic_connection_pool.contrib.socket_async import TcpStreamConnectionManager

Hostname = str
Port = int
Endpoint = Tuple[Hostname, Port]
# an asyncio connection is a (reader, writer) stream pair
Connection = Tuple[asyncio.StreamReader, asyncio.StreamWriter]


async def main() -> None:
    """Demonstrate the asynchronous connection pool with two pooled http fetches."""
    http_pool = ConnectionPool[Endpoint, Connection](
        TcpStreamConnectionManager(ssl=True),
        idle_timeout=30.0,
        max_lifetime=600.0,
        min_idle=3,
        max_size=20,
        total_max_size=100,
        background_collector=True,
    )

    async def fetch(url: str, timeout: float = 5.0) -> None:
        """Fetch the url over a pooled tls stream; print the status line and body."""
        url = urllib.parse.urlsplit(url)
        if url.hostname is None:
            raise ValueError

        # Parenthesized: the unparenthesized `url.port or 443 if ... else 80`
        # parsed as `(url.port or 443) if ... else 80`, forcing port 80 for
        # any non-https url even when an explicit port was given.
        port = url.port or (443 if url.scheme == 'https' else 80)

        async with http_pool.connection(endpoint=(url.hostname, port), timeout=timeout) as (reader, writer):
            # request line, Host header and a single blank line terminating the
            # header section (the original sent a stray extra CRLF); an empty
            # path falls back to '/'
            request = (
                'GET {path} HTTP/1.1\r\n'
                'Host: {host}\r\n'
                '\r\n'
            ).format(host=url.hostname, path=url.path or '/')

            writer.write(request.encode())
            await writer.drain()

            status_line = await reader.readuntil(b'\r\n')
            headers = await reader.readuntil(b'\r\n\r\n')
            # naive header parse: "Name: value" pairs, keys lower-cased
            headers = {
                pair[0].lower(): pair[1]
                for header in headers.split(b'\r\n')
                if len(pair := header.decode().split(':', maxsplit=1)) == 2
            }

            chunks = []
            content_length = int(headers['content-length'])
            while content_length:
                chunk = await reader.read(content_length)
                if not chunk:
                    # EOF before the advertised length: stop instead of looping forever
                    break
                chunks.append(chunk)
                content_length -= len(chunk)

            print(status_line)
            print(b''.join(chunks))

    try:
        await fetch('https://en.wikipedia.org/wiki/HTTP')  # http connection opened
        await fetch('https://en.wikipedia.org/wiki/Python_(programming_language)')  # http connection reused
    finally:
        await http_pool.close()

asyncio.run(main())

DB connection pool#

Connection pool can manage any connection including database ones. The library provides a connection manager for PostgreSQL: DbConnectionManager.

import psycopg2.extensions

from generic_connection_pool.contrib.psycopg2 import DbConnectionManager
from generic_connection_pool.threading import ConnectionPool

Endpoint = str  # endpoints are symbolic database names rather than network addresses
Connection = psycopg2.extensions.connection


# credentials shared by all endpoints; only the host differs per endpoint
dsn_params = dict(dbname='postgres', user='postgres', password='secret')

pg_pool = ConnectionPool[Endpoint, Connection](
    DbConnectionManager(
        dsn_params={
            'master': dict(dsn_params, host='db-master.local'),
            'replica-1': dict(dsn_params, host='db-replica-1.local'),
            'replica-2': dict(dsn_params, host='db-replica-2.local'),
        },
    ),
    acquire_timeout=2.0,
    idle_timeout=60.0,
    max_lifetime=600.0,
    min_idle=3,
    max_size=10,
    total_max_size=15,
    background_collector=True,
)

try:
    # the first use of each endpoint opens a connection; the repeated
    # 'master' endpoint reuses the pooled one
    for endpoint in ('master', 'replica-1', 'master'):
        with pg_pool.connection(endpoint=endpoint) as conn:
            cur = conn.cursor()
            cur.execute("SELECT inet_server_addr()")
            print(cur.fetchone())

finally:
    pg_pool.close()