app.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. import os
  2. from operator import itemgetter
  3. from random import randint, sample
  4. import asyncpg
  5. from emmett55 import App, Pipe, request, response
  6. from emmett55.extensions import Extension, Signals, listen_signal
  7. from emmett55.tools import ServicePipe
  8. from renoir import Renoir
  9. class NoResetConnection(asyncpg.Connection):
  10. __slots__ = ()
  11. def get_reset_query(self):
  12. return ""
  13. class AsyncPG(Extension):
  14. __slots__ = ["pool"]
  15. def on_load(self):
  16. self.pool = None
  17. self.pipe = AsyncPGPipe(self)
  18. async def build_pool(self):
  19. self.pool = await asyncpg.create_pool(
  20. user=os.getenv('PGUSER', 'benchmarkdbuser'),
  21. password=os.getenv('PGPASS', 'benchmarkdbpass'),
  22. database='hello_world',
  23. host='tfb-database',
  24. port=5432,
  25. min_size=4,
  26. max_size=4,
  27. connection_class=NoResetConnection,
  28. )
  29. @listen_signal(Signals.after_loop)
  30. def _init_pool(self, loop):
  31. loop.run_until_complete(self.build_pool())
  32. class AsyncPGPipe(Pipe):
  33. __slots__ = ["ext"]
  34. def __init__(self, ext):
  35. self.ext = ext
  36. async def pipe(self, next_pipe, **kwargs):
  37. async with self.ext.pool.acquire() as conn:
  38. kwargs['db'] = conn
  39. return await next_pipe(**kwargs)
  40. class TemplatePipe(Pipe):
  41. __slots__ = ["template"]
  42. output = "str"
  43. def __init__(self, template):
  44. self.template = f"templates/{template}"
  45. async def pipe(self, next_pipe, **kwargs):
  46. response.content_type = "text/html; charset=utf-8"
  47. ctx = await next_pipe(**kwargs)
  48. return templates.render(self.template, ctx)
  49. app = App(__name__)
  50. app.config.handle_static = False
  51. templates = Renoir()
  52. json_routes = app.module(__name__, 'json')
  53. json_routes.pipeline = [ServicePipe('json')]
  54. db_ext = app.use_extension(AsyncPG)
  55. SQL_SELECT = 'SELECT "randomnumber", "id" FROM "world" WHERE id = $1'
  56. SQL_UPDATE = 'UPDATE "world" SET "randomnumber"=$1 WHERE id=$2'
  57. ROW_ADD = [0, 'Additional fortune added at request time.']
  58. sort_key = itemgetter(1)
  59. @json_routes.route()
  60. async def json():
  61. return {'message': 'Hello, World!'}
  62. @json_routes.route("/db", pipeline=[db_ext.pipe])
  63. async def get_random_world(db):
  64. row_id = randint(1, 10000)
  65. number = await db.fetchval(SQL_SELECT, row_id)
  66. return {'id': row_id, 'randomNumber': number}
  67. def get_qparam():
  68. try:
  69. rv = int(request.query_params.queries or 1)
  70. except ValueError:
  71. return 1
  72. if rv < 1:
  73. return 1
  74. if rv > 500:
  75. return 500
  76. return rv
  77. @json_routes.route("/queries", pipeline=[db_ext.pipe])
  78. async def get_random_worlds(db):
  79. num_queries = get_qparam()
  80. row_ids = sample(range(1, 10000), num_queries)
  81. rows = await db.fetchmany(SQL_SELECT, [(v,) for v in row_ids])
  82. return [{'id': row_id, 'randomNumber': number[0]} for row_id, number in zip(row_ids, rows)]
  83. @app.route(pipeline=[TemplatePipe("fortunes.html"), db_ext.pipe])
  84. async def fortunes(db):
  85. fortunes = await db.fetch('SELECT * FROM Fortune')
  86. fortunes.append(ROW_ADD)
  87. fortunes.sort(key=sort_key)
  88. return {"fortunes": fortunes}
  89. @json_routes.route(pipeline=[db_ext.pipe])
  90. async def updates(db):
  91. num_queries = get_qparam()
  92. updates = list(zip(
  93. sample(range(1, 10000), num_queries),
  94. sorted(sample(range(1, 10000), num_queries))
  95. ))
  96. worlds = [{'id': row_id, 'randomNumber': number} for row_id, number in updates]
  97. await db.executemany(SQL_SELECT, [(i[0],) for i in updates])
  98. await db.executemany(SQL_UPDATE, updates)
  99. return worlds
  100. @app.route(output='bytes')
  101. async def plaintext():
  102. return b'Hello, World!'