12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182 |
- import os
- import multiprocessing
- import asyncpg
- from aiohttp import web
- from sqlalchemy.engine.url import URL
- from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
- from sqlalchemy.orm import sessionmaker
- from .views import (
- json,
- single_database_query_orm,
- multiple_database_queries_orm,
- fortunes,
- updates,
- plaintext,
- single_database_query_raw,
- multiple_database_queries_raw,
- fortunes_raw,
- updates_raw,
- )
- CONNECTION_ORM = os.getenv('CONNECTION', 'ORM').upper() == 'ORM'
- def pg_dsn(dialect=None) -> str:
- """
- :return: DSN url suitable for sqlalchemy and aiopg.
- """
- return str(URL.create(
- database='hello_world',
- password=os.getenv('PGPASS', 'benchmarkdbpass'),
- host='tfb-database',
- port='5432',
- username=os.getenv('PGUSER', 'benchmarkdbuser'),
- drivername='postgresql+{}'.format(dialect) if dialect else 'postgresql',
- ))
- async def db_ctx(app: web.Application):
- # number of gunicorn workers = multiprocessing.cpu_count() as per gunicorn_conf.py
- # max_connections = 2000 as per toolset/setup/linux/databases/postgresql/postgresql.conf:64
- # give 10% leeway
- max_size = min(1800 / multiprocessing.cpu_count(), 160)
- max_size = max(int(max_size), 1)
- min_size = max(int(max_size / 2), 1)
- print(f'connection pool: min size: {min_size}, max size: {max_size}, orm: {CONNECTION_ORM}')
- if CONNECTION_ORM:
- dsn = pg_dsn('asyncpg')
- engine = create_async_engine(dsn, future=True, pool_size=max_size)
- app['db_session'] = sessionmaker(engine, class_=AsyncSession)
- else:
- dsn = pg_dsn()
- app['pg'] = await asyncpg.create_pool(dsn=dsn, min_size=min_size, max_size=max_size, loop=app.loop)
- yield
- if not CONNECTION_ORM:
- await app['pg'].close()
- def setup_routes(app):
- if CONNECTION_ORM:
- app.router.add_get('/json', json)
- app.router.add_get('/db', single_database_query_orm)
- app.router.add_get('/queries/{queries:.*}', multiple_database_queries_orm)
- app.router.add_get('/fortunes', fortunes)
- app.router.add_get('/updates/{queries:.*}', updates)
- app.router.add_get('/plaintext', plaintext)
- else:
- app.router.add_get('/db', single_database_query_raw)
- app.router.add_get('/queries/{queries:.*}', multiple_database_queries_raw)
- app.router.add_get('/fortunes', fortunes_raw)
- app.router.add_get('/updates/{queries:.*}', updates_raw)
- def create_app():
- app = web.Application()
- app.cleanup_ctx.append(db_ctx)
- setup_routes(app)
- return app
|