app.py 10 KB


  1. #!/usr/bin/env python
  2. import os
  3. import sys
  4. import multiprocessing
  5. import itertools
  6. from collections import namedtuple
  7. from operator import attrgetter, itemgetter
  8. import random
  9. from email.utils import formatdate
  10. import flask
  11. from pony import orm
  12. if sys.version_info[0] == 3:
  13. xrange = range
  14. _is_pypy = hasattr(sys, "pypy_version_info")
  15. if _is_pypy:
  16. import psycopg2cffi.compat
  17. psycopg2cffi.compat.register()
  18. _is_travis = os.environ.get('TRAVIS') == 'true'
  19. _is_gunicorn = "gunicorn" in os.environ.get("SERVER_SOFTWARE", "")
  20. _cpu_count = multiprocessing.cpu_count()
  21. if _is_travis:
  22. _cpu_count = 2
  23. _raw = os.getenv('USE_RAW', "0") == "1"
  24. _use_orjson = os.getenv('USE_ORJSON', "0") == "1"
  25. if _use_orjson:
  26. import orjson as json
  27. _use_ujson = os.getenv('USE_UJSON', "0") == "1"
  28. if _use_ujson:
  29. import ujson as json
  30. if not _use_orjson and not _use_ujson:
  31. import json
  32. from flask import jsonify
# Database connection settings used by the database setup further down.
DBDRV = "postgres"
DBHOST = "tfb-database"
DBUSER = "benchmarkdbuser"
DBPSWD = "benchmarkdbpass"

