Procházet zdrojové kódy

[Python] Flask: fix workers (#8169)

Oleg S před 2 roky
rodič
revize
24c6e25454

+ 92 - 74
frameworks/Python/flask/app.py

@@ -5,7 +5,7 @@ import multiprocessing
 import itertools
 from collections import namedtuple
 from operator import attrgetter, itemgetter
-from random import randint
+import random
 from email.utils import formatdate
 
 import flask
@@ -28,7 +28,7 @@ _cpu_count = multiprocessing.cpu_count()
 if _is_travis:
     _cpu_count = 2
 
-_raw_orm = os.getenv('USE_RAW', "0") == "1"
+_raw = os.getenv('USE_RAW', "0") == "1"
 
 _use_orjson = os.getenv('USE_ORJSON', "0") == "1"
 if _use_orjson:
@@ -82,13 +82,10 @@ def get_num_queries():
     return num_queries
 
 def generate_ids(num_queries):
-    ids = {randint(1, 10000) for _ in xrange(num_queries)}
-    while len(ids) < num_queries:
-        ids.add(randint(1, 10000))
-    return list(sorted(ids))
+    return random.sample(range(1, 10001), num_queries)
 
 
-if _raw_orm:
+if _raw:
     import jinja2
     
     if _is_pypy:
@@ -98,11 +95,9 @@ if _raw_orm:
         from psycopg2.pool import ThreadedConnectionPool
         from psycopg2.extras import execute_batch
     
-    pool_size = _cpu_count * 2.5
-    
     POOL = ThreadedConnectionPool(
-        minconn=int(pool_size / 4),
-        maxconn=int(pool_size / 4),
+        minconn=int(_cpu_count * 2.0),
+        maxconn=int(_cpu_count * 2.6),
         database="hello_world",
         user=DBUSER,
         password=DBPSWD,
@@ -163,58 +158,54 @@ else: # --------- PonyORM ------------------------------------------------
 # ----------------------------------------------------------------------------------------
 
 @app.route("/json")
-def hello():
-    if _use_orjson or _use_ujson:
-        return jsonify( {"message": "Hello, World!"} )
-
+def json_data():
     return flask.jsonify(message="Hello, World!")
 
 
[email protected]("/query")
-def get_random_world():
-    if _raw_orm:
-        db = POOL.getconn()
-        cursor = db.cursor()
-        num_queries = get_num_queries()
-        results = map(db_query, zip(itertools.repeat(cursor, num_queries), generate_ids(num_queries)))
-        worlds = [ {"id": result[0], "randomNumber": result[1]} for result in results ]
-        POOL.putconn(db)
-        return jsonify(worlds)
-    
-    with orm.db_session(serializable=False):
-        worlds = [World[ident].to_dict() for ident in generate_ids(get_num_queries())]
-    return jsonify(worlds)
[email protected]("/json-raw")
+def json_data_raw():
+    return jsonify( {"message": "Hello, World!"} )
 
 
 @app.route("/db")
 def get_random_world_single():
-    if _raw_orm:
-        db = POOL.getconn()
-        cursor = db.cursor()
-        cursor.execute("EXECUTE read_stmt(%s)", generate_ids(1))
-        result = cursor.fetchone()
-        world = {"id": result[0], "randomNumber": result[1]}
-        POOL.putconn(db)
-        return jsonify(world)
-    
-    wid = randint(1, 10000)
+    wid = random.randint(1, 10000)
     with orm.db_session(serializable=False):
         world = World[wid]
     return jsonify(world.to_dict())
 
 
[email protected]("/db-raw")
+def get_random_world_single_raw():
+    db = POOL.getconn()
+    cursor = db.cursor()
+    cursor.execute("EXECUTE read_stmt(%s)", generate_ids(1))
+    result = cursor.fetchone()
+    world = {"id": result[0], "randomNumber": result[1]}
+    POOL.putconn(db)
+    return jsonify(world)
+
+
[email protected]("/query")
+def get_random_world():
+    with orm.db_session(serializable=False):
+        worlds = [World[ident].to_dict() for ident in generate_ids(get_num_queries())]
+    return jsonify(worlds)
+
+
[email protected]("/query-raw")
+def get_random_world_raw():
+    db = POOL.getconn()
+    cursor = db.cursor()
+    num_queries = get_num_queries()
+    results = map(db_query, zip(itertools.repeat(cursor, num_queries), generate_ids(num_queries)))
+    worlds = [ {"id": result[0], "randomNumber": result[1]} for result in results ]
+    POOL.putconn(db)
+    return jsonify(worlds)
+
+
 @app.route("/fortunes")
 def get_fortunes():
-    if _raw_orm:
-        db = POOL.getconn()
-        cursor = db.cursor()
-        cursor.execute("EXECUTE fortune")
-        fortunes = list(cursor.fetchall())
-        fortunes.append(ADDITIONAL_ROW)
-        fortunes.sort(key=itemgetter(1))
-        POOL.putconn(db)
-        return flask.Response(FORTUNE_TEMPLATE.render(fortunes=fortunes))
-    
     with orm.db_session(serializable=False):
         fortunes = list(orm.select(fortune for fortune in Fortune))
     tmp_fortune = namedtuple("Fortune", ["id", "message"])
@@ -225,22 +216,20 @@ def get_fortunes():
     return flask.render_template("fortunes.html", fortunes=fortunes)
 
 
[email protected]("/fortunes-raw")
+def get_fortunes_raw():
+    db = POOL.getconn()
+    cursor = db.cursor()
+    cursor.execute("EXECUTE fortune")
+    fortunes = list(cursor.fetchall())
+    fortunes.append(ADDITIONAL_ROW)
+    fortunes.sort(key=itemgetter(1))
+    POOL.putconn(db)
+    return flask.Response(FORTUNE_TEMPLATE.render(fortunes=fortunes))
+
+
 @app.route("/updates")
 def updates():
-    if _raw_orm:
-        db = POOL.getconn()
-        cursor = db.cursor()
-        num_queries = get_num_queries()
-        ids = generate_ids(num_queries)
-        update_values = generate_ids(num_queries)
-        list(map(db_query, zip(itertools.repeat(cursor, num_queries), generate_ids(num_queries))))
-        worlds = list(zip(ids, update_values))
-        execute_batch(cursor, "EXECUTE write_stmt(%s, %s)", worlds)
-        db.commit()
-        POOL.putconn(db)
-        data = [ {"id": ident, "randomNumber": update} for ident, update in worlds ]
-        return jsonify(data)
-
     num_queries = get_num_queries()
     ids = generate_ids(num_queries)
     ids.sort()
@@ -248,11 +237,27 @@ def updates():
     with orm.db_session(serializable=False):
         for ident in ids:
             world = World[ident]
-            world.randomNumber = randint(1, 10000)
+            world.randomNumber = random.randint(1, 10000)
             worlds.append({"id": world.id, "randomNumber": world.randomNumber})
     return jsonify(worlds)
 
 
[email protected]("/updates-raw")
+def updates_raw():
+    db = POOL.getconn()
+    cursor = db.cursor()
+    num_queries = get_num_queries()
+    ids = generate_ids(num_queries)
+    update_values = generate_ids(num_queries)
+    list(map(db_query, zip(itertools.repeat(cursor, num_queries), generate_ids(num_queries))))
+    worlds = list(zip(ids, update_values))
+    execute_batch(cursor, "EXECUTE write_stmt(%s, %s)", worlds)
+    db.commit()
+    POOL.putconn(db)
+    data = [ {"id": ident, "randomNumber": update} for ident, update in worlds ]
+    return jsonify(data)
+
+
 @app.route("/plaintext")
 def plaintext():
     response = flask.make_response(b"Hello, World!")
@@ -264,6 +269,7 @@ def plaintext():
 if __name__ == "__main__":
     import optparse
     import logging
+    import signal
     import re
 
     parser = optparse.OptionParser("usage: %prog [options]", add_help_option=False)
@@ -275,13 +281,13 @@ if __name__ == "__main__":
     parser.add_option("-v", "--verbose", dest="verbose", default=0, type="int")
     (opt, args) = parser.parse_args() 
 
-    workers = _cpu_count
-    if workers > 0:
-        workers = opt.workers
+    workers = opt.workers if opt.workers > 0 else _cpu_count
 
     if _is_travis:
         workers = 2
 
+    worker_list = [ ]
+
     def run_app():
         global response_server
         global response_add_date
@@ -310,14 +316,26 @@ if __name__ == "__main__":
             socketify.WSGI(app).listen(opt.port, lambda config: logging.info(msg)).run()
 
     def create_fork():
-        n = os.fork()
-        # n greater than 0 means parent process
-        if not n > 0:
+        pid = os.fork()
+        if pid > 0:            
+            return pid
+        try:
             run_app()
+        except KeyboardInterrupt:
+            pass
+        sys.exit(0)
 
-    # fork limiting the cpu count - 1
-    for i in range(1, workers):
-        create_fork()
+    for i in range(0, workers):
+        pid = create_fork()
+        print("Worker process added with PID:", pid)
+        worker_list.append(pid)
 
-    run_app()  # run app on the main process too :)
+    print("Running {} workers".format(len(worker_list)))
+    try:
+        for i in range(workers):
+            os.wait()
+    except KeyboardInterrupt:
+        print("\n" + "Stopping all workers")
+        for pid in worker_list:
+            os.kill(pid, signal.SIGINT)
 

+ 15 - 19
frameworks/Python/flask/benchmark_config.json

@@ -26,11 +26,11 @@
       "tags": [ ]
     },
     "raw": {
-      "json_url": "/json",
-      "db_url": "/db",
-      "query_url": "/query?queries=",
-      "fortune_url": "/fortunes",
-      "update_url": "/updates?queries=",
+      "json_url": "/json-raw",
+      "db_url": "/db-raw",
+      "query_url": "/query-raw?queries=",
+      "fortune_url": "/fortunes-raw",
+      "update_url": "/updates-raw?queries=",
       "plaintext_url": "/plaintext",
       "port": 8080,
       "approach": "Realistic",
@@ -50,7 +50,7 @@
       "tags": [ ]
     },
     "socketify-wsgi": {
-      "json_url": "/json",
+      "json_url": "/json-raw",
       "plaintext_url": "/plaintext",
       "port": 8080,
       "approach": "Realistic",
@@ -69,7 +69,7 @@
       "versus": "wsgi"
     },
     "socketify-wsgi-pypy": {
-      "json_url": "/json",
+      "json_url": "/json-raw",
       "plaintext_url": "/plaintext",
       "port": 8080,
       "approach": "Realistic",
@@ -109,14 +109,14 @@
       "display_name": "Flask [Meinheld] [PyPy3]",
       "notes": "PyPy",
       "versus": "wsgi",
-      "tags": [ ]
+      "tags": ["broken"]
     },
     "pypy-raw": {
-      "json_url": "/json",
-      "db_url": "/db",
-      "query_url": "/query?queries=",
-      "fortune_url": "/fortunes",
-      "update_url": "/updates?queries=",
+      "json_url": "/json-raw",
+      "db_url": "/db-raw",
+      "query_url": "/query-raw?queries=",
+      "fortune_url": "/fortunes-raw",
+      "update_url": "/updates-raw?queries=",
       "plaintext_url": "/plaintext",
       "port": 8080,
       "approach": "Realistic",
@@ -133,7 +133,7 @@
       "display_name": "Flask [Meinheld] [PyPy3] [Raw]",
       "notes": "PyPy",
       "versus": "wsgi",
-      "tags": [ ]
+      "tags": ["broken"]
     },
     "nginx-uwsgi": {
       "json_url": "/json",
@@ -160,11 +160,7 @@
       "tags": [ ]
     },
     "fastwsgi": {
-      "json_url": "/json",
-      "db_url": "/db",
-      "query_url": "/query?queries=",
-      "fortune_url": "/fortunes",
-      "update_url": "/updates?queries=",
+      "json_url": "/json-raw",
       "plaintext_url": "/plaintext",
       "port": 8080,
       "approach": "Realistic",

+ 13 - 17
frameworks/Python/flask/config.toml

@@ -20,11 +20,11 @@ versus = "wsgi"
 
 [raw]
 urls.plaintext = "/plaintext"
-urls.json = "/json"
-urls.db = "/db"
-urls.query = "/queries?queries="
-urls.update = "/updates?queries="
-urls.fortune = "/fortunes"
+urls.json = "/json-raw"
+urls.db = "/db-raw"
+urls.query = "/queries-raw?queries="
+urls.update = "/updates-raw?queries="
+urls.fortune = "/fortunes-raw"
 approach = "Realistic"
 classification = "Micro"
 database = "Postgres"
@@ -54,11 +54,11 @@ versus = "wsgi"
 
 [pypy-raw]
 urls.plaintext = "/plaintext"
-urls.json = "/json"
-urls.db = "/db"
-urls.query = "/queries?queries="
-urls.update = "/updates?queries="
-urls.fortune = "/fortunes"
+urls.json = "/json-raw"
+urls.db = "/db-raw"
+urls.query = "/queries-raw?queries="
+urls.update = "/updates-raw?queries="
+urls.fortune = "/fortunes-raw"
 approach = "Realistic"
 classification = "Micro"
 database = "Postgres"
@@ -71,7 +71,7 @@ versus = "wsgi"
 
 [socketify-wsgi]
 urls.plaintext = "/plaintext"
-urls.json = "/json"
+urls.json = "/json-raw"
 approach = "Realistic"
 classification = "Micro"
 database = "Postgres"
@@ -84,7 +84,7 @@ versus = "wsgi"
 
 [socketify-wsgi-pypy]
 urls.plaintext = "/plaintext"
-urls.json = "/json"
+urls.json = "/json-raw"
 approach = "Realistic"
 classification = "Micro"
 database = "Postgres"
@@ -114,11 +114,7 @@ versus = "wsgi"
 
 [fastwsgi]
 urls.plaintext = "/plaintext"
-urls.json = "/json"
-urls.db = "/db"
-urls.query = "/queries?queries="
-urls.update = "/updates?queries="
-urls.fortune = "/fortunes"
+urls.json = "/json-raw"
 approach = "Realistic"
 classification = "Micro"
 database = "Postgres"

+ 1 - 1
frameworks/Python/flask/gunicorn_conf.py

@@ -5,7 +5,7 @@ import sys
 _is_pypy = hasattr(sys, "pypy_version_info")
 _is_travis = os.environ.get("TRAVIS") == "true"
 
-workers = int(multiprocessing.cpu_count() * 1.5)
+workers = int(multiprocessing.cpu_count() * 2.5)
 if _is_travis:
     workers = 2