Quickstart#
Runtime#
generic-connection-pool
supports synchronous and asynchronous connection pools. To instantiate one
create ConnectionPool
for threading runtime or
ConnectionPool
for asynchronous runtime:
import socket
from ipaddress import IPv4Address
from typing import Tuple
from generic_connection_pool.contrib.socket import TcpSocketConnectionManager
from generic_connection_pool.threading import ConnectionPool
Port = int
Endpoint = Tuple[IPv4Address, Port]
Connection = socket.socket
pool = ConnectionPool[Endpoint, Connection](
TcpSocketConnectionManager(),
idle_timeout=30.0,
max_lifetime=600.0,
min_idle=3,
max_size=20,
total_max_size=100,
background_collector=True,
)
Connection manager#
Connection pool implements logic common for any connection. Logic specific to a particular
connection type is implemented by a connection manager. It defines connection lifecycle
(how to create, dispose a connection or check its aliveness).
For more information see BaseConnectionManager
or BaseConnectionManager
:
IpAddress = IPv4Address
Port = int
TcpEndpoint = Tuple[IpAddress, Port]
class TcpSocketConnectionManager(BaseConnectionManager[TcpEndpoint, socket.socket]):
def create(self, endpoint: TcpEndpoint, timeout: Optional[float] = None) -> socket.socket:
addr, port = endpoint
sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
sock.connect((str(addr), port))
return sock
def dispose(self, endpoint: TcpEndpoint, conn: socket.socket, timeout: Optional[float] = None) -> None:
conn.shutdown(socket.SHUT_RDWR)
conn.close()
You can use one of the predefined connection managers or implement your own one. How to implement custom connection manager see
Connection acquiring#
After the pool is instantiated a connection can be acquired using acquire()
and released using release()
.
To get rid of boilerplate code the connection pool supports automated connection acquiring using
connection()
returning a context manager:
with pool.connection(endpoint=(addr, port), timeout=5.0) as sock:
sock.sendall(...)
response = sock.recv(...)
Connection aliveness checks#
Connection manager provides the api for connection aliveness checks.
To implement that override method check_aliveness()
.
The method must return True
if connection is alive and False
otherwise:
DbEndpoint = str
Connection = psycopg2.extensions.connection
class DbConnectionManager(BaseConnectionManager[DbEndpoint, Connection]):
...
def check_aliveness(self, endpoint: DbEndpoint, conn: Connection, timeout: Optional[float] = None) -> bool:
try:
with conn.cursor() as cur:
cur.execute("SELECT 1;")
cur.fetchone()
except (psycopg2.Error, OSError):
return False
return True
Custom connection manager#
To implement a custom connection manager you must override two methods:
create()
and
dispose()
.
The other methods are optional.
The following example illustrate how to implement a custom connection manager for redis.
import socket
from ipaddress import IPv4Address
from typing import Optional, Tuple
from generic_connection_pool.threading import BaseConnectionManager, ConnectionPool
Port = int
Endpoint = Tuple[IPv4Address, Port]
Connection = socket.socket
class RedisConnectionManager(BaseConnectionManager[Endpoint, socket.socket]):
def __init__(self, username: str, password: str):
self._username = username
self._password = password
def create(self, endpoint: Endpoint, timeout: Optional[float] = None) -> socket.socket:
addr, port = endpoint
sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
sock.connect((str(addr), port))
if (resp := self.cmd(sock, f'AUTH {self._username} {self._password}')) != '+OK':
raise RuntimeError(f"authentication failed: {resp}")
return sock
def cmd(self, sock: socket.socket, cmd: str) -> str:
sock.sendall(f'{cmd}\r\n'.encode())
response = sock.recv(1024)
return response.rstrip(b'\r\n').decode()
def dispose(self, endpoint: Endpoint, conn: socket.socket, timeout: Optional[float] = None) -> None:
try:
conn.shutdown(socket.SHUT_RDWR)
except OSError:
pass
conn.close()
def check_aliveness(self, endpoint: Endpoint, conn: socket.socket, timeout: Optional[float] = None) -> bool:
try:
if self.cmd(conn, 'ping') != '+PONG':
return False
except OSError:
return False
return True
redis_pool = ConnectionPool[Endpoint, Connection](
RedisConnectionManager('', 'secret'),
idle_timeout=30.0,
max_lifetime=600.0,
min_idle=3,
max_size=20,
total_max_size=100,
background_collector=True,
)
def command(addr: IPv4Address, port: int, cmd: str) -> None:
with redis_pool.connection(endpoint=(addr, port), timeout=5.0) as sock:
sock.sendall(cmd.encode() + b'\r\n')
response = sock.recv(1024)
print(response.decode())
try:
command(IPv4Address('127.0.0.1'), 6379, 'CLIENT ID') # tcp connection opened
command(IPv4Address('127.0.0.1'), 6379, 'CLIENT INFO') # tcp connection reused
finally:
redis_pool.close()
Connection manager allows to define methods to be called on connection acquire, release or when a connection determined to be dead. That helps to log pool actions or collect metrics. The following examples illustrate how to collect pool metrics and export them to prometheus.
import ssl
import time
from ssl import SSLSocket
from typing import Any, Dict, Tuple
import prometheus_client as prom
from generic_connection_pool.contrib.socket import SslSocketConnectionManager
from generic_connection_pool.threading import ConnectionPool
Hostname = str
Port = int
Endpoint = Tuple[Hostname, Port]
acquire_latency_hist = prom.Histogram('acquire_latency', 'Connections acquire latency', labelnames=['hostname'])
acquire_total = prom.Counter('acquire_total', 'Connections acquire count', labelnames=['hostname'])
dead_conn_total = prom.Counter('dead_conn_total', 'Dead connections count', labelnames=['hostname'])
class ObservableConnectionManager(SslSocketConnectionManager):
def __init__(self, *args: Any, **kwargs: Any):
super().__init__(*args, **kwargs)
self._acquires: Dict[SSLSocket, float] = {}
def on_acquire(self, endpoint: Endpoint, conn: SSLSocket) -> None:
hostname, port = endpoint
acquire_total.labels(hostname).inc()
self._acquires[conn] = time.time()
def on_release(self, endpoint: Endpoint, conn: SSLSocket) -> None:
hostname, port = endpoint
acquired_at = self._acquires.pop(conn)
acquire_latency_hist.labels(hostname).observe(time.time() - acquired_at)
def on_connection_dead(self, endpoint: Endpoint, conn: SSLSocket) -> None:
hostname, port = endpoint
dead_conn_total.labels(hostname).inc()
http_pool = ConnectionPool[Endpoint, SSLSocket](
ObservableConnectionManager(ssl.create_default_context()),
)
Examples#
TCP connection pool#
The following example illustrate how to use synchronous tcp connection pool.
import socket
from ipaddress import IPv4Address, IPv6Address
from typing import Tuple, Union
from generic_connection_pool.contrib.socket import TcpSocketConnectionManager
from generic_connection_pool.threading import ConnectionPool
Port = int
IpAddress = Union[IPv4Address, IPv6Address]
Endpoint = Tuple[IpAddress, Port]
Connection = socket.socket
redis_pool = ConnectionPool[Endpoint, Connection](
TcpSocketConnectionManager(),
idle_timeout=30.0,
max_lifetime=600.0,
min_idle=3,
max_size=20,
total_max_size=100,
background_collector=True,
)
def command(addr: IpAddress, port: int, cmd: str) -> None:
with redis_pool.connection(endpoint=(addr, port), timeout=5.0) as sock:
sock.sendall(cmd.encode() + b'\n')
response = sock.recv(1024)
print(response.decode())
try:
command(IPv4Address('127.0.0.1'), 6379, 'CLIENT ID') # tcp connection opened
command(IPv4Address('127.0.0.1'), 6379, 'INFO') # tcp connection reused
finally:
redis_pool.close()
SSL connection pool#
The library also provide ssl based connection manager. The following example illustrate how to create ssl connection pool.
import ssl
import urllib.parse
from http.client import HTTPResponse
from typing import Tuple
from generic_connection_pool.contrib.socket import SslSocketConnectionManager
from generic_connection_pool.threading import ConnectionPool
Hostname = str
Port = int
Endpoint = Tuple[Hostname, Port]
Connection = ssl.SSLSocket
http_pool = ConnectionPool[Endpoint, Connection](
SslSocketConnectionManager(ssl.create_default_context()),
idle_timeout=30.0,
max_lifetime=600.0,
min_idle=3,
max_size=20,
total_max_size=100,
background_collector=True,
)
def fetch(url: str, timeout: float = 5.0) -> None:
url = urllib.parse.urlsplit(url)
if url.hostname is None:
raise ValueError
port = url.port or 443 if url.scheme == 'https' else 80
with http_pool.connection(endpoint=(url.hostname, port), timeout=timeout) as sock:
request = (
'GET {path} HTTP/1.1\r\n'
'Host: {host}\r\n'
'\r\n'
'\r\n'
).format(host=url.hostname, path=url.path)
sock.write(request.encode())
response = HTTPResponse(sock)
response.begin()
status, body = response.getcode(), response.read(response.length)
print(status)
print(body)
try:
fetch('https://en.wikipedia.org/wiki/HTTP') # http connection opened
fetch('https://en.wikipedia.org/wiki/Python_(programming_language)') # http connection reused
finally:
http_pool.close()
Asynchronous connection pool#
Asynchronous connection pool api looks pretty much the same:
import asyncio
import urllib.parse
from typing import Tuple
from generic_connection_pool.asyncio import ConnectionPool
from generic_connection_pool.contrib.socket_async import TcpStreamConnectionManager
Hostname = str
Port = int
Endpoint = Tuple[Hostname, Port]
Connection = Tuple[asyncio.StreamReader, asyncio.StreamWriter]
async def main() -> None:
http_pool = ConnectionPool[Endpoint, Connection](
TcpStreamConnectionManager(ssl=True),
idle_timeout=30.0,
max_lifetime=600.0,
min_idle=3,
max_size=20,
total_max_size=100,
background_collector=True,
)
async def fetch(url: str, timeout: float = 5.0) -> None:
url = urllib.parse.urlsplit(url)
if url.hostname is None:
raise ValueError
port = url.port or 443 if url.scheme == 'https' else 80
async with http_pool.connection(endpoint=(url.hostname, port), timeout=timeout) as (reader, writer):
request = (
'GET {path} HTTP/1.1\r\n'
'Host: {host}\r\n'
'\r\n'
'\r\n'
).format(host=url.hostname, path=url.path)
writer.write(request.encode())
await writer.drain()
status_line = await reader.readuntil(b'\r\n')
headers = await reader.readuntil(b'\r\n\r\n')
headers = {
pair[0].lower(): pair[1]
for header in headers.split(b'\r\n')
if len(pair := header.decode().split(':', maxsplit=1)) == 2
}
chunks = []
content_length = int(headers['content-length'])
while content_length:
chunk = await reader.read(content_length)
chunks.append(chunk)
content_length -= len(chunk)
print(status_line)
print(b''.join(chunks))
try:
await fetch('https://en.wikipedia.org/wiki/HTTP') # http connection opened
await fetch('https://en.wikipedia.org/wiki/Python_(programming_language)') # http connection reused
finally:
await http_pool.close()
asyncio.run(main())
DB connection pool#
Connection pool can manage any connection including database ones. The library provides
connection manager for postgres DbConnectionManager
.
import psycopg2.extensions
from generic_connection_pool.contrib.psycopg2 import DbConnectionManager
from generic_connection_pool.threading import ConnectionPool
Endpoint = str
Connection = psycopg2.extensions.connection
dsn_params = dict(dbname='postgres', user='postgres', password='secret')
pg_pool = ConnectionPool[Endpoint, Connection](
DbConnectionManager(
dsn_params={
'master': dict(dsn_params, host='db-master.local'),
'replica-1': dict(dsn_params, host='db-replica-1.local'),
'replica-2': dict(dsn_params, host='db-replica-2.local'),
},
),
acquire_timeout=2.0,
idle_timeout=60.0,
max_lifetime=600.0,
min_idle=3,
max_size=10,
total_max_size=15,
background_collector=True,
)
try:
# connection opened
with pg_pool.connection(endpoint='master') as conn:
cur = conn.cursor()
cur.execute("SELECT inet_server_addr()")
print(cur.fetchone())
# connection opened
with pg_pool.connection(endpoint='replica-1') as conn:
cur = conn.cursor()
cur.execute("SELECT inet_server_addr()")
print(cur.fetchone())
# connection reused
with pg_pool.connection(endpoint='master') as conn:
cur = conn.cursor()
cur.execute("SELECT inet_server_addr()")
print(cur.fetchone())
finally:
pg_pool.close()