main.py 3.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. import multiprocessing
  2. import os
  3. import platform
  4. from aiohttp import web
  5. from sqlalchemy.engine.url import URL
  6. from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
  7. from .views import (
  8. json,
  9. single_database_query_orm,
  10. multiple_database_queries_orm,
  11. fortunes,
  12. updates,
  13. plaintext,
  14. single_database_query_raw,
  15. multiple_database_queries_raw,
  16. fortunes_raw,
  17. updates_raw,
  18. )
  19. if platform.python_implementation() != "PyPy":
  20. import asyncpg
  21. class NoResetConnection(asyncpg.Connection):
  22. __slots__ = ()
  23. def get_reset_query(self):
  24. return ""
  25. CONNECTION_ORM = os.getenv('CONNECTION', 'ORM').upper() == 'ORM'
  26. def pg_dsn(dialect=None) -> str:
  27. """
  28. :return: DSN url suitable for sqlalchemy and aiopg.
  29. """
  30. url = URL.create(
  31. database='hello_world',
  32. password=os.getenv('PGPASS', 'benchmarkdbpass'),
  33. host='tfb-database',
  34. port='5432',
  35. username=os.getenv('PGUSER', 'benchmarkdbuser'),
  36. drivername='postgresql+{}'.format(dialect) if dialect else 'postgresql',
  37. )
  38. return url.render_as_string(hide_password=False)
  39. async def db_ctx(app: web.Application):
  40. # number of gunicorn workers = multiprocessing.cpu_count() as per gunicorn_conf.py
  41. # max_connections = 2000 as per toolset/setup/linux/databases/postgresql/postgresql.conf:64
  42. # since the world table contains only 10,000 rows, a large connection pool is unnecessary
  43. # the server hardware provides 56 CPU cores producing high concurrency
  44. # https://wiki.postgresql.org/wiki/Number_Of_Database_Connections
  45. max_size = 2
  46. min_size = 2
  47. print(f'connection pool: min size: {min_size}, max size: {max_size}, orm: {CONNECTION_ORM}')
  48. if CONNECTION_ORM:
  49. dsn = pg_dsn('asyncpg')
  50. engine = create_async_engine(dsn, pool_size=max_size)
  51. app['db_session'] = async_sessionmaker(engine)
  52. else:
  53. dsn = pg_dsn()
  54. app['pg'] = await asyncpg.create_pool(dsn=dsn, min_size=min_size, max_size=max_size, loop=app.loop, connection_class=NoResetConnection)
  55. yield
  56. if CONNECTION_ORM:
  57. await app['db_session'].dispose()
  58. else:
  59. await app['pg'].close()
  60. def setup_routes(app):
  61. if CONNECTION_ORM:
  62. app.router.add_get('/db', single_database_query_orm)
  63. app.router.add_get('/queries/{queries:.*}', multiple_database_queries_orm)
  64. app.router.add_get('/fortunes', fortunes)
  65. app.router.add_get('/updates/{queries:.*}', updates)
  66. else:
  67. app.router.add_get('/json', json)
  68. app.router.add_get('/plaintext', plaintext)
  69. app.router.add_get('/db', single_database_query_raw)
  70. app.router.add_get('/queries/{queries:.*}', multiple_database_queries_raw)
  71. app.router.add_get('/fortunes', fortunes_raw)
  72. app.router.add_get('/updates/{queries:.*}', updates_raw)
  73. def create_app():
  74. app = web.Application()
  75. if platform.python_implementation() != "PyPy":
  76. app.cleanup_ctx.append(db_ctx)
  77. setup_routes(app)
  78. return app