Parcourir la source

Fix installation of vibora & replace psycopg2 with asyncpg as the PG driver (#7578)

* fix vibora's failure to install

* Vibora:replace psycopg2 with asyncpg;fix fortune query

Co-authored-by: IterableTrucks <[email protected]>
IterableTrucks il y a 2 ans
Parent
commit
b5efea5eb5

+ 56 - 39
frameworks/Python/vibora/app.py

@@ -1,10 +1,13 @@
-from vibora import Vibora
-from vibora.request import Request
-from vibora.responses import JsonResponse, Response
-from vibora.hooks import Events
+import multiprocessing
+import os
 from random import randint
 from operator import itemgetter
-import psycopg2
+
+from vibora import Vibora, Request, JsonResponse, Response
+from vibora.hooks import Events
+
+from db import Pool
+DEFAULT_POOL_SIZE = 1000//multiprocessing.cpu_count()
 
 READ_ROW_SQL = 'SELECT * FROM "world" WHERE id={0}'
 WRITE_ROW_SQL = 'UPDATE "world" SET "randomnumber"={0} WHERE id={1} RETURNING id, randomNumber'
@@ -12,17 +15,26 @@ READ_ALL_FORTUNES = 'SELECT * FROM "fortune"'
 ADDITIONAL_ROW = [0, 'Additional fortune added at request time.']
 sort_fortunes_key = itemgetter(1)
 
-app = Vibora()
-con = psycopg2.connect("dbname=hello_world user=benchmarkdbuser host=tfb-database password=benchmarkdbpass")
-cur = con.cursor()
+app = Vibora(template_dirs=['templates'])
+
+
[email protected](Events.BEFORE_SERVER_START)
+async def init_db(app: Vibora):
+    app.components.add(await Pool("postgresql://%s:%s@%s:5432/%s" % (os.getenv("PGUSER", "benchmarkdbuser"), os.getenv("PSPASS", "benchmarkdbpass"), os.getenv("PGADDR", "tfb-database"), os.getenv("PGDB", "hello_world")), max_size=int(os.getenv("PGPOOLSIZE", DEFAULT_POOL_SIZE))))
+
+
[email protected](Events.BEFORE_SERVER_STOP)
+async def close_db(app: Vibora):
+    await asyncio.wait_for(app.components.get(Pool).close(), timeout=10)
+
 
 def getQueriesTotal(params):
     try:
-        queries = params['queries']
+        queries = params['queries'][0]
         query_count = int(queries)
     except:
         return 1
-    
+
     if query_count < 1:
         return 1
     if query_count > 500:
@@ -30,69 +42,74 @@ def getQueriesTotal(params):
     return query_count
 
 
-async def fetchWorld():
-    rand = randint(1, 10000)
-    cur.execute(READ_ROW_SQL.format(rand))
-    res = cur.fetchone()
-    return res
+async def fetchWorld(pool):
+    async with pool.acquire() as conn:
+        return await conn.fetchrow(READ_ROW_SQL.format(randint(1, 10000)))
 
-async def updateWorld(world_id):
-    new_num_rand = randint(1, 10000)
-    cur.execute(WRITE_ROW_SQL.format(new_num_rand, world_id))
-    res = cur.fetchone()
-    return res
 
-async def fetchMultipleWorlds(total):
+async def updateWorld(world_id, pool):
+    async with pool.acquire() as conn:
+        return await conn.fetchrow(WRITE_ROW_SQL.format(randint(1, 10000), world_id))
+
+
+async def fetchMultipleWorlds(total, pool):
     worlds = []
     for x in range(total):
-        res = await fetchWorld()
+        res = await fetchWorld(pool)
         worlds.append({'id': res[0], 'randomNumber': res[1]})
     return worlds
 
-async def updateMultipleWorlds(total):
+
+async def updateMultipleWorlds(total, pool):
     worlds = []
     for x in range(total):
-        res = await fetchWorld()
-        updated = await updateWorld(res[0])
+        res = await fetchWorld(pool)
+        updated = await updateWorld(res[0], pool)
         worlds.append({'id': updated[0], 'randomNumber': updated[1]})
     return worlds
 
-async def fetchFortunes():
-    cur.execute(READ_ALL_FORTUNES)
-    res = cur.fetchall()
-    return res
+
+async def fetchFortunes(pool):
+    async with pool.acquire() as conn:
+        return await conn.fetch(READ_ALL_FORTUNES)
+
 
 @app.route('/fortunes')
-async def fortunes():
-    fortunes = await fetchFortunes()
+async def fortunes(pool: Pool):
+    fortunes = await fetchFortunes(pool)
     fortunes.append(ADDITIONAL_ROW)
-    fortunes = fortunes.sort(key=sort_fortunes_key)
+    fortunes.sort(key=sort_fortunes_key)
     return await app.render('index.html', fortunes=fortunes)
 
+
 @app.route('/db')
-async def single_query(request: Request):
-    res = await fetchWorld()
+async def single_query(request: Request, pool: Pool):
+    res = await fetchWorld(pool)
     return JsonResponse({'id': res[0], 'randomNumber': res[1]}, headers={'Server': 'Vibora'})
 
+
 @app.route('/plaintext')
 async def plaintext():
     return Response(b'Hello, World!', headers={'Server': 'Vibora', 'Content-Type': 'text/plain'})
 
+
 @app.route('/json')
 async def json():
     return JsonResponse({'message': 'Hello, World!'}, headers={'Server': 'Vibora'})
 
+
 @app.route('/queries')
-async def multiple_queries(request: Request):
+async def multiple_queries(request: Request, pool: Pool):
     total_queries = getQueriesTotal(request.args)
-    worlds = await fetchMultipleWorlds(total_queries)
+    worlds = await fetchMultipleWorlds(total_queries, pool)
     return JsonResponse(worlds, headers={'Server': 'Vibora', 'Content-Type': 'application/json', 'Content-Length': str(total_queries)})
 
+
 @app.route('/updates')
-async def update_queries(request: Request):
+async def update_queries(request: Request, pool: Pool):
     total_queries = getQueriesTotal(request.args)
-    worlds = updateMultipleWorlds(total_queries)
+    worlds = await updateMultipleWorlds(total_queries, pool)
     return JsonResponse(worlds, headers={'Server': 'Vibora', 'Content-Type': 'application/json', 'Content-Length': str(total_queries)})
 
 if __name__ == '__main__':
-    app.run(host="0.0.0.0", port=8000)
+    app.run(host="0.0.0.0", port=8000, workers=multiprocessing.cpu_count())

+ 36 - 0
frameworks/Python/vibora/db.py

@@ -0,0 +1,36 @@
+import asyncio
+from contextlib import asynccontextmanager
+import asyncpg
+
+
+class Connection(asyncpg.Connection):
+    async def reset(self, *, timeout=None):
+        pass
+
+
+class Pool:
+    def __init__(self, connect_url, max_size=10, connection_class=None):
+        self._connect_url = connect_url
+        self._connection_class = connection_class or Connection
+        self._queue = asyncio.LifoQueue(max_size)
+
+    def __await__(self):
+        return self._async_init__().__await__()
+
+    async def _async_init__(self):
+        for _ in range(self._queue.maxsize):
+            self._queue.put_nowait(await asyncpg.connect(self._connect_url, connection_class=self._connection_class))
+        return self
+
+    @asynccontextmanager
+    async def acquire(self):
+        conn = await self._queue.get()
+        try:
+            yield conn
+        finally:
+            self._queue.put_nowait(conn)
+
+    async def close(self):
+        for _ in range(self._queue.maxsize):
+            conn = await self._queue.get()
+            await conn.close()

+ 2 - 9
frameworks/Python/vibora/requirements.txt

@@ -1,9 +1,2 @@
-django-jsonresponse==0.10.0
-pendulum==2.0.4
-psycopg2-binary==2.7.7
-python-dateutil==2.7.5
-pytzdata==2018.7
-six==1.12.0
-ujson==1.35
-uvloop==0.12.0
-vibora==0.0.6
+asyncpg==0.26.0
+git+https://github.com/IterableTrucks/vibora.git@f622101bbf9d4fb045b04afcb32221a5c0673ed9#egg=vibora[fast]

+ 1 - 1
frameworks/Python/vibora/vibora.dockerfile

@@ -1,4 +1,4 @@
-FROM python:3.6.6-stretch
+FROM python:3.10.6
 
 ADD ./ /vibora