Преглед на файлове

[Python/Blacksheep] use psqlpy (#9975)

* try use psqlpy

* fix

* fix

* try fix

* roll back

* fix

* success fix
nazo преди 2 месеца
родител
ревизия
d9e0beef75
променени са 2 файла, в които са добавени 39 реда и са изтрити 51 реда
  1. 38 50
      frameworks/Python/blacksheep/app-socketify.py
  2. 1 1
      frameworks/Python/blacksheep/requirements-pypy.txt

+ 38 - 50
frameworks/Python/blacksheep/app-socketify.py

@@ -1,22 +1,22 @@
 import multiprocessing
 import multiprocessing
 import os
 import os
-import psycopg
+# import psycopg
 import platform
 import platform
 import random
 import random
 import asyncio
 import asyncio
 import blacksheep as bs
 import blacksheep as bs
 import jinja2
 import jinja2
 from pathlib import Path
 from pathlib import Path
-from psycopg_pool import AsyncConnectionPool
-
-READ_ROW_SQL = 'SELECT "id", "randomnumber" FROM "world" WHERE id = %s'
-WRITE_ROW_SQL = 'UPDATE "world" SET "randomnumber"=%s WHERE id=%s'
+# from psycopg_pool import AsyncConnectionPool
+import psqlpy
+from psqlpy import ConnectionPoolBuilder
+READ_ROW_SQL = 'SELECT "randomnumber" FROM "world" WHERE id = $1'
+WRITE_ROW_SQL = 'UPDATE "world" SET "randomnumber"=$1 WHERE id=$2'
 ADDITIONAL_ROW = [0, "Additional fortune added at request time."]
 ADDITIONAL_ROW = [0, "Additional fortune added at request time."]
 CORE_COUNT = multiprocessing.cpu_count()
 CORE_COUNT = multiprocessing.cpu_count()
 MAX_DB_CONNECTIONS = 2000
 MAX_DB_CONNECTIONS = 2000
 
 
-MAX_POOL_SIZE = min(CORE_COUNT * 2, MAX_DB_CONNECTIONS // CORE_COUNT, 32)
-MIN_POOL_SIZE = max(1, MAX_POOL_SIZE // 2)
+MAX_POOL_SIZE = min(CORE_COUNT * 2, MAX_DB_CONNECTIONS // CORE_COUNT, 35)
 db_pool = None
 db_pool = None
 
 
 async def setup_db(app):
 async def setup_db(app):
@@ -25,15 +25,17 @@ async def setup_db(app):
         f"postgresql://{os.getenv('PGUSER', 'benchmarkdbuser')}:{os.getenv('PGPASS', 'benchmarkdbpass')}"
         f"postgresql://{os.getenv('PGUSER', 'benchmarkdbuser')}:{os.getenv('PGPASS', 'benchmarkdbpass')}"
         f"@tfb-database:5432/hello_world"
         f"@tfb-database:5432/hello_world"
     )
     )
-    db_pool = AsyncConnectionPool(
-        conninfo=conninfo,
-        min_size=MIN_POOL_SIZE,
-        max_size=MAX_POOL_SIZE,
-        open=False,
-        timeout=5.0,
-        max_lifetime=1800,
+    db_pool = (
+        ConnectionPoolBuilder()
+        .max_pool_size(MAX_POOL_SIZE)
+        .user(os.getenv('PGUSER', 'benchmarkdbuser'))
+        .password(os.getenv('PGPASS', 'benchmarkdbpass'))
+        .dbname("hello_world")
+        .host("tfb-database")
+        .port(5432)
+        .build()
     )
     )
-    await db_pool.open()
+    # await db_pool.open()
 
 
 async def shutdown_db(app):
 async def shutdown_db(app):
     global db_pool
     global db_pool
@@ -70,31 +72,31 @@ async def json_test(request):
 @bs.get('/db')
 @bs.get('/db')
 async def single_db_query_test(request):
 async def single_db_query_test(request):
     row_id = random.randint(1, 10000)
     row_id = random.randint(1, 10000)
-    async with db_pool.connection() as db_conn:
-        async with db_conn.cursor() as cursor:
-            await cursor.execute(READ_ROW_SQL, (row_id,))
-            number = await cursor.fetchone()
-    return bs.json({'id': row_id, 'randomNumber': number[1]})
+    async with db_pool.acquire() as connection:
+        number = await connection.fetch_val(
+            READ_ROW_SQL, [row_id]
+        )
+    return bs.json({'id': row_id, 'randomNumber': number})
 
 
 @bs.get('/queries')
 @bs.get('/queries')
 async def multiple_db_queries_test(request):
 async def multiple_db_queries_test(request):
     num_queries = get_num_queries(request)
     num_queries = get_num_queries(request)
     row_ids = random.sample(range(1, 10000), num_queries)
     row_ids = random.sample(range(1, 10000), num_queries)
     worlds = []
     worlds = []
-    async with db_pool.connection() as db_conn:
-        async with db_conn.cursor() as cursor:
-            for row_id in row_ids:
-                await cursor.execute(READ_ROW_SQL, (row_id,))
-                number = await cursor.fetchone()
-                worlds.append({"id": row_id, "randomNumber": number[1]})
+    async with db_pool.acquire() as connection:
+        for row_id in row_ids:
+            number = await connection.fetch_val(
+                READ_ROW_SQL, [row_id]
+            )
+            worlds.append({"id": row_id, "randomNumber": number})
     return bs.json(worlds)
     return bs.json(worlds)
 
 
 @bs.get('/fortunes')
 @bs.get('/fortunes')
 async def fortunes_test(request):
 async def fortunes_test(request):
-    async with db_pool.connection() as db_conn:
-        async with db_conn.cursor() as cursor:
-            await cursor.execute("SELECT * FROM Fortune")
-            fortunes = await cursor.fetchall()
+    async with db_pool.acquire() as connection:
+        fortunes_fetch = await connection.fetch("SELECT * FROM Fortune")
+        # fortunes = fortunes_fetch.result()
+        fortunes = [list(item.values()) for item in fortunes_fetch.result()]
     fortunes.append(ADDITIONAL_ROW)
     fortunes.append(ADDITIONAL_ROW)
     fortunes.sort(key=lambda row: row[1])
     fortunes.sort(key=lambda row: row[1])
     data = fortune_template.render(fortunes=fortunes)
     data = fortune_template.render(fortunes=fortunes)
@@ -108,26 +110,12 @@ async def db_updates_test(request):
         random.sample(range(1, 10000), num_queries)
         random.sample(range(1, 10000), num_queries)
     ), key=lambda x: x[1])
     ), key=lambda x: x[1])
     worlds = [{"id": row_id, "randomNumber": number} for row_id, number in updates]
     worlds = [{"id": row_id, "randomNumber": number} for row_id, number in updates]
