Source code for generic_connection_pool.contrib.unix_async
"""
Asynchronous unix specific functionality.
"""
import asyncio
import pathlib
import socket
import sys
from generic_connection_pool.contrib.socket_async import BaseConnectionManager, SocketAlivenessCheckingMixin, Stream
from generic_connection_pool.contrib.socket_async import StreamAlivenessCheckingMixin
if sys.platform not in ('linux', 'darwin', 'freebsd'):
raise AssertionError('this module is only supported by unix platforms')
UnixSocketEndpoint = pathlib.Path
[docs]class UnixSocketConnectionManager(
SocketAlivenessCheckingMixin[UnixSocketEndpoint],
BaseConnectionManager[UnixSocketEndpoint, socket.socket],
):
"""
Asynchronous unix socket connection manager.
"""
[docs] async def create(self, endpoint: UnixSocketEndpoint) -> socket.socket:
loop = asyncio.get_running_loop()
sock = socket.socket(family=socket.AF_UNIX, type=socket.SOCK_STREAM)
sock.setblocking(False)
await loop.sock_connect(sock, address=(str(endpoint)))
return sock
[docs] async def dispose(self, endpoint: UnixSocketEndpoint, conn: socket.socket) -> None:
try:
conn.shutdown(socket.SHUT_RDWR)
except OSError:
pass
conn.close()
[docs]class UnixSocketStreamConnectionManager(
StreamAlivenessCheckingMixin[UnixSocketEndpoint],
BaseConnectionManager[UnixSocketEndpoint, Stream],
):
"""
Asynchronous unix socket stream connection manager.
"""
[docs] async def create(self, endpoint: UnixSocketEndpoint) -> Stream:
reader, writer = await asyncio.open_unix_connection(path=endpoint)
return reader, writer
[docs] async def dispose(self, endpoint: UnixSocketEndpoint, conn: Stream) -> None:
reader, writer = conn
if writer.can_write_eof():
writer.write_eof()
writer.close()
await writer.wait_closed()