Source code for generic_connection_pool.contrib.unix

"""
Unix specific functionality.
"""

import errno
import pathlib
import socket
import sys
from typing import Generic, Optional

from generic_connection_pool.contrib.socket import BaseConnectionManager
from generic_connection_pool.threading import EndpointT

from .socket import socket_timeout

if sys.platform not in ('linux', 'darwin', 'freebsd'):
    raise AssertionError('this module is only supported by unix platforms')


UnixSocketEndpoint = pathlib.Path


[docs]class CheckSocketAlivenessMixin(Generic[EndpointT]): """ Socket aliveness checking mixin. """ def check_aliveness(self, endpoint: EndpointT, conn: socket.socket, timeout: Optional[float] = None) -> bool: try: if conn.recv(1, socket.MSG_PEEK | socket.MSG_DONTWAIT) == b'': return False except BlockingIOError as exc: if exc.errno != errno.EAGAIN: raise except OSError: return False return True
[docs]class UnixSocketConnectionManager( CheckSocketAlivenessMixin[UnixSocketEndpoint], BaseConnectionManager[UnixSocketEndpoint, socket.socket], ): """ Unix socket connection manager. """
[docs] def create(self, endpoint: UnixSocketEndpoint, timeout: Optional[float] = None) -> socket.socket: sock = socket.socket(family=socket.AF_UNIX, type=socket.SOCK_STREAM) with socket_timeout(sock, timeout): sock.connect(str(endpoint)) return sock
[docs] def dispose(self, endpoint: UnixSocketEndpoint, conn: socket.socket, timeout: Optional[float] = None) -> None: try: conn.shutdown(socket.SHUT_RDWR) except OSError: pass conn.close()