# db.py (1.0 KB)
  1. import asyncio
  2. from contextlib import asynccontextmanager
  3. import asyncpg
  4. class Connection(asyncpg.Connection):
  5. async def reset(self, *, timeout=None):
  6. pass
  7. class Pool:
  8. def __init__(self, connect_url, max_size=10, connection_class=None):
  9. self._connect_url = connect_url
  10. self._connection_class = connection_class or Connection
  11. self._queue = asyncio.LifoQueue(max_size)
  12. def __await__(self):
  13. return self._async_init__().__await__()
  14. async def _async_init__(self):
  15. for _ in range(self._queue.maxsize):
  16. self._queue.put_nowait(await asyncpg.connect(self._connect_url, connection_class=self._connection_class))
  17. return self
  18. @asynccontextmanager
  19. async def acquire(self):
  20. conn = await self._queue.get()
  21. try:
  22. yield conn
  23. finally:
  24. self._queue.put_nowait(conn)
  25. async def close(self):
  26. for _ in range(self._queue.maxsize):
  27. conn = await self._queue.get()
  28. await conn.close()