import multiprocessing import os from random import randint, sample import asyncpg import jinja2 from panther import Panther from panther.app import API from panther.request import Request from panther.response import Response, PlainTextResponse, HTMLResponse READ_ROW_SQL = 'SELECT "id", "randomnumber" FROM "world" WHERE id = $1' WRITE_ROW_SQL = 'UPDATE "world" SET "randomnumber"=$1 WHERE id=$2' ADDITIONAL_ROW = [0, 'Additional fortune added at request time.'] MAX_POOL_SIZE = 1000 // multiprocessing.cpu_count() MIN_POOL_SIZE = max(int(MAX_POOL_SIZE / 2), 1) pool = None async def create_db_pool(): global pool pool = await asyncpg.create_pool( user='benchmarkdbuser', password='benchmarkdbpass', database='hello_world', host='tfb-database', port=5432, min_size=MIN_POOL_SIZE, max_size=MAX_POOL_SIZE, ) async def clean_db_pool(): await pool.close() def load_fortunes_template(): path = os.path.join('templates', 'fortune.html') with open(path, 'r') as template_file: template_text = template_file.read() return jinja2.Template(template_text) fortune_template = load_fortunes_template() def get_num_queries(request): value = request.query_params.get('queries') if value is None: return 1 try: query_count = int(value) except ValueError: return 1 if query_count < 1: return 1 if query_count > 500: return 500 return query_count @API() async def json_serialization(): return Response(data={'message': 'Hello, world!'}) @API() async def single_database_query(): row_id = randint(1, 10000) async with pool.acquire() as connection: number = await connection.fetchval(READ_ROW_SQL, row_id) return Response(data={'id': row_id, 'randomNumber': number}) @API() async def multiple_database_queries(request: Request): num_queries = get_num_queries(request) row_ids = sample(range(1, 10000), num_queries) async with pool.acquire() as connection: statement = await connection.prepare(READ_ROW_SQL) worlds = [{'id': i, 'randomNumber': await statement.fetchval(i)} for i in row_ids] return Response(data=worlds) @API() async def fortunes(): async with pool.acquire() as connection: fortune_records = await connection.fetch('SELECT * FROM Fortune') fortune_records.append(ADDITIONAL_ROW) fortune_records.sort(key=lambda row: row[1]) data = fortune_template.render(fortunes=fortune_records) return HTMLResponse(data=data) @API() async def database_updates(request: Request): num_queries = get_num_queries(request) ids = sorted(sample(range(1, 10000 + 1), num_queries)) numbers = sorted(sample(range(1, 10000), num_queries)) updates = list(zip(ids, numbers)) worlds = [ {'id': row_id, 'randomNumber': number} for row_id, number in updates ] async with pool.acquire() as connection: statement = await connection.prepare(READ_ROW_SQL) for row_id, _ in updates: await statement.fetchval(row_id) await connection.executemany(WRITE_ROW_SQL, updates) return Response(data=worlds) @API() async def plaintext(): return PlainTextResponse(b'Hello, world!') url_routing = { 'json': json_serialization, 'db': single_database_query, 'queries': multiple_database_queries, 'fortunes': fortunes, 'updates': database_updates, 'plaintext': plaintext, } app = Panther(__name__, configs=__name__, urls=url_routing, startup=create_db_pool, shutdown=clean_db_pool)