Browse Source

[Python] FastWSGI: add ASGI support and async DB tests (#8184)

Oleg S 2 years ago
parent
commit
3b27a3020f

+ 280 - 0
frameworks/Python/fastwsgi/app-asgi.py

@@ -0,0 +1,280 @@
+import os
+import sys
+import asyncio
+import asyncpg
+import jinja2
+import random
+from operator import itemgetter
+from urllib.parse import parse_qs
+import asyncache
+import cachetools
+
+try:
+    from ujson import dumps as jsonify
+except:
+    from json import dumps as jsonify
+
+
+db_pool = None
+
+async def db_setup():
+    global db_pool
+    db_pool = await asyncpg.create_pool(
+        user=os.getenv('PGUSER', 'benchmarkdbuser'),
+        password=os.getenv('PGPASS', 'benchmarkdbpass'),
+        database='hello_world',
+        host='tfb-database',
+        port=5432
+    )
+
+READ_ROW_SQL = 'SELECT "randomnumber", "id" FROM "world" WHERE id = $1'
+WRITE_ROW_SQL = 'UPDATE "world" SET "randomnumber"=$1 WHERE id=$2'
+ADDITIONAL_ROW = [0, 'Additional fortune added at request time.']
+
+JSON_RESPONSE = {
+    'type': 'http.response.start',
+    'status': 200,
+    'headers': [
+        [b'Content-Type', b'application/json'],
+    ]
+}
+
+HTML_RESPONSE = {
+    'type': 'http.response.start',
+    'status': 200,
+    'headers': [
+        [b'Content-Type', b'text/html; charset=utf-8'],
+    ]
+}
+
+PLAINTEXT_RESPONSE = {
+    'type': 'http.response.start',
+    'status': 200,
+    'headers': [
+        [b'Content-Type', b'text/plain; charset=utf-8'],
+    ]
+}
+
+
+def get_num_queries(scope, name = b'queries'):
+    try:
+        query_string = scope['query_string']
+        query_count = int(parse_qs(query_string)[name][0])
+    except (KeyError, IndexError, ValueError):
+        return 1
+    if query_count < 1:
+        return 1
+    if query_count > 500:
+        return 500
+    return query_count
+
+
+async def json_serialization(scope, receive, send):
+    content = jsonify( {'message': 'Hello, world!'} )
+    await send(JSON_RESPONSE)
+    await send({
+        'type': 'http.response.body',
+        'body': content.encode('utf8'),
+        'more_body': False
+    })
+
+
+async def single_database_query(scope, receive, send):
+    row_id = random.randint(1, 10000)
+    db_conn = await db_pool.acquire()
+    try:
+        number = await db_conn.fetchval(READ_ROW_SQL, row_id)
+        world = {'id': row_id, 'randomNumber': number}
+    finally:
+        await db_pool.release(db_conn)
+
+    content = jsonify(world)
+    await send(JSON_RESPONSE)
+    await send({
+        'type': 'http.response.body',
+        'body': content.encode('utf8'),
+        'more_body': False
+    })
+
+
+async def multiple_database_queries(scope, receive, send):
+    num_queries = get_num_queries(scope)
+    row_ids = random.sample(range(1, 10000), num_queries)
+    worlds = [ ]
+
+    db_conn = await db_pool.acquire()
+    try:
+        statement = await db_conn.prepare(READ_ROW_SQL)
+        for row_id in row_ids:
+            number = await statement.fetchval(row_id)
+            worlds.append( {'id': row_id, 'randomNumber': number} )
+    finally:
+        await db_pool.release(db_conn)
+
+    content = jsonify(worlds)
+    await send(JSON_RESPONSE)
+    await send({
+        'type': 'http.response.body',
+        'body': content.encode('utf8'),
+        'more_body': False
+    })
+
+
+_get_item1 = itemgetter(1)
+fortunes_template = None
+fn = os.path.join('templates', 'fortune.html')
+with open(fn, 'r') as file:
+    text = file.read()
+    fortunes_template = jinja2.Template(text)
+
+
+async def fortunes(scope, receive, send):
+    db_conn = await db_pool.acquire()
+    try:
+        fortunes = await db_conn.fetch('SELECT * FROM Fortune')
+    finally:
+        await db_pool.release(db_conn)
+
+    fortunes.append(ADDITIONAL_ROW)
+    fortunes.sort(key = _get_item1)
+    content = fortunes_template.render(fortunes=fortunes)
+    await send(HTML_RESPONSE)
+    await send({
+        'type': 'http.response.body',
+        'body': content.encode('utf8'),
+        'more_body': False
+    })
+
+
+async def database_updates(scope, receive, send):
+    num_queries = get_num_queries(scope)
+    ids = sorted(random.sample(range(1, 10000 + 1), num_queries))
+    numbers = sorted(random.sample(range(1, 10000), num_queries))
+    updates = list(zip(ids, numbers))
+    
+    worlds = [ {"id": row_id, "randomNumber": number} for row_id, number in updates ]
+
+    db_conn = await db_pool.acquire()
+    try:
+        statement = await db_conn.prepare(READ_ROW_SQL)
+        for row_id, _ in updates:
+            await statement.fetchval(row_id)
+        await db_conn.executemany(WRITE_ROW_SQL, updates)
+    finally:
+        await db_pool.release(db_conn)
+
+    content = jsonify(worlds)
+    await send(JSON_RESPONSE)
+    await send({
+        'type': 'http.response.body',
+        'body': content.encode('utf8'),
+        'more_body': False
+    })
+
+
+from asyncache import cached
+from cachetools.keys import hashkey
+
+@cached(cache={}, key=lambda stmt, id: hashkey(id))
+async def get_cached_world(stmt, id):
+    result = await stmt.fetchrow(id)
+    return {'id': result[1], 'randomNumber': result[0]}
+
+async def cached_queries(scope, receive, send):
+    count = get_num_queries(scope, b'count')
+    row_ids = random.sample(range(1, 10000 + 1), count)
+    
+    db_conn = await db_pool.acquire()
+    try:
+        statement = await db_conn.prepare(READ_ROW_SQL)
+        worlds = [ await get_cached_world(statement, id) for id in row_ids ]
+    finally:
+        await db_pool.release(db_conn)
+
+    content = jsonify(worlds)
+    await send(JSON_RESPONSE)
+    await send({
+        'type': 'http.response.body',
+        'body': content.encode('utf8'),
+        'more_body': False
+    })
+
+
+async def plaintext(scope, receive, send):
+    await send(PLAINTEXT_RESPONSE)
+    await send({
+        'type': 'http.response.body',
+        'body': b'Hello, world!',
+        'more_body': False
+    })
+
+
+async def handle_404(scope, receive, send):
+    await send(PLAINTEXT_RESPONSE)
+    await send({
+        'type': 'http.response.body',
+        'body': b'Not found',
+        'more_body': False
+    })
+
+
+routes = {
+    '/json': json_serialization,
+    '/db': single_database_query,
+    '/queries': multiple_database_queries,
+    '/fortunes': fortunes,
+    '/updates': database_updates,
+    '/cached-queries': cached_queries,
+    '/plaintext': plaintext,
+}
+
+
+async def app(scope, receive, send):
+    if scope['type'] == 'lifespan':
+        global db_pool
+        while True:
+            message = await receive()
+            if message['type'] == 'lifespan.startup':
+                await db_setup()
+                await send({'type': 'lifespan.startup.complete'})
+            elif message['type'] == 'lifespan.shutdown':
+                db_pool.close()
+                await send({'type': 'lifespan.shutdown.complete'})
+                return            
+    else:
+        path = scope['path']
+        app_handler = routes.get(path, handle_404)
+        await app_handler(scope, receive, send)
+
+# -----------------------------------------------------------------------------------------------------
+
+if __name__ == "__main__":
+    import multiprocessing
+    import fastwsgi
+
+    _is_travis = os.environ.get('TRAVIS') == 'true'
+
+    workers = int(multiprocessing.cpu_count())
+    if _is_travis:
+        workers = 2
+
+    host = '0.0.0.0'
+    port = 3000
+
+    def run_app():
+        loop = asyncio.get_event_loop()
+        loop.run_until_complete(db_setup())
+        fastwsgi.server.backlog = 4096
+        fastwsgi.run(app, host, port, loglevel=2)
+
+    def create_fork():
+        n = os.fork()
+        # n greater than 0 means parent process
+        if not n > 0:
+            run_app()
+
+    # fork limiting the cpu count - 1
+    for i in range(1, workers):
+        create_fork()
+
+    run_app()  # run app on the main process too :)

+ 4 - 2
frameworks/Python/fastwsgi/app.py

@@ -9,7 +9,8 @@ except:
 
 def app(environ, start_response):
     path = environ["PATH_INFO"]
-    headers = [ ('Server', 'FastWSGI') ]
+    headers = [ ]
+    #headers = [ ('Server', 'FastWSGI') ]
     
     if path == "/json":
         headers.append( ('Content-Type', 'application/json') )
@@ -38,7 +39,8 @@ if __name__ == "__main__":
     port = 3000
 
     def run_app():
-        fastwsgi.run(app, host, port, loglevel=0)
+        fastwsgi.server.backlog = 4096
+        fastwsgi.run(app, host, port, loglevel=2)
 
     def create_fork():
         n = os.fork()

+ 25 - 1
frameworks/Python/fastwsgi/benchmark_config.json

@@ -12,11 +12,35 @@
             "language": "Python",
             "flavor": "Python3",
             "orm": "Raw",
-            "platform": "None",
+            "platform": "WSGI",
             "webserver": "FastWSGI",
             "os": "Linux",
             "database_os": "Linux",
             "display_name": "FastWSGI",
+            "notes": "",
+            "versus": "wsgi"
+        },
+        "asgi": {
+            "json_url": "/json",
+            "plaintext_url": "/plaintext",
+            "db_url": "/db",
+            "query_url": "/queries?queries=",
+            "update_url": "/updates?queries=",
+            "cached_query_url": "/cached-queries?count=",
+            "fortune_url": "/fortunes",
+            "port": 3000,
+            "approach": "Realistic",
+            "classification": "Micro",
+            "database": "Postgres",
+            "framework": "None",
+            "language": "Python",
+            "flavor": "Python3",
+            "orm": "Raw",
+            "platform": "ASGI",
+            "webserver": "FastWSGI",
+            "os": "Linux",
+            "database_os": "Linux",
+            "display_name": "FastWSGI [ASGI]",
             "notes": ""
         }
     }]

