app.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. import os
  2. from operator import itemgetter
  3. from random import randint, sample
  4. import asyncpg
  5. from emmett55 import App, Pipe, current, request, response
  6. from emmett55.extensions import Extension, Signals, listen_signal
  7. from emmett55.tools import service
  8. from renoir import Renoir
  9. class AsyncPG(Extension):
  10. __slots__ = ["pool"]
  11. def on_load(self):
  12. self.pool = None
  13. self.pipe = AsyncPGPipe(self)
  14. async def build_pool(self):
  15. self.pool = await asyncpg.create_pool(
  16. user=os.getenv('PGUSER', 'benchmarkdbuser'),
  17. password=os.getenv('PGPASS', 'benchmarkdbpass'),
  18. database='hello_world',
  19. host='tfb-database',
  20. port=5432,
  21. min_size=16,
  22. max_size=16,
  23. max_queries=64_000_000_000,
  24. max_inactive_connection_lifetime=0
  25. )
  26. @listen_signal(Signals.after_loop)
  27. def _init_pool(self, loop):
  28. loop.run_until_complete(self.build_pool())
  29. class AsyncPGPipe(Pipe):
  30. __slots__ = ["ext"]
  31. def __init__(self, ext):
  32. self.ext = ext
  33. async def open(self):
  34. conn = current._db_conn = self.ext.pool.acquire()
  35. current.db = await conn.__aenter__()
  36. async def close(self):
  37. await current._db_conn.__aexit__()
  38. app = App(__name__)
  39. app.config.handle_static = False
  40. templates = Renoir()
  41. db_ext = app.use_extension(AsyncPG)
  42. SQL_SELECT = 'SELECT "randomnumber", "id" FROM "world" WHERE id = $1'
  43. SQL_UPDATE = 'UPDATE "world" SET "randomnumber"=$1 WHERE id=$2'
  44. ROW_ADD = [0, 'Additional fortune added at request time.']
  45. sort_key = itemgetter(1)
  46. @app.route()
  47. @service.json
  48. async def json():
  49. return {'message': 'Hello, World!'}
  50. @app.route("/db", pipeline=[db_ext.pipe])
  51. @service.json
  52. async def get_random_world():
  53. row_id = randint(1, 10000)
  54. number = await current.db.fetchval(SQL_SELECT, row_id)
  55. return {'id': row_id, 'randomNumber': number}
  56. def get_qparam():
  57. try:
  58. rv = int(request.query_params.queries or 1)
  59. except ValueError:
  60. return 1
  61. if rv < 1:
  62. return 1
  63. if rv > 500:
  64. return 500
  65. return rv
  66. @app.route("/queries", pipeline=[db_ext.pipe])
  67. @service.json
  68. async def get_random_worlds():
  69. num_queries = get_qparam()
  70. row_ids = sample(range(1, 10000), num_queries)
  71. worlds = []
  72. statement = await current.db.prepare(SQL_SELECT)
  73. for row_id in row_ids:
  74. number = await statement.fetchval(row_id)
  75. worlds.append({'id': row_id, 'randomNumber': number})
  76. return worlds
  77. @app.route(pipeline=[db_ext.pipe], output='str')
  78. async def fortunes():
  79. response.content_type = "text/html; charset=utf-8"
  80. fortunes = await current.db.fetch('SELECT * FROM Fortune')
  81. fortunes.append(ROW_ADD)
  82. fortunes.sort(key=sort_key)
  83. return templates.render("templates/fortunes.html", {"fortunes": fortunes})
  84. @app.route(pipeline=[db_ext.pipe])
  85. @service.json
  86. async def updates():
  87. num_queries = get_qparam()
  88. updates = list(zip(
  89. sample(range(1, 10000), num_queries),
  90. sorted(sample(range(1, 10000), num_queries))
  91. ))
  92. worlds = [{'id': row_id, 'randomNumber': number} for row_id, number in updates]
  93. statement = await current.db.prepare(SQL_SELECT)
  94. for row_id, _ in updates:
  95. await statement.fetchval(row_id)
  96. await current.db.executemany(SQL_UPDATE, updates)
  97. return worlds
  98. @app.route(output='bytes')
  99. async def plaintext():
  100. return b'Hello, World!'