# app-socketify.py
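# Blacksheep application implementing the usual TechEmpower-style
# benchmark endpoints (/json, /db, /queries, /fortunes, /updates,
# /plaintext), served via socketify's ASGI server when run on PyPy.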

import multiprocessing
import os
import platform
import random
from pathlib import Path

import blacksheep as bs
import jinja2
import psycopg
from psycopg_pool import AsyncConnectionPool

READ_ROW_SQL = 'SELECT "id", "randomnumber" FROM "world" WHERE id = %s'
WRITE_ROW_SQL = 'UPDATE "world" SET "randomnumber"=%s WHERE id=%s'
ADDITIONAL_ROW = [0, "Additional fortune added at request time."]

# Size the pool from the core count: at most two connections per core,
# never more than 32, and never more than the database's connection
# budget split across the per-core worker processes.
CORE_COUNT = multiprocessing.cpu_count()
MAX_DB_CONNECTIONS = 2000
MAX_POOL_SIZE = min(CORE_COUNT * 2, MAX_DB_CONNECTIONS // CORE_COUNT, 32)
MIN_POOL_SIZE = max(1, MAX_POOL_SIZE // 2)

db_pool = None

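# Lifecycle hooks: the pool is created closed (open=False) and opened
# explicitly once the event loop is running, then closed again on
# application shutdown.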
async def setup_db(app):
    global db_pool
    conninfo = (
        f"postgresql://{os.getenv('PGUSER', 'benchmarkdbuser')}:{os.getenv('PGPASS', 'benchmarkdbpass')}"
        f"@tfb-database:5432/hello_world"
    )
    db_pool = AsyncConnectionPool(
        conninfo=conninfo,
        min_size=MIN_POOL_SIZE,
        max_size=MAX_POOL_SIZE,
        open=False,
        timeout=5.0,
        max_lifetime=1800,
    )
    await db_pool.open()

async def shutdown_db(app):
    global db_pool
    if db_pool is not None:
        await db_pool.close()
        db_pool = None

def load_fortunes_template():
    with Path("templates/fortune.html").open("r") as f:
        return jinja2.Template(f.read())

# Compile the fortunes template once at import time and reuse it per request.
fortune_template = load_fortunes_template()

app = bs.Application()
app.on_start += setup_db
app.on_stop += shutdown_db

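# Parse the ?queries= parameter: a missing or non-numeric value falls
# back to 1, and numeric values are clamped to the range 1..500.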
def get_num_queries(request):
    try:
        value = request.query.get('queries')
        if value is None:
            return 1
        query_count = int(value[0])
    except (KeyError, IndexError, ValueError):
        return 1
    return min(max(query_count, 1), 500)

JSON_CONTENT_TYPE = b"application/json"

@bs.get('/json')
async def json_test(request):
    return bs.json({'message': 'Hello, world!'})

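# Fetch a single random World row and return it as JSON.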
@bs.get('/db')
async def single_db_query_test(request):
    row_id = random.randint(1, 10000)
    async with db_pool.connection() as db_conn:
        async with db_conn.cursor() as cursor:
            await cursor.execute(READ_ROW_SQL, (row_id,))
            number = await cursor.fetchone()
    return bs.json({'id': row_id, 'randomNumber': number[1]})

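# Run N independent reads, reusing a single pooled connection for the
# whole request rather than checking one out per query.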
@bs.get('/queries')
async def multiple_db_queries_test(request):
    num_queries = get_num_queries(request)
    # sample() draws unique ids; range()'s stop bound is exclusive, so
    # use 10001 to cover the full 1..10000 id range used elsewhere.
    row_ids = random.sample(range(1, 10001), num_queries)
    worlds = []
    async with db_pool.connection() as db_conn:
        async with db_conn.cursor() as cursor:
            for row_id in row_ids:
                await cursor.execute(READ_ROW_SQL, (row_id,))
                number = await cursor.fetchone()
                worlds.append({"id": row_id, "randomNumber": number[1]})
    return bs.json(worlds)

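# Read all fortunes, add one at request time, sort by message text,
# and render the Jinja2 template.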
@bs.get('/fortunes')
async def fortunes_test(request):
    async with db_pool.connection() as db_conn:
        async with db_conn.cursor() as cursor:
            await cursor.execute("SELECT * FROM Fortune")
            fortunes = await cursor.fetchall()
    fortunes.append(ADDITIONAL_ROW)
    fortunes.sort(key=lambda row: row[1])
    data = fortune_template.render(fortunes=fortunes)
    return bs.html(data)

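# Update N random rows, retrying on deadlocks and broken connections.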
@bs.get('/updates')
async def db_updates_test(request):
    num_queries = get_num_queries(request)
    # Sort by row id (x[0]) so concurrent requests acquire row locks in
    # a consistent order, which keeps deadlocks rare.
    updates = sorted(
        zip(
            random.sample(range(1, 10001), num_queries),
            random.sample(range(1, 10001), num_queries),
        ),
        key=lambda x: x[0],
    )
    worlds = [{"id": row_id, "randomNumber": number} for row_id, number in updates]
    for _ in range(5):
        async with db_pool.connection() as db_conn:
            try:
                await db_conn.execute("SET TRANSACTION ISOLATION LEVEL READ COMMITTED")
                async with db_conn.cursor() as cursor:
                    for row_id, number in updates:
                        await cursor.execute(READ_ROW_SQL, (row_id,))
                        await cursor.fetchone()
                    for _ in range(5):
                        try:
                            await cursor.executemany(
                                WRITE_ROW_SQL,
                                [(number, row_id) for row_id, number in updates],
                            )
                            return bs.json(worlds)
                        except psycopg.errors.DeadlockDetected:
                            await db_conn.rollback()
                            continue
            except (psycopg.errors.OperationalError, psycopg.errors.PipelineAborted):
                await db_conn.rollback()
                continue
    raise Exception("connect error")

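# Plaintext: a raw bytes body with no JSON or template overhead.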
@bs.get('/plaintext')
async def plaintext_test(request):
    return bs.Response(200, content=bs.Content(b"text/plain", b'Hello, World!'))

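# On PyPy, serve the ASGI app with socketify and scale across cores
# with a classic pre-fork model: fork one child per extra core, then
# run the final worker in the parent process.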
if platform.python_implementation() == 'PyPy':
    import logging
    from socketify import ASGI

    workers = int(multiprocessing.cpu_count())
    if os.environ.get('TRAVIS') == 'true':
        workers = 2

    def run_app():
        ASGI(app).listen(
            8080,
            lambda config: logging.info(f"Listening on http://localhost:{config.port} now\n"),
        ).run()

    def create_fork():
        n = os.fork()
        if n == 0:  # child process: run a worker
            run_app()

    for i in range(1, workers):  # fork workers - 1 children...
        create_fork()

    run_app()  # ...and run the last worker in this process