+ 19 - 1
frameworks/Python/fastwsgi/config.toml

@@ -10,6 +10,24 @@ database = "None"
 database_os = "Linux"
 os = "Linux"
 orm = "Raw"
-platform = "None"
+platform = "WSGI"
+webserver = "FastWSGI"
+versus = "wsgi"
+
+[asgi]
+urls.json = "/json"
+urls.plaintext = "/plaintext"
+urls.db = "/db"
+urls.query = "/queries?queries="
+urls.update = "/updates?queries="
+urls.cached_query = "/cached-queries?count="
+urls.fortune = "/fortunes"
+approach = "Realistic"
+classification = "Platform"
+database = "Postgres"
+database_os = "Linux"
+os = "Linux"
+orm = "Raw"
+platform = "ASGI"
 webserver = "FastWSGI"
 versus = "None"

+ 13 - 0
frameworks/Python/fastwsgi/fastwsgi-asgi.dockerfile

@@ -0,0 +1,13 @@
+FROM python:3.11-bullseye
+
+WORKDIR /usr/src/app
+
+COPY requirements.txt ./
+RUN pip3 install -U pip
+RUN pip3 install --no-cache-dir -r requirements.txt
+
+COPY . .
+
+EXPOSE 3000
+
+CMD python ./app-asgi.py

