app-socketify.py
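
"""BlackSheep application implementing the standard TechEmpower-style
benchmark endpoints (/json, /db, /queries, /fortunes, /updates, /plaintext)
against PostgreSQL, served through socketify's ASGI bridge when run on PyPy.
"""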

import asyncio
import multiprocessing
import os
import platform
import random
from pathlib import Path

import blacksheep as bs
import jinja2
import psycopg
from psycopg_pool import AsyncConnectionPool

READ_ROW_SQL = 'SELECT "id", "randomnumber" FROM "world" WHERE id = %s'
WRITE_ROW_SQL = 'UPDATE "world" SET "randomnumber"=%s WHERE id=%s'
ADDITIONAL_ROW = [0, "Additional fortune added at request time."]

# Scale the connection pool with the host: at most two connections per
# core, with half of that kept open as a warm minimum.
CORE_COUNT = multiprocessing.cpu_count()
MAX_POOL_SIZE = CORE_COUNT * 2
MIN_POOL_SIZE = max(1, MAX_POOL_SIZE // 2)

db_pool = None
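

# The pool is created closed (open=False) and opened explicitly once the
# event loop is running, so no connection work happens at import time.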
async def setup_db(app):
    global db_pool
    conninfo = (
        f"postgresql://{os.getenv('PGUSER', 'benchmarkdbuser')}:{os.getenv('PGPASS', 'benchmarkdbpass')}"
        f"@tfb-database:5432/hello_world"
    )
    db_pool = AsyncConnectionPool(
        conninfo=conninfo,
        min_size=MIN_POOL_SIZE,
        max_size=MAX_POOL_SIZE,
        open=False,
        timeout=5.0,
        max_lifetime=1800,
    )
    await db_pool.open()


async def shutdown_db(app):
    global db_pool
    if db_pool is not None:
        await db_pool.close()
        db_pool = None
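

# Compile the fortunes template once at import time so each request pays
# only the render cost, never the parse cost.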
def load_fortunes_template():
    with Path("templates/fortune.html").open("r") as f:
        return jinja2.Template(f.read())


fortune_template = load_fortunes_template()


app = bs.Application()
app.on_start += setup_db
app.on_stop += shutdown_db
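

# Clamp the 'queries' parameter to 1..500, treating a missing or malformed
# value as a single query rather than an error.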
def get_num_queries(request):
    try:
        value = request.query.get('queries')
        if value is None:
            return 1
        query_count = int(value[0])
    except (KeyError, IndexError, ValueError):
        return 1
    return min(max(query_count, 1), 500)


JSON_CONTENT_TYPE = b"application/json"


@bs.get('/json')
async def json_test(request):
    return bs.json({'message': 'Hello, world!'})
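

# /db: read a single random row over a pooled connection.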
@bs.get('/db')
async def single_db_query_test(request):
    row_id = random.randint(1, 10000)
    async with db_pool.connection() as db_conn:
        async with db_conn.cursor() as cursor:
            await cursor.execute(READ_ROW_SQL, (row_id,))
            number = await cursor.fetchone()
    return bs.json({'id': row_id, 'randomNumber': number[1]})
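

# /queries: N independent reads, reusing one pooled connection for the
# whole batch instead of checking a connection out per query.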
@bs.get('/queries')
async def multiple_db_queries_test(request):
    num_queries = get_num_queries(request)
    # range() is end-exclusive, so 10001 keeps ids in 1..10000, matching
    # the single-query handler above.
    row_ids = random.sample(range(1, 10001), num_queries)
    worlds = []
    async with db_pool.connection() as db_conn:
        async with db_conn.cursor() as cursor:
            for row_id in row_ids:
                await cursor.execute(READ_ROW_SQL, (row_id,))
                number = await cursor.fetchone()
                worlds.append({"id": row_id, "randomNumber": number[1]})
    return bs.json(worlds)
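

# /fortunes: fetch every row, append one fortune at request time, sort by
# message text, and render the HTML server-side.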
@bs.get('/fortunes')
async def fortunes_test(request):
    async with db_pool.connection() as db_conn:
        async with db_conn.cursor() as cursor:
            await cursor.execute("SELECT * FROM Fortune")
            fortunes = await cursor.fetchall()
    fortunes.append(ADDITIONAL_ROW)
    fortunes.sort(key=lambda row: row[1])
    data = fortune_template.render(fortunes=fortunes)
    return bs.html(data)
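

# /updates: read then rewrite N random rows. The batched write retries up
# to 5 times on deadlock; connection-level failures restart the whole
# transaction, up to 5 times, each time on a fresh pooled connection.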
@bs.get('/updates')
async def db_updates_test(request):
    num_queries = get_num_queries(request)
    # Sort the batch by row id so concurrent requests lock rows in a
    # consistent order, which keeps deadlocks rare (the retry loop below
    # handles the ones that still occur).
    updates = sorted(
        zip(
            random.sample(range(1, 10001), num_queries),
            random.sample(range(1, 10001), num_queries),
        ),
        key=lambda x: x[0],
    )
    worlds = [{"id": row_id, "randomNumber": number} for row_id, number in updates]
    for _ in range(5):
        async with db_pool.connection() as db_conn:
            try:
                await db_conn.execute("SET TRANSACTION ISOLATION LEVEL READ COMMITTED")
                async with db_conn.cursor() as cursor:
                    for row_id, number in updates:
                        await cursor.execute(READ_ROW_SQL, (row_id,))
                        await cursor.fetchone()
                    for _ in range(5):
                        try:
                            await cursor.executemany(
                                WRITE_ROW_SQL,
                                [(number, row_id) for row_id, number in updates],
                            )
                            return bs.json(worlds)
                        except psycopg.errors.DeadlockDetected:
                            # Roll back and retry just the batched write.
                            await db_conn.rollback()
                            continue
            except (psycopg.errors.OperationalError, psycopg.errors.PipelineAborted):
                # Connection-level failure: retry the whole transaction on a
                # fresh pooled connection.
                await db_conn.rollback()
                continue
    raise Exception("updates failed after repeated retries")


@bs.get('/plaintext')
async def plaintext_test(request):
    return bs.Response(200, content=bs.Content(b"text/plain", b'Hello, World!'))
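

# On PyPy, serve through socketify with a simple pre-fork model: the parent
# forks workers - 1 children and then becomes the final worker, so every
# process runs its own event loop on port 8080.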
if platform.python_implementation() == 'PyPy':
    import logging

    from socketify import ASGI

    workers = CORE_COUNT
    if os.environ.get('TRAVIS') == 'true':
        workers = 2

    def run_app():
        ASGI(app).listen(
            8080,
            lambda config: logging.info(f"Listening on http://localhost:{config.port}"),
        ).run()

    def create_fork():
        # os.fork() returns 0 in the child, which then runs its own server.
        if os.fork() == 0:
            run_app()

    # Fork workers - 1 children; the parent then becomes the last worker.
    for _ in range(1, workers):
        create_fork()

    run_app()