Browse Source

[Python/Blacksheep] Adjust the unit startup mode (#9886)

* Adjust the unit startup mode

* fix

* try fix

* try fix

* try fix

* try fix

* try fix

* rollback

* adjust

* fix

* try change

* try change

* fix

* Tuning

* fix

* try fix

* try fix

* add pypy

* try fix pypy

* fix

* fix: app.py -> app-socketify.py

* fix: _is_travis

* fix error

* try fix

* try fix

* try fix

* try fix

* try optimization

* try fix

* rollback

* update socketify and fix query
nazo 2 months ago
parent
commit
ca99ef19da

+ 152 - 0
frameworks/Python/blacksheep/app-socketify.py

@@ -0,0 +1,152 @@
+import multiprocessing
+import os
+import psycopg
+import platform
+import random
+import asyncio
+import blacksheep as bs
+import jinja2
+from pathlib import Path
+from psycopg_pool import AsyncConnectionPool
+
# SQL statements and connection-pool sizing shared by all handlers.
READ_ROW_SQL = 'SELECT "id", "randomnumber" FROM "world" WHERE id = %s'
WRITE_ROW_SQL = 'UPDATE "world" SET "randomnumber"=%s WHERE id=%s'
ADDITIONAL_ROW = [0, "Additional fortune added at request time."]
CORE_COUNT = multiprocessing.cpu_count()

# Two pooled connections per core, with at least one kept warm.
MAX_POOL_SIZE = CORE_COUNT * 2
MIN_POOL_SIZE = max(1, MAX_POOL_SIZE // 2)

# Populated by setup_db() when the application starts.
db_pool = None
+
async def setup_db(app):
    """Create and open the per-process Postgres connection pool on startup.

    Credentials come from PGUSER/PGPASS (benchmark defaults otherwise);
    the host/database are fixed by the benchmark environment.
    """
    global db_pool
    user = os.getenv('PGUSER', 'benchmarkdbuser')
    password = os.getenv('PGPASS', 'benchmarkdbpass')
    dsn = f"postgresql://{user}:{password}@tfb-database:5432/hello_world"
    db_pool = AsyncConnectionPool(
        conninfo=dsn,
        min_size=MIN_POOL_SIZE,
        max_size=MAX_POOL_SIZE,
        open=False,  # opened explicitly below so startup failures surface here
        timeout=5.0,
        max_lifetime=1800,
    )
    await db_pool.open()
+
async def shutdown_db(app):
    """Close the Postgres pool (if any) when the application stops."""
    global db_pool
    pool, db_pool = db_pool, None
    if pool is not None:
        await pool.close()
+
def load_fortunes_template():
    """Compile the fortunes Jinja2 template from disk."""
    source = Path("templates/fortune.html").read_text()
    return jinja2.Template(source)

# Compiled once at import time; reused by every /fortunes request.
fortune_template = load_fortunes_template()
+
# Application wiring: open the DB pool on startup, release it on shutdown.
app = bs.Application()
app.on_start += setup_db
app.on_stop += shutdown_db
+
def get_num_queries(request):
    """Parse the ?queries= parameter and clamp it to [1, 500].

    Any missing, empty, or non-numeric value falls back to 1, per the
    benchmark rules.
    """
    try:
        raw = request.query.get('queries')
        if raw is None:
            return 1
        count = int(raw[0])
    except (KeyError, IndexError, ValueError):
        return 1
    if count < 1:
        return 1
    if count > 500:
        return 500
    return count

JSON_CONTENT_TYPE = b"application/json"
+
@bs.get('/json')
async def json_test(request):
    """JSON serialization benchmark endpoint."""
    payload = {'message': 'Hello, world!'}
    return bs.json(payload)
+
@bs.get('/db')
async def single_db_query_test(request):
    """Fetch a single random World row and return it as JSON."""
    row_id = random.randint(1, 10000)
    async with db_pool.connection() as conn:
        async with conn.cursor() as cur:
            await cur.execute(READ_ROW_SQL, (row_id,))
            row = await cur.fetchone()
    return bs.json({'id': row_id, 'randomNumber': row[1]})
+
@bs.get('/queries')
async def multiple_db_queries_test(request):
    """Fetch N distinct random World rows, N clamped to [1, 500].

    Returns a JSON list of {id, randomNumber} objects.
    """
    num_queries = get_num_queries(request)
    # range() is exclusive at the top: use 10001 so id 10000 is reachable,
    # matching random.randint(1, 10000) in the /db handler.
    row_ids = random.sample(range(1, 10001), num_queries)
    worlds = []
    async with db_pool.connection() as db_conn:
        async with db_conn.cursor() as cursor:
            for row_id in row_ids:
                await cursor.execute(READ_ROW_SQL, (row_id,))
                number = await cursor.fetchone()
                worlds.append({"id": row_id, "randomNumber": number[1]})
    return bs.json(worlds)
+
@bs.get('/fortunes')
async def fortunes_test(request):
    """Render every fortune plus the extra row, sorted by message text."""
    async with db_pool.connection() as conn:
        async with conn.cursor() as cur:
            await cur.execute("SELECT * FROM Fortune")
            rows = await cur.fetchall()
    rows.append(ADDITIONAL_ROW)
    rows.sort(key=lambda r: r[1])
    return bs.html(fortune_template.render(fortunes=rows))
+
@bs.get('/updates')
async def db_updates_test(request):
    """Read N random rows, then batch-update them with new random numbers.

    Retries the whole transaction up to 5 times on connection/pipeline
    errors (outer loop), and just the batched UPDATE up to 5 times on
    deadlocks (inner loop). Raises if every attempt fails.
    """
    num_queries = get_num_queries(request)
    # Upper bound 10001 keeps id 10000 reachable (range() is exclusive),
    # consistent with random.randint(1, 10000) in the /db handler.
    # Sorting by the new value keeps the update order deterministic,
    # which reduces deadlocks between concurrent workers.
    updates = sorted(zip(
        random.sample(range(1, 10001), num_queries),
        random.sample(range(1, 10001), num_queries)
    ), key=lambda x: x[1])
    worlds = [{"id": row_id, "randomNumber": number} for row_id, number in updates]
    for _ in range(5):
        async with db_pool.connection() as db_conn:
            try:
                await db_conn.execute("SET TRANSACTION ISOLATION LEVEL READ COMMITTED")
                async with db_conn.cursor() as cursor:
                    for row_id, number in updates:
                        await cursor.execute(READ_ROW_SQL, (row_id,))
                        await cursor.fetchone()
                    for _ in range(5):
                        try:
                            await cursor.executemany(WRITE_ROW_SQL, [(number, row_id) for row_id, number in updates])
                            return bs.json(worlds)
                        except psycopg.errors.DeadlockDetected:
                            await db_conn.rollback()
                            continue
            except (psycopg.errors.OperationalError, psycopg.errors.PipelineAborted):
                await db_conn.rollback()
                continue
    raise Exception("connect error")
+
@bs.get('/plaintext')
async def plaintext_test(request):
    """Plaintext benchmark endpoint."""
    body = b'Hello, World!'
    return bs.Response(200, content=bs.Content(b"text/plain", body))
+
# PyPy cannot use the CPython-extension servers, so serve the ASGI app
# with socketify instead, forking one worker process per CPU core.
if platform.python_implementation() == 'PyPy':
    import logging
    from socketify import ASGI

    workers = multiprocessing.cpu_count()
    if os.environ.get('TRAVIS') == 'true':
        workers = 2  # CI machines are small

    def run_app():
        """Start a socketify ASGI server on port 8080 (blocks forever)."""
        ASGI(app).listen(8080, lambda config: logging.info(f"Listening on port http://localhost:{config.port} now\n")).run()

    def create_fork():
        """Fork; the child process runs its own server instance."""
        if not os.fork() > 0:
            run_app()

    # workers - 1 children plus the parent gives `workers` total servers.
    for _ in range(1, workers):
        create_fork()
    run_app()

+ 48 - 24
frameworks/Python/blacksheep/app.py

@@ -1,28 +1,29 @@
 import multiprocessing
 import multiprocessing
 import os
 import os
 import asyncpg
 import asyncpg
+import platform
 import random
 import random
 import asyncio
 import asyncio
-from operator import itemgetter
 import blacksheep as bs
 import blacksheep as bs
 import jinja2
 import jinja2
 import msgspec
 import msgspec
 from pathlib import Path
 from pathlib import Path
-
-READ_ROW_SQL = 'SELECT "id", "randomnumber" FROM "world" WHERE id = $1'
-WRITE_ROW_SQL = 'UPDATE "world" SET "randomnumber"=$1 WHERE id=$2'
-ADDITIONAL_ROW = [0, "Additional fortune added at request time."]
-MAX_POOL_SIZE = 1000 // multiprocessing.cpu_count()
-MIN_POOL_SIZE = max(int(MAX_POOL_SIZE / 2), 1)
-db_pool = None
-key = itemgetter(1)
-
 try:
 try:
     import uvloop
     import uvloop
     asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
     asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
 except Exception:
 except Exception:
     ...
     ...
 
 
+READ_ROW_SQL = 'SELECT "id", "randomnumber" FROM "world" WHERE id = $1'
+WRITE_ROW_SQL = 'UPDATE "world" SET "randomnumber"=$1 WHERE id=$2'
+ADDITIONAL_ROW = [0, "Additional fortune added at request time."]
+MAX_CONNECTIONS = 1900
+CORE_COUNT = multiprocessing.cpu_count()
+MAX_POOL_SIZE = max(1,int(os.getenv('MAX_POOL_SIZE', MAX_CONNECTIONS // CORE_COUNT)))
+MIN_POOL_SIZE = max(1,int(os.getenv('MIN_POOL_SIZE', MAX_POOL_SIZE // 2)))
+
+db_pool = None
+
 async def setup_db(app):
 async def setup_db(app):
     global db_pool
     global db_pool
     db_pool = await asyncpg.create_pool(
     db_pool = await asyncpg.create_pool(
@@ -35,6 +36,12 @@ async def setup_db(app):
         max_size=MAX_POOL_SIZE,
         max_size=MAX_POOL_SIZE,
     )
     )
 
 
+async def shutdown_db(app):
+    """Close asyncpg connection pool for the current process."""
+    global db_pool
+    if db_pool is not None:
+        await db_pool.close()
+        db_pool = None
 
 
 def load_fortunes_template():
 def load_fortunes_template():
     with Path("templates/fortune.html").open("r") as f:
     with Path("templates/fortune.html").open("r") as f:
@@ -45,7 +52,7 @@ fortune_template = load_fortunes_template()
 
 
 app = bs.Application()
 app = bs.Application()
 app.on_start += setup_db
 app.on_start += setup_db
-
+app.on_stop += shutdown_db
 
 
 def get_num_queries(request):
 def get_num_queries(request):
     try:
     try:
@@ -55,14 +62,9 @@ def get_num_queries(request):
         query_count = int(value[0])
         query_count = int(value[0])
     except (KeyError, IndexError, ValueError):
     except (KeyError, IndexError, ValueError):
         return 1
         return 1
-    if query_count < 1:
-        return 1
-    if query_count > 500:
-        return 500
-    return query_count
+    return min(max(query_count, 1), 500)
 
 
 ENCODER = msgspec.json.Encoder()
 ENCODER = msgspec.json.Encoder()
-DECODER = msgspec.json.Decoder()
 JSON_CONTENT_TYPE = b"application/json"
 JSON_CONTENT_TYPE = b"application/json"
 def jsonify(
 def jsonify(
     data,
     data,
@@ -122,7 +124,7 @@ async def fortunes_test(request):
         fortunes = await db_conn.fetch("SELECT * FROM Fortune")
         fortunes = await db_conn.fetch("SELECT * FROM Fortune")
 
 
     fortunes.append(ADDITIONAL_ROW)
     fortunes.append(ADDITIONAL_ROW)
-    fortunes.sort(key = key)
+    fortunes.sort(key=lambda row: row[1])
     data = fortune_template.render(fortunes=fortunes)
     data = fortune_template.render(fortunes=fortunes)
     return bs.html(data)
     return bs.html(data)
 
 
@@ -130,18 +132,17 @@ async def fortunes_test(request):
 @bs.get('/updates')
 @bs.get('/updates')
 async def db_updates_test(request):
 async def db_updates_test(request):
     num_queries = get_num_queries(request)
     num_queries = get_num_queries(request)
-    ids = sorted(random.sample(range(1, 10000 + 1), num_queries))
-    numbers = sorted(random.sample(range(1, 10000), num_queries))
-    updates = list(zip(ids, numbers))
-
-    # worlds = [ {"id": row_id, "randomNumber": number} for row_id, number in updates ]
+    updates = list(zip(
+        random.sample(range(1, 10000), num_queries),
+        sorted(random.sample(range(1, 10000), num_queries))
+    ))
     worlds = [Result(id=row_id, randomNumber=number) for row_id, number in updates]
     worlds = [Result(id=row_id, randomNumber=number) for row_id, number in updates]
+    # worlds = [ {"id": row_id, "randomNumber": number} for row_id, number in updates ]
     async with db_pool.acquire() as db_conn:
     async with db_pool.acquire() as db_conn:
         statement = await db_conn.prepare(READ_ROW_SQL)
         statement = await db_conn.prepare(READ_ROW_SQL)
         for row_id, _ in updates:
         for row_id, _ in updates:
             await statement.fetchval(row_id)
             await statement.fetchval(row_id)
         await db_conn.executemany(WRITE_ROW_SQL, updates)
         await db_conn.executemany(WRITE_ROW_SQL, updates)
- 
     return jsonify(worlds)
     return jsonify(worlds)
 
 
 
 
@@ -150,3 +151,26 @@ async def plaintext_test(request):
     return bs.Response(200, content=bs.Content(b"text/plain", b'Hello, World!'))
     return bs.Response(200, content=bs.Content(b"text/plain", b'Hello, World!'))
     #return bs.text('Hello, World!')
     #return bs.text('Hello, World!')
 
 
+
+if platform.python_implementation() == 'PyPy':
+    from socketify import ASGI
+    workers = int(multiprocessing.cpu_count())
+    if _is_travis:
+        workers = 2
+
+    def run_app():
+        ASGI(app).listen(8080, lambda config: logging.info(f"Listening on port http://localhost:{config.port} now\n")).run()
+
+
+    def create_fork():
+        n = os.fork()
+        # n greater than 0 means parent process
+        if not n > 0:
+            run_app()
+
+
+    # fork limiting the cpu count - 1
+    for i in range(1, workers):
+        create_fork()
+
+    run_app()

+ 23 - 0
frameworks/Python/blacksheep/benchmark_config.json

@@ -46,6 +46,29 @@
       "display_name": "blacksheep-nginx-unit",
       "display_name": "blacksheep-nginx-unit",
       "versus": "None",      
       "versus": "None",      
       "notes": ""
       "notes": ""
+    },
+    "pypy-socketify": {
+      "json_url": "/json",
+      "fortune_url": "/fortunes",
+      "plaintext_url": "/plaintext",
+      "db_url": "/db",
+      "query_url": "/queries?queries=",
+      "update_url": "/updates?queries=",
+      "port": 8080,
+      "approach": "Realistic",
+      "classification": "Micro",
+      "framework": "blacksheep",
+      "language": "Python",
+      "flavor": "Python3",
+      "platform": "ASGI",
+      "webserver": "Socketify.py",
+      "os": "Linux",
+      "orm": "Raw",
+      "database_os": "Linux",
+      "database": "Postgres",
+      "display_name": "Blacksheep [Socketify.py PyPy3]",
+      "versus": "None",      
+      "notes": ""
     }
     }
   }]
   }]
 }
 }

+ 12 - 6
frameworks/Python/blacksheep/blacksheep-nginx-unit.dockerfile

@@ -4,14 +4,20 @@ WORKDIR /blacksheep
 
 
 COPY ./ /blacksheep
 COPY ./ /blacksheep
 
 
-RUN pip3 install -U pip
-RUN pip3 install Cython==3.0.12
-RUN pip3 install -r /blacksheep/requirements.txt
-RUN pip3 install -r /blacksheep/requirements-uvicorn.txt
+RUN apt-get update; apt-get install libuv1 -y
 
 
-RUN chmod +x start-unit.sh
+RUN pip3 install -U pip -q
+RUN pip3 install Cython==3.0.12 -q
+RUN pip3 install -r /blacksheep/requirements.txt -q
+RUN pip3 install -r /blacksheep/requirements-uvicorn.txt -q
 
 
 ENV PGSSLMODE=disable
 ENV PGSSLMODE=disable
+RUN CORE_COUNT=$(nproc) && \
+    sed -i "s|\"processes\": [0-9]*|\"processes\": $CORE_COUNT|g" /blacksheep/unit-config.json
 
 
+RUN chmod +x start-unit.sh
+ENTRYPOINT []
 EXPOSE 8080
 EXPOSE 8080
-CMD ["./start-unit.sh"]
+
+# CMD ["unitd", "--no-daemon", "--control", "unix:/var/run/control.unit.sock"]
+CMD ["./start-unit.sh"]

+ 15 - 0
frameworks/Python/blacksheep/blacksheep-pypy-socketify.dockerfile

@@ -0,0 +1,15 @@
+FROM pypy:3.11-bookworm
+
+ADD ./ /blacksheep
+
+WORKDIR /blacksheep
+
+RUN apt-get update; apt-get install libuv1 libpq5 -y
+
+RUN pip3 install -r /blacksheep/requirements.txt
+RUN pip3 install -r /blacksheep/requirements-pypy.txt
+
+EXPOSE 8080
+
+CMD python ./app-socketify.py
+

+ 7 - 6
frameworks/Python/blacksheep/blacksheep.dockerfile

@@ -4,12 +4,13 @@ WORKDIR /blacksheep
 
 
 COPY ./ /blacksheep
 COPY ./ /blacksheep
 
 
-RUN pip3 install -U pip
-RUN pip3 install Cython==3.0.12
-RUN pip3 install -r /blacksheep/requirements.txt
-RUN pip3 install -r /blacksheep/requirements-gunicorn.txt
-RUN pip3 install -r /blacksheep/requirements-uvicorn.txt
+RUN apt-get update; apt-get install libuv1 -y
 
 
+RUN pip3 install -U pip -q
+RUN pip3 install Cython==3.0.12 -q
+RUN pip3 install -r /blacksheep/requirements.txt -q
+RUN pip3 install -r /blacksheep/requirements-uvicorn.txt -q
+ENV GUNICORN=1
 EXPOSE 8080
 EXPOSE 8080
 
 
-CMD gunicorn app:app -k uvicorn.workers.UvicornWorker -c blacksheep_conf.py
+CMD gunicorn app:app -k uvicorn_worker.UvicornWorker -c blacksheep_conf.py

+ 7 - 2
frameworks/Python/blacksheep/blacksheep_conf.py

@@ -2,13 +2,18 @@ import multiprocessing
 import os
 import os
 
 
 _is_travis = os.environ.get('TRAVIS') == 'true'
 _is_travis = os.environ.get('TRAVIS') == 'true'
+CPU_CORES = multiprocessing.cpu_count()
+MAX_CONNECTIONS = 1900 
+CONNECTIONS_PER_WORKER = 100
+max_pg_workers = MAX_CONNECTIONS // CONNECTIONS_PER_WORKER
 
 
-workers = multiprocessing.cpu_count()
+workers = CPU_CORES
 if _is_travis:
 if _is_travis:
     workers = 2
     workers = 2
 
 
 bind = "0.0.0.0:8080"
 bind = "0.0.0.0:8080"
-keepalive = 120
+keepalive = 1
+timeout = 0
 errorlog = '-'
 errorlog = '-'
 pidfile = '/tmp/blacksheep.pid'
 pidfile = '/tmp/blacksheep.pid'
 loglevel = 'error'
 loglevel = 'error'

+ 18 - 0
frameworks/Python/blacksheep/config.toml

@@ -35,3 +35,21 @@ orm = "Raw"
 platform = "ASGI"
 platform = "ASGI"
 webserver = "nginx-unit"
 webserver = "nginx-unit"
 versus = "None"
 versus = "None"
+
+
+[pypy-socketify]
+urls.plaintext = "/plaintext"
+urls.json = "/json"
+urls.db = "/db"
+urls.query = "/queries?queries="
+urls.update = "/updates?queries="
+urls.fortune = "/fortunes"
+approach = "Realistic"
+classification = "Platform"
+database = "Postgres"
+database_os = "Linux"
+os = "Linux"
+orm = "Raw"
+platform = "ASGI"
+webserver = "Socketify.py"
+versus = "None"

+ 0 - 1
frameworks/Python/blacksheep/requirements-gunicorn.txt

@@ -1 +0,0 @@
-gunicorn==23.0.0

+ 0 - 1
frameworks/Python/blacksheep/requirements-hypercorn.txt

@@ -1 +0,0 @@
-hypercorn==0.17.3

+ 3 - 0
frameworks/Python/blacksheep/requirements-pypy.txt

@@ -0,0 +1,3 @@
+psycopg[pool]
+git+https://github.com/cirospaciari/socketify.py.git@main#egg=socketify
+h11

+ 5 - 1
frameworks/Python/blacksheep/requirements-uvicorn.txt

@@ -1,3 +1,7 @@
 uvloop==0.21.0
 uvloop==0.21.0
-uvicorn==0.34.1
+uvicorn==0.34.2
+uvicorn-worker
 httptools==0.6.4
 httptools==0.6.4
+gunicorn==23.0.0
+msgspec==0.19.0
+asyncpg==0.30.0

+ 2 - 3
frameworks/Python/blacksheep/requirements.txt

@@ -1,4 +1,3 @@
-asyncpg==0.30.0
 Jinja2==3.1.6
 Jinja2==3.1.6
-blacksheep==2.1.0
-msgspec==0.19.0
+blacksheep==2.3.1
+psycopg

+ 7 - 5
frameworks/Python/blacksheep/start-unit.sh

@@ -1,16 +1,18 @@
 #!/usr/bin/env bash
 #!/usr/bin/env bash
+CORE_COUNT=$(nproc)
+sysctl -w net.core.somaxconn=65535
+sysctl -w net.ipv4.tcp_max_syn_backlog=65535
+ulimit -n 65535
+taskset -c 0-$(($CORE_COUNT-1)) unitd --no-daemon --control unix:/var/run/control.unit.sock &
 
 
-NPROC=$(nproc)
-sed "s/{{NPROC}}/$NPROC/" unit-config.template.json > nginx-unit-config.json
-
-unitd --no-daemon --control unix:/var/run/control.unit.sock &
+# unitd --no-daemon --control unix:/var/run/control.unit.sock &
 
 
 # wait UNIT started
 # wait UNIT started
 sleep 1
 sleep 1
 
 
 # PUT configure
 # PUT configure
 curl -X PUT \
 curl -X PUT \
-     --data-binary @nginx-unit-config.json \
+     --data-binary @unit-config.json \
      --unix-socket /var/run/control.unit.sock \
      --unix-socket /var/run/control.unit.sock \
      http://localhost/config
      http://localhost/config
 
 

+ 2 - 2
frameworks/Python/blacksheep/unit-config.template.json → frameworks/Python/blacksheep/unit-config.json

@@ -9,10 +9,10 @@
             "type": "python",
             "type": "python",
             "path": "/blacksheep",
             "path": "/blacksheep",
             "working_directory": "/blacksheep",
             "working_directory": "/blacksheep",
+            "processes": 14,
             "protocol": "asgi",
             "protocol": "asgi",
             "module": "app",
             "module": "app",
-            "callable": "app",
-            "processes": {{NPROC}}
+            "callable": "app"
         }
         }
     },
     },
     "access_log": "/dev/null"
     "access_log": "/dev/null"