+ 2 - 3
frameworks/Python/fastwsgi/fastwsgi.dockerfile

@@ -3,9 +3,8 @@ FROM python:3.11-bullseye
 WORKDIR /usr/src/app
 
 COPY requirements.txt ./
-RUN apt-get update
-RUN pip install --no-cache-dir ujson
-RUN pip install --no-cache-dir -r requirements.txt
+RUN pip3 install -U pip
+RUN pip3 install --no-cache-dir -r requirements.txt
 
 COPY . .
 

+ 6 - 1
frameworks/Python/fastwsgi/requirements.txt

@@ -1,3 +1,8 @@
 click==8.0.1
 ujson==5.7.0
-fastwsgi==0.0.9
+#fastwsgi==0.0.9
+git+https://github.com/remittor-pr/fastwsgi.git@6093fceaac44ffdfaffd577dd2b04ad4a4c6e3bf
+asyncpg==0.27.0
+Jinja2==3.1.2
+cachetools==5.3.0
+asyncache==0.3.1

+ 10 - 0
frameworks/Python/fastwsgi/templates/fortune.html

@@ -0,0 +1,10 @@
+<!DOCTYPE html>
+<html>
+<head><title>Fortunes</title></head>
+<body>
+<table>
+<tr><th>id</th><th>message</th></tr>
+{% for fortune in fortunes %}<tr><td>{{ fortune[0] }}</td><td>{{ fortune[1]|e }}</td></tr>
+{% endfor %}</table>
+</body>
+</html>