#!/usr/bin/env python import os import sys import multiprocessing import itertools from collections import namedtuple from operator import attrgetter, itemgetter from random import randint from email.utils import formatdate import flask from pony import orm if sys.version_info[0] == 3: xrange = range _is_pypy = hasattr(sys, "pypy_version_info") if _is_pypy: import psycopg2cffi.compat psycopg2cffi.compat.register() _is_travis = os.environ.get('TRAVIS') == 'true' _is_gunicorn = "gunicorn" in os.environ.get("SERVER_SOFTWARE", "") _cpu_count = multiprocessing.cpu_count() if _is_travis: _cpu_count = 2 _raw_orm = os.getenv('USE_RAW', "0") == "1" _use_orjson = os.getenv('USE_ORJSON', "0") == "1" if _use_orjson: import orjson as json _use_ujson = os.getenv('USE_UJSON', "0") == "1" if _use_ujson: import ujson as json if not _use_orjson and not _use_ujson: import json from flask import jsonify DBDRV = "postgres" DBHOST = "tfb-database" DBUSER = "benchmarkdbuser" DBPSWD = "benchmarkdbpass" # setup app = flask.Flask(__name__) app.config["JSONIFY_PRETTYPRINT_REGULAR"] = False # ----------------------------------------------------------------------------- response_server = None response_add_date = False @app.after_request def after_request(response): if response_server: response.headers['Server'] = response_server if response_add_date: response.headers['Date'] = formatdate(timeval=None, localtime=False, usegmt=True) return response if _use_orjson or _use_ujson: def jsonify(jdict): return json.dumps(jdict), { "Content-Type": "application/json" } # ----------------------------------------------------------------------------- def get_num_queries(): try: num_queries = flask.request.args.get("queries", 1, type=int) except ValueError: num_queries = 1 if num_queries < 1: return 1 if num_queries > 500: return 500 return num_queries def generate_ids(num_queries): ids = {randint(1, 10000) for _ in xrange(num_queries)} while len(ids) < num_queries: ids.add(randint(1, 10000)) return list(sorted(ids)) if _raw_orm: import jinja2 if _is_pypy: from psycopg2cffi.pool import ThreadedConnectionPool from psycopg2cffi.extras import execute_batch else: from psycopg2.pool import ThreadedConnectionPool from psycopg2.extras import execute_batch pool_size = _cpu_count * 2.5 POOL = ThreadedConnectionPool( minconn=int(pool_size / 4), maxconn=int(pool_size / 4), database="hello_world", user=DBUSER, password=DBPSWD, host="tfb-database", port=5432, ) read_row_sql = ( 'SELECT world."randomnumber", world."id" FROM "world" WHERE id = %(id)s' ) prepared_read_row_sql = ( 'SELECT world."randomnumber", world."id" FROM "world" WHERE id = $1' ) write_row_sql = 'UPDATE "world" SET "randomnumber"=$1 WHERE id=$2' ADDITIONAL_ROW = (0, "Additional fortune added at request time.") db = POOL.getconn() cursor = db.cursor() cursor.execute("PREPARE read_stmt (int) AS " + prepared_read_row_sql) cursor.execute("PREPARE write_stmt (int, int) AS " + write_row_sql) cursor.execute('PREPARE fortune AS SELECT * FROM "Fortune"') del cursor POOL.putconn(db) del db def db_query(arg): (cursor, ident) = arg cursor.execute("EXECUTE read_stmt(%s)", [ident]) result = cursor.fetchone() return result fn = os.path.join("templates", "fortune_raw.html") with open(fn, "r") as template_file: template_text = template_file.read() FORTUNE_TEMPLATE = jinja2.Template(template_text) #Fortune = namedtuple("Fortune", ["id", "message"]) else: # --------- PonyORM ------------------------------------------------ app.config["STORM_DATABASE_URI"] = "{}://{}:{}@{}:5432/hello_world".format(DBDRV, DBUSER, DBPSWD, DBHOST) db = orm.Database() db.bind(DBDRV, host=DBHOST, port=5432, user=DBUSER, password=DBPSWD, database="hello_world") class World(db.Entity): _table_ = "world" id = orm.PrimaryKey(int) randomNumber = orm.Required(int, column="randomnumber") def to_dict(self): """Return object data in easily serializeable format""" return {"id": self.id, "randomNumber": self.randomNumber} class Fortune(db.Entity): _table_ = "fortune" id = orm.PrimaryKey(int, auto=True) message = orm.Required(str) db.generate_mapping(create_tables=False) # ---------------------------------------------------------------------------------------- @app.route("/json") def hello(): if _use_orjson or _use_ujson: return jsonify( {"message": "Hello, World!"} ) return flask.jsonify(message="Hello, World!") @app.route("/query") def get_random_world(): if _raw_orm: db = POOL.getconn() cursor = db.cursor() num_queries = get_num_queries() results = map(db_query, zip(itertools.repeat(cursor, num_queries), generate_ids(num_queries))) worlds = [ {"id": result[0], "randomNumber": result[1]} for result in results ] POOL.putconn(db) return jsonify(worlds) with orm.db_session(serializable=False): worlds = [World[ident].to_dict() for ident in generate_ids(get_num_queries())] return jsonify(worlds) @app.route("/db") def get_random_world_single(): if _raw_orm: db = POOL.getconn() cursor = db.cursor() cursor.execute("EXECUTE read_stmt(%s)", generate_ids(1)) result = cursor.fetchone() world = {"id": result[0], "randomNumber": result[1]} POOL.putconn(db) return jsonify(world) wid = randint(1, 10000) with orm.db_session(serializable=False): world = World[wid] return jsonify(world.to_dict()) @app.route("/fortunes") def get_fortunes(): if _raw_orm: db = POOL.getconn() cursor = db.cursor() cursor.execute("EXECUTE fortune") fortunes = list(cursor.fetchall()) fortunes.append(ADDITIONAL_ROW) fortunes.sort(key=itemgetter(1)) POOL.putconn(db) return flask.Response(FORTUNE_TEMPLATE.render(fortunes=fortunes)) with orm.db_session(serializable=False): fortunes = list(orm.select(fortune for fortune in Fortune)) tmp_fortune = namedtuple("Fortune", ["id", "message"]) fortunes.append( tmp_fortune(id=0, message="Additional fortune added at request time.") ) fortunes.sort(key=attrgetter("message")) return flask.render_template("fortunes.html", fortunes=fortunes) @app.route("/updates") def updates(): if _raw_orm: db = POOL.getconn() cursor = db.cursor() num_queries = get_num_queries() ids = generate_ids(num_queries) update_values = generate_ids(num_queries) list(map(db_query, zip(itertools.repeat(cursor, num_queries), generate_ids(num_queries)))) worlds = list(zip(ids, update_values)) execute_batch(cursor, "EXECUTE write_stmt(%s, %s)", worlds) db.commit() POOL.putconn(db) data = [ {"id": ident, "randomNumber": update} for ident, update in worlds ] return jsonify(data) num_queries = get_num_queries() ids = generate_ids(num_queries) ids.sort() worlds = [] with orm.db_session(serializable=False): for ident in ids: world = World[ident] world.randomNumber = randint(1, 10000) worlds.append({"id": world.id, "randomNumber": world.randomNumber}) return jsonify(worlds) @app.route("/plaintext") def plaintext(): response = flask.make_response(b"Hello, World!") response.content_type = "text/plain" return response # ----------------------------------------------------------------------------------- if __name__ == "__main__": import optparse import logging import re parser = optparse.OptionParser("usage: %prog [options]", add_help_option=False) parser.add_option("-h", "--host", dest="host", default='0.0.0.0', type="string") parser.add_option("-p", "--port", dest="port", default=8080, type="int") parser.add_option("-s", "--server", dest="server", default="gunicorn", type="string") parser.add_option("-w", "--workers", dest="workers", default=0, type="int") parser.add_option("-k", "--keepalive", dest="keepalive", default=60, type="int") parser.add_option("-v", "--verbose", dest="verbose", default=0, type="int") (opt, args) = parser.parse_args() workers = _cpu_count if workers > 0: workers = opt.workers if _is_travis: workers = 2 def run_app(): global response_server global response_add_date if opt.server == "werkzeug": import werkzeug werkzeug.serving.WSGIRequestHandler.protocol_version = "HTTP/1.1" wzlog = logging.getLogger("werkzeug") wzlog.setLevel(logging.WARN) use_reloader = False # True = Use a reloader process to restart the server process when files are changed response_server = None response_add_date = False werkzeug.serving.run_simple(opt.host, opt.port, app, use_reloader=use_reloader) if opt.server == 'fastwsgi': import fastwsgi response_server = "FastWSGI" response_add_date = False fastwsgi.run(app, host=opt.host, port=opt.port, loglevel=opt.verbose) if opt.server == 'socketify': import socketify response_server = None response_add_date = False msg = "Listening on http://0.0.0.0:{port} now\n".format(port=opt.port) socketify.WSGI(app).listen(opt.port, lambda config: logging.info(msg)).run() def create_fork(): n = os.fork() # n greater than 0 means parent process if not n > 0: run_app() # fork limiting the cpu count - 1 for i in range(1, workers): create_fork() run_app() # run app on the main process too :)