| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114 | import loggingfrom operator import itemgetterfrom random import randintimport asyncpg.exceptionsimport jinja2from aioworkers_pg.base import Connectorfrom aioworkers.core.base import AbstractEntityfrom aioworkers.core.config import ValueExtractorfrom aioworkers.net.uri import URIREAD_ROW_SQL = 'SELECT "randomnumber", "id" FROM "world" WHERE id = $1'WRITE_ROW_SQL = 'UPDATE "world" SET "randomnumber"=$1 WHERE id=$2'ADDITIONAL_ROW = [0, "Additional fortune added at request time."]sort_fortunes_key = itemgetter(1)logger = logging.getLogger(__name__)class PG(Connector):    def set_config(self, config: ValueExtractor) -> None:        cfg = config.connection        dsn: URI = cfg.get_uri("dsn").with_auth(            username=cfg.get("username"),            password=cfg.get("password"),        )        super().set_config(config.new_child(dsn=dsn))class Templates(AbstractEntity):    fortune: jinja2.Template    def set_config(self, config):        super().set_config(config)        self.fortune = jinja2.Template(config.fortune)def get_num_queries(request):    query_count = request.url.query.get_int("queries")    if query_count is None:        return 1    elif query_count < 1:        return 1    elif query_count > 500:        return 500    return query_countasync def single_database_query(context):    row_id = randint(1, 10000)    async with context.pg.pool.acquire() as connection:        number = await connection.fetchval(READ_ROW_SQL, row_id)    return {"id": row_id, "randomNumber": number}async def multiple_database_queries(context, request):    num_queries = get_num_queries(request)    row_ids = [randint(1, 10000) for _ in range(num_queries)]    worlds = []    async with context.pg.pool.acquire() as connection:        statement = await connection.prepare(READ_ROW_SQL)        for row_id in row_ids:            number = await statement.fetchval(row_id)            worlds.append({"id": row_id, "randomNumber": number})    return worldsasync def fortunes(context, request):    async with context.pg.pool.acquire() as connection:        fortunes = await connection.fetch("SELECT * FROM Fortune")    fortunes.append(ADDITIONAL_ROW)    fortunes.sort(key=sort_fortunes_key)    content = context.templates.fortune.render(fortunes=fortunes)    return request.response(        content.encode(),        headers=[            ("Content-Type", "text/html; charset=utf-8"),        ],    )async def database_updates(context, request):    num_queries = get_num_queries(request)    uniq = {randint(1, 10000) for _ in range(num_queries)}    while len(uniq) < num_queries:        uniq.add(randint(1, 10000))    updates = [        (row_id, randint(1, 10000)) for row_id in uniq    ]    worlds = [        {"id": row_id, "randomNumber": number} for row_id, number in updates    ]    async with context.pg.pool.acquire() as connection:        statement = await connection.prepare(READ_ROW_SQL)        for row_id, number in updates:            await statement.fetchval(row_id)        for _ in range(99):            try:                await connection.executemany(WRITE_ROW_SQL, updates)            except asyncpg.exceptions.DeadlockDetectedError as e:                logger.debug('Deadlock %s', e)            else:                break        else:            worlds.clear()    return worlds
 |