-    for _ in range(5):
-        async with db_pool.connection() as db_conn:
-            try:
-                await db_conn.execute("SET TRANSACTION ISOLATION LEVEL READ COMMITTED")
-                async with db_conn.cursor() as cursor:
-                    for row_id, number in updates:
-                        await cursor.execute(READ_ROW_SQL, (row_id,))
-                        await cursor.fetchone()
-                    for _ in range(5):
-                        try:
-                            await cursor.executemany(WRITE_ROW_SQL, [(number, row_id) for row_id, number in updates])
-                            return bs.json(worlds)
-                        except psycopg.errors.DeadlockDetected:
-                            await db_conn.rollback()
-                            continue
-                    # await cursor.executemany(WRITE_ROW_SQL, [(number, row_id) for row_id, number in updates])
-            except (psycopg.errors.OperationalError, psycopg.errors.PipelineAborted):
-                await db_conn.rollback()
-                continue
-    raise Exception("connect error")
+    async with db_pool.acquire() as connection:
+        for row_id, _ in updates:
+            await connection.fetch_val(READ_ROW_SQL, [row_id])
+        # await db_conn.executemany(WRITE_ROW_SQL, updates)
+        await connection.execute_many(WRITE_ROW_SQL, updates)
+    return bs.json(worlds)
 
 
 @bs.get('/plaintext')
 @bs.get('/plaintext')
 async def plaintext_test(request):
 async def plaintext_test(request):

+ 1 - 1
frameworks/Python/blacksheep/requirements-pypy.txt

@@ -1,3 +1,3 @@
-psycopg[pool]
+psqlpy
 git+https://github.com/cirospaciari/socketify.py.git@main#egg=socketify
 git+https://github.com/cirospaciari/socketify.py.git@main#egg=socketify
 h11
 h11