# setup
app = flask.Flask(__name__)
# Turn off pretty-printing for flask.jsonify responses.
app.config["JSONIFY_PRETTYPRINT_REGULAR"] = False
  40. # -----------------------------------------------------------------------------
  41. response_server = None
  42. response_add_date = False
  43. @app.after_request
  44. def after_request(response):
  45. if response_server:
  46. response.headers['Server'] = response_server
  47. if response_add_date:
  48. response.headers['Date'] = formatdate(timeval=None, localtime=False, usegmt=True)
  49. return response
  50. if _use_orjson or _use_ujson:
  51. def jsonify(jdict):
  52. return json.dumps(jdict), { "Content-Type": "application/json" }
  53. # -----------------------------------------------------------------------------
  54. def get_num_queries():
  55. try:
  56. num_queries = flask.request.args.get("queries", 1, type=int)
  57. except ValueError:
  58. num_queries = 1
  59. if num_queries < 1:
  60. return 1
  61. if num_queries > 500:
  62. return 500
  63. return num_queries
  64. def generate_ids(num_queries):
  65. return random.sample(range(1, 10001), num_queries)
  66. if _raw:
  67. import jinja2
  68. if _is_pypy:
  69. from psycopg2cffi.pool import ThreadedConnectionPool
  70. from psycopg2cffi.extras import execute_batch
  71. else:
  72. from psycopg2.pool import ThreadedConnectionPool
  73. from psycopg2.extras import execute_batch
  74. pool_size = int(_cpu_count * 2.5 / 4)
  75. if _is_travis:
  76. pool_size = 5
  77. POOL = ThreadedConnectionPool(
  78. minconn=pool_size,
  79. maxconn=pool_size,
  80. database="hello_world",
  81. user=DBUSER,
  82. password=DBPSWD,
  83. host=DBHOST,
  84. port=5432,
  85. )
  86. read_row_sql = (
  87. 'SELECT world."randomnumber", world."id" FROM "world" WHERE id = %(id)s'
  88. )
  89. prepared_read_row_sql = (
  90. 'SELECT world."randomnumber", world."id" FROM "world" WHERE id = $1'
  91. )
  92. write_row_sql = 'UPDATE "world" SET "randomnumber"=$1 WHERE id=$2'
  93. ADDITIONAL_ROW = (0, "Additional fortune added at request time.")
  94. db = POOL.getconn()
  95. cursor = db.cursor()
  96. cursor.execute("PREPARE read_stmt (int) AS " + prepared_read_row_sql)
  97. cursor.execute("PREPARE write_stmt (int, int) AS " + write_row_sql)
  98. cursor.execute('PREPARE fortune AS SELECT * FROM "Fortune"')
  99. del cursor
  100. POOL.putconn(db)
  101. del db
  102. def db_query(arg):
  103. (cursor, ident) = arg
  104. cursor.execute("EXECUTE read_stmt(%s)", [ident])
  105. result = cursor.fetchone()
  106. return result
  107. fn = os.path.join("templates", "fortune_raw.html")
  108. with open(fn, "r") as template_file:
  109. template_text = template_file.read()
  110. FORTUNE_TEMPLATE = jinja2.Template(template_text)
  111. #Fortune = namedtuple("Fortune", ["id", "message"])
  112. else: # --------- PonyORM ------------------------------------------------
  113. app.config["STORM_DATABASE_URI"] = "{}://{}:{}@{}:5432/hello_world".format(DBDRV, DBUSER, DBPSWD, DBHOST)
  114. db = orm.Database()
  115. db.bind(DBDRV, host=DBHOST, port=5432, user=DBUSER, password=DBPSWD, database="hello_world")
  116. class World(db.Entity):
  117. _table_ = "world"
  118. id = orm.PrimaryKey(int)
  119. randomNumber = orm.Required(int, column="randomnumber")
  120. def to_dict(self):
  121. """Return object data in easily serializeable format"""
  122. return {"id": self.id, "randomNumber": self.randomNumber}
  123. class Fortune(db.Entity):
  124. _table_ = "fortune"
  125. id = orm.PrimaryKey(int, auto=True)
  126. message = orm.Required(str)
  127. db.generate_mapping(create_tables=False)
  128. # ----------------------------------------------------------------------------------------
  129. @app.route("/json")
  130. def json_data():
  131. return flask.jsonify(message="Hello, World!")
  132. @app.route("/json-raw")
  133. def json_data_raw():
  134. return jsonify( {"message": "Hello, World!"} )
  135. @app.route("/db")
  136. def get_random_world_single():
  137. wid = random.randint(1, 10000)
  138. with orm.db_session(serializable=False):
  139. world = World[wid]
  140. return jsonify(world.to_dict())
  141. @app.route("/db-raw")
  142. def get_random_world_single_raw():
  143. db = POOL.getconn()
  144. cursor = db.cursor()
  145. cursor.execute("EXECUTE read_stmt(%s)", generate_ids(1))
  146. result = cursor.fetchone()
  147. world = {"id": result[0], "randomNumber": result[1]}
  148. POOL.putconn(db)
  149. return jsonify(world)
  150. @app.route("/query")
  151. def get_random_world():
  152. with orm.db_session(serializable=False):
  153. worlds = [World[ident].to_dict() for ident in generate_ids(get_num_queries())]
  154. return jsonify(worlds)
  155. @app.route("/query-raw")
  156. def get_random_world_raw():
  157. db = POOL.getconn()
  158. cursor = db.cursor()
  159. num_queries = get_num_queries()
  160. results = map(db_query, zip(itertools.repeat(cursor, num_queries), generate_ids(num_queries)))
  161. worlds = [ {"id": result[0], "randomNumber": result[1]} for result in results ]
  162. POOL.putconn(db)
  163. return jsonify(worlds)
  164. @app.route("/fortunes")
  165. def get_fortunes():
  166. with orm.db_session(serializable=False):
  167. fortunes = list(orm.select(fortune for fortune in Fortune))
  168. tmp_fortune = namedtuple("Fortune", ["id", "message"])
  169. fortunes.append(
  170. tmp_fortune(id=0, message="Additional fortune added at request time.")
  171. )
  172. fortunes.sort(key=attrgetter("message"))
  173. return flask.render_template("fortunes.html", fortunes=fortunes)
  174. @app.route("/fortunes-raw")
  175. def get_fortunes_raw():
  176. db = POOL.getconn()
  177. cursor = db.cursor()
  178. cursor.execute("EXECUTE fortune")
  179. fortunes = list(cursor.fetchall())
  180. fortunes.append(ADDITIONAL_ROW)
  181. fortunes.sort(key=itemgetter(1))
  182. POOL.putconn(db)
  183. return flask.Response(FORTUNE_TEMPLATE.render(fortunes=fortunes))
  184. @app.route("/updates")
  185. def updates():
  186. num_queries = get_num_queries()
  187. ids = generate_ids(num_queries)
  188. ids.sort()
  189. worlds = []
  190. with orm.db_session(serializable=False):
  191. for ident in ids:
  192. world = World[ident]
  193. world.randomNumber = random.randint(1, 10000)
  194. worlds.append({"id": world.id, "randomNumber": world.randomNumber})
  195. return jsonify(worlds)
  196. @app.route("/updates-raw")
  197. def updates_raw():
  198. db = POOL.getconn()
  199. cursor = db.cursor()
  200. num_queries = get_num_queries()
  201. ids = generate_ids(num_queries)
  202. update_values = generate_ids(num_queries)
  203. list(map(db_query, zip(itertools.repeat(cursor, num_queries), generate_ids(num_queries))))
  204. worlds = list(zip(ids, update_values))
  205. execute_batch(cursor, "EXECUTE write_stmt(%s, %s)", worlds)
  206. db.commit()
  207. POOL.putconn(db)
  208. data = [ {"id": ident, "randomNumber": update} for ident, update in worlds ]
  209. return jsonify(data)
  210. @app.route("/plaintext")
  211. def plaintext():
  212. response = flask.make_response(b"Hello, World!")
  213. response.content_type = "text/plain"
  214. return response
  215. # -----------------------------------------------------------------------------------
  216. if __name__ == "__main__":
  217. import optparse
  218. import logging
  219. import signal
  220. import re
  221. parser = optparse.OptionParser("usage: %prog [options]", add_help_option=False)
  222. parser.add_option("-h", "--host", dest="host", default='0.0.0.0', type="string")
  223. parser.add_option("-p", "--port", dest="port", default=8080, type="int")
  224. parser.add_option("-s", "--server", dest="server", default="gunicorn", type="string")
  225. parser.add_option("-w", "--workers", dest="workers", default=0, type="int")
  226. parser.add_option("-k", "--keepalive", dest="keepalive", default=60, type="int")
  227. parser.add_option("-v", "--verbose", dest="verbose", default=0, type="int")
  228. (opt, args) = parser.parse_args()
  229. workers = opt.workers if opt.workers > 0 else _cpu_count
  230. if _is_travis:
  231. workers = 2
  232. worker_list = [ ]
  233. def run_app():
  234. global response_server
  235. global response_add_date
  236. if opt.server == "werkzeug":
  237. import werkzeug
  238. werkzeug.serving.WSGIRequestHandler.protocol_version = "HTTP/1.1"
  239. wzlog = logging.getLogger("werkzeug")
  240. wzlog.setLevel(logging.WARN)
  241. use_reloader = False # True = Use a reloader process to restart the server process when files are changed
  242. response_server = None
  243. response_add_date = False
  244. werkzeug.serving.run_simple(opt.host, opt.port, app, use_reloader=use_reloader)
  245. if opt.server == 'fastwsgi':
  246. import fastwsgi
  247. response_server = "FastWSGI"
  248. response_add_date = False
  249. fastwsgi.server.backlog = 4096
  250. fastwsgi.run(app, host=opt.host, port=opt.port, loglevel=opt.verbose)
  251. if opt.server == 'socketify':
  252. import socketify
  253. response_server = None
  254. response_add_date = False
  255. msg = "Listening on http://0.0.0.0:{port} now\n".format(port=opt.port)
  256. socketify.WSGI(app).listen(opt.port, lambda config: logging.info(msg)).run()
  257. def create_fork():
  258. pid = os.fork()
  259. if pid > 0:
  260. return pid
  261. try:
  262. run_app()
  263. except KeyboardInterrupt:
  264. pass
  265. sys.exit(0)
  266. for i in range(0, workers):
  267. pid = create_fork()
  268. print("Worker process added with PID:", pid)
  269. worker_list.append(pid)
  270. print("Running {} workers".format(len(worker_list)))
  271. try:
  272. for i in range(workers):
  273. os.wait()
  274. except KeyboardInterrupt:
  275. print("\n" + "Stopping all workers")
  276. for pid in worker_list:
  277. os.kill(pid, signal.SIGINT)