app.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. from multiprocessing import cpu_count, pool
  2. from os import getenv
  3. from random import randint, sample
  4. from asyncpg import create_pool
  5. from heaven import Application, Context, Request, Response
  6. from heaven.constants import STARTUP, SHUTDOWN
  7. from orjson import dumps
  8. #############
  9. # constants #
  10. #############
  11. APPLICATION_JSON = 'application/json'
  12. CONTENT_TYPE = 'Content-Type'
  13. POOL = 'pool'
  14. READ_ROW_SQL = 'SELECT "id", "randomnumber" FROM "world" WHERE id = $1'
  15. WRITE_ROW_SQL = 'UPDATE "world" SET "randomnumber"=$1 WHERE id=$2'
  16. ADDITIONAL_ROW = [0, "Additional fortune added at request time."]
  17. MAX_POOL_SIZE = 1000//cpu_count()
  18. MIN_POOL_SIZE = max(int(MAX_POOL_SIZE / 2), 1)
  19. #########################################
  20. # HOOKS for the app (shutdown, startup) #
  21. #########################################
  22. async def up_database(app: Application):
  23. pool = await create_pool(
  24. user=getenv("PGUSER", "benchmarkdbuser"),
  25. password=getenv("PGPASS", "benchmarkdbpass"),
  26. database="hello_world",
  27. host="tfb-database",
  28. port=5432,
  29. min_size=MIN_POOL_SIZE,
  30. max_size=MAX_POOL_SIZE,
  31. )
  32. app.keep(POOL, pool)
  33. async def down_database(app: Application):
  34. pool = app.unkeep(POOL)
  35. await pool.close()
  36. ################
  37. # Helper utils #
  38. ################
  39. def get_num_queries(queries):
  40. try: query_count = int(queries)
  41. except (ValueError, TypeError): return 1
  42. if query_count < 1: return 1
  43. if query_count > 500: return 500
  44. return query_count
  45. ###############################
  46. # Handlers for the app routes #
  47. ###############################
  48. async def database(req: Request, res: Response, ctx: Context):
  49. row_id = randint(1, 10000)
  50. pool = req.app.peek(POOL)
  51. async with pool.acquire() as connection:
  52. number = await connection.fetchval(READ_ROW_SQL, row_id)
  53. res.headers = CONTENT_TYPE, APPLICATION_JSON
  54. res.body = dumps({"id": row_id, "randomNumber": number})
  55. async def json(req: Request, res: Response, ctx: Context):
  56. res.headers = CONTENT_TYPE, APPLICATION_JSON
  57. res.body = dumps({'message': 'Hello, World!'})
  58. async def queries(req: Request, res: Response, ctx: Context):
  59. pool = req.app.peek(POOL)
  60. queries = req.params.get('queries')
  61. num_queries = get_num_queries(queries)
  62. row_ids = sample(range(1, 10001), num_queries)
  63. worlds = []
  64. async with pool.acquire() as connection:
  65. statement = await connection.prepare(READ_ROW_SQL)
  66. for row_id in row_ids:
  67. number = await statement.fetchval(row_id)
  68. worlds.append({"id": row_id, "randomNumber": number})
  69. res.headers = CONTENT_TYPE, APPLICATION_JSON
  70. res.body = dumps(worlds)
  71. async def fortunes(req: Request, res: Response, ctx: Context):
  72. pool = req.app.peek(POOL)
  73. async with pool.acquire() as connection:
  74. fortunes = await connection.fetch("SELECT * FROM Fortune")
  75. fortunes.append(ADDITIONAL_ROW)
  76. fortunes.sort(key=lambda row: row[1])
  77. await res.render("fortune.html", fortunes=fortunes, request=req)
  78. async def updates(req: Request, res: Response, ctx: Context):
  79. pool = req.app.peek(POOL)
  80. queries = req.params.get('queries')
  81. num_queries = get_num_queries(queries)
  82. # To avoid deadlock
  83. ids = sorted(sample(range(1, 10000 + 1), num_queries))
  84. numbers = sorted(sample(range(1, 10000), num_queries))
  85. updates = list(zip(ids, numbers))
  86. worlds = [
  87. {"id": row_id, "randomNumber": number} for row_id, number in updates
  88. ]
  89. async with pool.acquire() as connection:
  90. statement = await connection.prepare(READ_ROW_SQL)
  91. for row_id, _ in updates:
  92. await statement.fetchval(row_id)
  93. await connection.executemany(WRITE_ROW_SQL, updates)
  94. res.headers = CONTENT_TYPE, APPLICATION_JSON
  95. res.body = dumps(worlds)
  96. async def plaintext(req: Request, res: Response, ctx: Context):
  97. res.headers = 'Content-Type', 'text/plain'
  98. res.body = b"Hello, World!"
  99. ################
  100. # App creation #
  101. ################
  102. app = Application()
  103. app.TEMPLATES('templates')
  104. ###################
  105. # Register hooks #
  106. ###################
  107. app.ON(STARTUP, up_database)
  108. app.ON(SHUTDOWN, down_database)
  109. ###################
  110. # Register routes #
  111. ###################
  112. app.GET('/db', database)
  113. app.GET('/queries', queries)
  114. app.GET('/fortunes', fortunes)
  115. app.GET('/updates', updates)
  116. app.GET('/plaintext', plaintext)
  117. app.GET('/json', json)