app.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. import multiprocessing
  2. import os
  3. import asyncpg
  4. import random
  5. import asyncio
  6. from operator import itemgetter
  7. import blacksheep as bs
  8. import jinja2
  9. import msgspec
  10. from pathlib import Path
  11. READ_ROW_SQL = 'SELECT "id", "randomnumber" FROM "world" WHERE id = $1'
  12. WRITE_ROW_SQL = 'UPDATE "world" SET "randomnumber"=$1 WHERE id=$2'
  13. ADDITIONAL_ROW = [0, "Additional fortune added at request time."]
  14. MAX_POOL_SIZE = 1000 // multiprocessing.cpu_count()
  15. MIN_POOL_SIZE = max(int(MAX_POOL_SIZE / 2), 1)
  16. db_pool = None
  17. key = itemgetter(1)
  18. try:
  19. import uvloop
  20. asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
  21. except Exception:
  22. ...
  23. async def setup_db(app):
  24. global db_pool
  25. db_pool = await asyncpg.create_pool(
  26. user=os.getenv('PGUSER', "benchmarkdbuser"),
  27. password=os.getenv('PGPASS', "benchmarkdbpass"),
  28. database='hello_world',
  29. host="tfb-database",
  30. port=5432,
  31. min_size=MIN_POOL_SIZE,
  32. max_size=MAX_POOL_SIZE,
  33. )
  34. def load_fortunes_template():
  35. with Path("templates/fortune.html").open("r") as f:
  36. return jinja2.Template(f.read())
  37. fortune_template = load_fortunes_template()
  38. app = bs.Application()
  39. app.on_start += setup_db
  40. def get_num_queries(request):
  41. try:
  42. value = request.query.get('queries')
  43. if value is None:
  44. return 1
  45. query_count = int(value[0])
  46. except (KeyError, IndexError, ValueError):
  47. return 1
  48. if query_count < 1:
  49. return 1
  50. if query_count > 500:
  51. return 500
  52. return query_count
  53. ENCODER = msgspec.json.Encoder()
  54. DECODER = msgspec.json.Decoder()
  55. JSON_CONTENT_TYPE = b"application/json"
  56. def jsonify(
  57. data,
  58. status=200,
  59. headers=None,
  60. ):
  61. """
  62. Returns a response with application/json content,
  63. and given status (default HTTP 200 OK).
  64. """
  65. return bs.Response(
  66. status=status,
  67. headers=headers,
  68. content=bs.Content(content_type=JSON_CONTENT_TYPE, data=ENCODER.encode(data)),
  69. )
  70. class Result(msgspec.Struct):
  71. id: int
  72. randomNumber: int
  73. # ------------------------------------------------------------------------------------------
  74. @bs.get('/json')
  75. async def json_test(request):
  76. return jsonify( {'message': 'Hello, world!'} )
  77. @bs.get('/db')
  78. async def single_db_query_test(request):
  79. row_id = random.randint(1, 10000)
  80. async with db_pool.acquire() as db_conn:
  81. number = await db_conn.fetchval(READ_ROW_SQL, row_id)
  82. return jsonify(Result(id=row_id, randomNumber=number))
  83. # return ({'id': row_id, 'randomNumber': number})
  84. @bs.get('/queries')
  85. async def multiple_db_queries_test(request):
  86. num_queries = get_num_queries(request)
  87. row_ids = random.sample(range(1, 10000), num_queries)
  88. worlds = []
  89. async with db_pool.acquire() as db_conn:
  90. statement = await db_conn.prepare(READ_ROW_SQL)
  91. for row_id in row_ids:
  92. number = await statement.fetchval(row_id)
  93. # worlds.append( {"id": row_id, "randomNumber": number} )
  94. worlds.append(Result(id=row_id, randomNumber=number))
  95. return jsonify(worlds)
  96. @bs.get('/fortunes')
  97. async def fortunes_test(request):
  98. async with db_pool.acquire() as db_conn:
  99. fortunes = await db_conn.fetch("SELECT * FROM Fortune")
  100. fortunes.append(ADDITIONAL_ROW)
  101. fortunes.sort(key = key)
  102. data = fortune_template.render(fortunes=fortunes)
  103. return bs.html(data)
  104. @bs.get('/updates')
  105. async def db_updates_test(request):
  106. num_queries = get_num_queries(request)
  107. ids = sorted(random.sample(range(1, 10000 + 1), num_queries))
  108. numbers = sorted(random.sample(range(1, 10000), num_queries))
  109. updates = list(zip(ids, numbers))
  110. # worlds = [ {"id": row_id, "randomNumber": number} for row_id, number in updates ]
  111. worlds = [Result(id=row_id, randomNumber=number) for row_id, number in updates]
  112. async with db_pool.acquire() as db_conn:
  113. statement = await db_conn.prepare(READ_ROW_SQL)
  114. for row_id, _ in updates:
  115. await statement.fetchval(row_id)
  116. await db_conn.executemany(WRITE_ROW_SQL, updates)
  117. return jsonify(worlds)
  118. @bs.get('/plaintext')
  119. async def plaintext_test(request):
  120. return bs.Response(200, content=bs.Content(b"text/plain", b'Hello, World!'))
  121. #return bs.text('Hello, World!')