| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138 |
- import os
- from operator import itemgetter
- from random import randint, sample
- import asyncpg
- from emmett55 import App, Pipe, request, response
- from emmett55.extensions import Extension, Signals, listen_signal
- from emmett55.tools import ServicePipe
- from renoir import Renoir
- class NoResetConnection(asyncpg.Connection):
- __slots__ = ()
- def get_reset_query(self):
- return ""
- class AsyncPG(Extension):
- __slots__ = ["pool"]
- def on_load(self):
- self.pool = None
- self.pipe = AsyncPGPipe(self)
- async def build_pool(self):
- self.pool = await asyncpg.create_pool(
- user=os.getenv('PGUSER', 'benchmarkdbuser'),
- password=os.getenv('PGPASS', 'benchmarkdbpass'),
- database='hello_world',
- host='tfb-database',
- port=5432,
- min_size=4,
- max_size=4,
- connection_class=NoResetConnection,
- )
- @listen_signal(Signals.after_loop)
- def _init_pool(self, loop):
- loop.run_until_complete(self.build_pool())
- class AsyncPGPipe(Pipe):
- __slots__ = ["ext"]
- def __init__(self, ext):
- self.ext = ext
- async def pipe(self, next_pipe, **kwargs):
- async with self.ext.pool.acquire() as conn:
- kwargs['db'] = conn
- return await next_pipe(**kwargs)
- class TemplatePipe(Pipe):
- __slots__ = ["template"]
- output = "str"
- def __init__(self, template):
- self.template = f"templates/{template}"
- async def pipe(self, next_pipe, **kwargs):
- response.content_type = "text/html; charset=utf-8"
- ctx = await next_pipe(**kwargs)
- return templates.render(self.template, ctx)
- app = App(__name__)
- app.config.handle_static = False
- templates = Renoir()
- json_routes = app.module(__name__, 'json')
- json_routes.pipeline = [ServicePipe('json')]
- db_ext = app.use_extension(AsyncPG)
- SQL_SELECT = 'SELECT "randomnumber", "id" FROM "world" WHERE id = $1'
- SQL_UPDATE = 'UPDATE "world" SET "randomnumber"=$1 WHERE id=$2'
- ROW_ADD = [0, 'Additional fortune added at request time.']
- sort_key = itemgetter(1)
- @json_routes.route()
- async def json():
- return {'message': 'Hello, World!'}
- @json_routes.route("/db", pipeline=[db_ext.pipe])
- async def get_random_world(db):
- row_id = randint(1, 10000)
- number = await db.fetchval(SQL_SELECT, row_id)
- return {'id': row_id, 'randomNumber': number}
- def get_qparam():
- try:
- rv = int(request.query_params.queries or 1)
- except ValueError:
- return 1
- if rv < 1:
- return 1
- if rv > 500:
- return 500
- return rv
- @json_routes.route("/queries", pipeline=[db_ext.pipe])
- async def get_random_worlds(db):
- num_queries = get_qparam()
- row_ids = sample(range(1, 10000), num_queries)
- rows = await db.fetchmany(SQL_SELECT, [(v,) for v in row_ids])
- return [{'id': row_id, 'randomNumber': number[0]} for row_id, number in zip(row_ids, rows)]
- @app.route(pipeline=[TemplatePipe("fortunes.html"), db_ext.pipe])
- async def fortunes(db):
- fortunes = await db.fetch('SELECT * FROM Fortune')
- fortunes.append(ROW_ADD)
- fortunes.sort(key=sort_key)
- return {"fortunes": fortunes}
- @json_routes.route(pipeline=[db_ext.pipe])
- async def updates(db):
- num_queries = get_qparam()
- updates = list(zip(
- sample(range(1, 10000), num_queries),
- sorted(sample(range(1, 10000), num_queries))
- ))
- worlds = [{'id': row_id, 'randomNumber': number} for row_id, number in updates]
- await db.executemany(SQL_SELECT, [(i[0],) for i in updates])
- await db.executemany(SQL_UPDATE, updates)
- return worlds
- @app.route(output='bytes')
- async def plaintext():
- return b'Hello, World!'
|