app.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. import multiprocessing
  2. import os
  3. import asyncpg
  4. import platform
  5. import random
  6. import asyncio
  7. import blacksheep as bs
  8. import jinja2
  9. import msgspec
  10. from pathlib import Path
  11. try:
  12. import uvloop
  13. asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
  14. except Exception:
  15. ...
  16. READ_ROW_SQL = 'SELECT "id", "randomnumber" FROM "world" WHERE id = $1'
  17. WRITE_ROW_SQL = 'UPDATE "world" SET "randomnumber"=$1 WHERE id=$2'
  18. ADDITIONAL_ROW = [0, "Additional fortune added at request time."]
  19. MAX_CONNECTIONS = 1900
  20. CORE_COUNT = multiprocessing.cpu_count()
  21. MAX_POOL_SIZE = max(1,int(os.getenv('MAX_POOL_SIZE', MAX_CONNECTIONS // CORE_COUNT)))
  22. MIN_POOL_SIZE = max(1,int(os.getenv('MIN_POOL_SIZE', MAX_POOL_SIZE // 2)))
  23. db_pool = None
  24. async def setup_db(app):
  25. global db_pool
  26. db_pool = await asyncpg.create_pool(
  27. user=os.getenv('PGUSER', "benchmarkdbuser"),
  28. password=os.getenv('PGPASS', "benchmarkdbpass"),
  29. database='hello_world',
  30. host="tfb-database",
  31. port=5432,
  32. min_size=MIN_POOL_SIZE,
  33. max_size=MAX_POOL_SIZE,
  34. )
  35. async def shutdown_db(app):
  36. """Close asyncpg connection pool for the current process."""
  37. global db_pool
  38. if db_pool is not None:
  39. await db_pool.close()
  40. db_pool = None
  41. def load_fortunes_template():
  42. with Path("templates/fortune.html").open("r") as f:
  43. return jinja2.Template(f.read())
  44. fortune_template = load_fortunes_template()
  45. app = bs.Application()
  46. app.on_start += setup_db
  47. app.on_stop += shutdown_db
  48. def get_num_queries(request):
  49. try:
  50. value = request.query.get('queries')
  51. if value is None:
  52. return 1
  53. query_count = int(value[0])
  54. except (KeyError, IndexError, ValueError):
  55. return 1
  56. return min(max(query_count, 1), 500)
  57. ENCODER = msgspec.json.Encoder()
  58. JSON_CONTENT_TYPE = b"application/json"
  59. def jsonify(
  60. data,
  61. status=200,
  62. headers=None,
  63. ):
  64. """
  65. Returns a response with application/json content,
  66. and given status (default HTTP 200 OK).
  67. """
  68. return bs.Response(
  69. status=status,
  70. headers=headers,
  71. content=bs.Content(content_type=JSON_CONTENT_TYPE, data=ENCODER.encode(data)),
  72. )
  73. class Result(msgspec.Struct):
  74. id: int
  75. randomNumber: int
  76. # ------------------------------------------------------------------------------------------
  77. @bs.get('/json')
  78. async def json_test(request):
  79. return jsonify( {'message': 'Hello, world!'} )
  80. @bs.get('/db')
  81. async def single_db_query_test(request):
  82. row_id = random.randint(1, 10000)
  83. async with db_pool.acquire() as db_conn:
  84. number = await db_conn.fetchval(READ_ROW_SQL, row_id)
  85. return jsonify(Result(id=row_id, randomNumber=number))
  86. # return ({'id': row_id, 'randomNumber': number})
  87. @bs.get('/queries')
  88. async def multiple_db_queries_test(request):
  89. num_queries = get_num_queries(request)
  90. row_ids = random.sample(range(1, 10000), num_queries)
  91. worlds = []
  92. async with db_pool.acquire() as db_conn:
  93. statement = await db_conn.prepare(READ_ROW_SQL)
  94. for row_id in row_ids:
  95. number = await statement.fetchval(row_id)
  96. # worlds.append( {"id": row_id, "randomNumber": number} )
  97. worlds.append(Result(id=row_id, randomNumber=number))
  98. return jsonify(worlds)
  99. @bs.get('/fortunes')
  100. async def fortunes_test(request):
  101. async with db_pool.acquire() as db_conn:
  102. fortunes = await db_conn.fetch("SELECT * FROM Fortune")
  103. fortunes.append(ADDITIONAL_ROW)
  104. fortunes.sort(key=lambda row: row[1])
  105. data = fortune_template.render(fortunes=fortunes)
  106. return bs.html(data)
  107. @bs.get('/updates')
  108. async def db_updates_test(request):
  109. num_queries = get_num_queries(request)
  110. updates = list(zip(
  111. random.sample(range(1, 10000), num_queries),
  112. sorted(random.sample(range(1, 10000), num_queries))
  113. ))
  114. worlds = [Result(id=row_id, randomNumber=number) for row_id, number in updates]
  115. # worlds = [ {"id": row_id, "randomNumber": number} for row_id, number in updates ]
  116. async with db_pool.acquire() as db_conn:
  117. statement = await db_conn.prepare(READ_ROW_SQL)
  118. for row_id, _ in updates:
  119. await statement.fetchval(row_id)
  120. await db_conn.executemany(WRITE_ROW_SQL, updates)
  121. return jsonify(worlds)
  122. @bs.get('/plaintext')
  123. async def plaintext_test(request):
  124. return bs.Response(200, content=bs.Content(b"text/plain", b'Hello, World!'))
  125. #return bs.text('Hello, World!')
  126. if platform.python_implementation() == 'PyPy':
  127. from socketify import ASGI
  128. workers = int(multiprocessing.cpu_count())
  129. if _is_travis:
  130. workers = 2
  131. def run_app():
  132. ASGI(app).listen(8080, lambda config: logging.info(f"Listening on port http://localhost:{config.port} now\n")).run()
  133. def create_fork():
  134. n = os.fork()
  135. # n greater than 0 means parent process
  136. if not n > 0:
  137. run_app()
  138. # fork limiting the cpu count - 1
  139. for i in range(1, workers):
  140. create_fork()
  141. run_app()