app.py 10 KB


  1. #!/usr/bin/env python
  2. import os
  3. import sys
  4. import multiprocessing
  5. import itertools
  6. from collections import namedtuple
  7. from operator import attrgetter, itemgetter
  8. from random import randint
  9. from email.utils import formatdate
  10. import flask
  11. from pony import orm
  12. if sys.version_info[0] == 3:
  13. xrange = range
  14. _is_pypy = hasattr(sys, "pypy_version_info")
  15. if _is_pypy:
  16. import psycopg2cffi.compat
  17. psycopg2cffi.compat.register()
  18. _is_travis = os.environ.get('TRAVIS') == 'true'
  19. _is_gunicorn = "gunicorn" in os.environ.get("SERVER_SOFTWARE", "")
  20. _cpu_count = multiprocessing.cpu_count()
  21. if _is_travis:
  22. _cpu_count = 2
  23. _raw_orm = os.getenv('USE_RAW', "0") == "1"
  24. _use_orjson = os.getenv('USE_ORJSON', "0") == "1"
  25. if _use_orjson:
  26. import orjson as json
  27. _use_ujson = os.getenv('USE_UJSON', "0") == "1"
  28. if _use_ujson:
  29. import ujson as json
  30. if not _use_orjson and not _use_ujson:
  31. import json
  32. from flask import jsonify
  33. DBDRV = "postgres"
  34. DBHOST = "tfb-database"
  35. DBUSER = "benchmarkdbuser"
  36. DBPSWD = "benchmarkdbpass"
  37. # setup
  38. app = flask.Flask(__name__)
  39. app.config["JSONIFY_PRETTYPRINT_REGULAR"] = False
  40. # -----------------------------------------------------------------------------
  41. response_server = None
  42. response_add_date = False
  43. @app.after_request
  44. def after_request(response):
  45. if response_server:
  46. response.headers['Server'] = response_server
  47. if response_add_date:
  48. response.headers['Date'] = formatdate(timeval=None, localtime=False, usegmt=True)
  49. return response
  50. if _use_orjson or _use_ujson:
  51. def jsonify(jdict):
  52. return json.dumps(jdict), { "Content-Type": "application/json" }
  53. # -----------------------------------------------------------------------------
  54. def get_num_queries():
  55. try:
  56. num_queries = flask.request.args.get("queries", 1, type=int)
  57. except ValueError:
  58. num_queries = 1
  59. if num_queries < 1:
  60. return 1
  61. if num_queries > 500:
  62. return 500
  63. return num_queries
  64. def generate_ids(num_queries):
  65. ids = {randint(1, 10000) for _ in xrange(num_queries)}
  66. while len(ids) < num_queries:
  67. ids.add(randint(1, 10000))
  68. return list(sorted(ids))
  69. if _raw_orm:
  70. import jinja2
  71. if _is_pypy:
  72. from psycopg2cffi.pool import ThreadedConnectionPool
  73. from psycopg2cffi.extras import execute_batch
  74. else:
  75. from psycopg2.pool import ThreadedConnectionPool
  76. from psycopg2.extras import execute_batch
  77. pool_size = _cpu_count * 2.5
  78. POOL = ThreadedConnectionPool(
  79. minconn=int(pool_size / 4),
  80. maxconn=int(pool_size / 4),
  81. database="hello_world",
  82. user=DBUSER,
  83. password=DBPSWD,
  84. host="tfb-database",
  85. port=5432,
  86. )
  87. read_row_sql = (
  88. 'SELECT world."randomnumber", world."id" FROM "world" WHERE id = %(id)s'
  89. )
  90. prepared_read_row_sql = (
  91. 'SELECT world."randomnumber", world."id" FROM "world" WHERE id = $1'
  92. )
  93. write_row_sql = 'UPDATE "world" SET "randomnumber"=$1 WHERE id=$2'
  94. ADDITIONAL_ROW = (0, "Additional fortune added at request time.")
  95. db = POOL.getconn()
  96. cursor = db.cursor()
  97. cursor.execute("PREPARE read_stmt (int) AS " + prepared_read_row_sql)
  98. cursor.execute("PREPARE write_stmt (int, int) AS " + write_row_sql)
  99. cursor.execute('PREPARE fortune AS SELECT * FROM "Fortune"')
  100. del cursor
  101. POOL.putconn(db)
  102. del db
  103. def db_query(arg):
  104. (cursor, ident) = arg
  105. cursor.execute("EXECUTE read_stmt(%s)", [ident])
  106. result = cursor.fetchone()
  107. return result
  108. fn = os.path.join("templates", "fortune_raw.html")
  109. with open(fn, "r") as template_file:
  110. template_text = template_file.read()
  111. FORTUNE_TEMPLATE = jinja2.Template(template_text)
  112. #Fortune = namedtuple("Fortune", ["id", "message"])
  113. else: # --------- PonyORM ------------------------------------------------
  114. app.config["STORM_DATABASE_URI"] = "{}://{}:{}@{}:5432/hello_world".format(DBDRV, DBUSER, DBPSWD, DBHOST)
  115. db = orm.Database()
  116. db.bind(DBDRV, host=DBHOST, port=5432, user=DBUSER, password=DBPSWD, database="hello_world")
  117. class World(db.Entity):
  118. _table_ = "world"
  119. id = orm.PrimaryKey(int)
  120. randomNumber = orm.Required(int, column="randomnumber")
  121. def to_dict(self):
  122. """Return object data in easily serializeable format"""
  123. return {"id": self.id, "randomNumber": self.randomNumber}
  124. class Fortune(db.Entity):
  125. _table_ = "fortune"
  126. id = orm.PrimaryKey(int, auto=True)
  127. message = orm.Required(str)
  128. db.generate_mapping(create_tables=False)
  129. # ----------------------------------------------------------------------------------------
  130. @app.route("/json")
  131. def hello():
  132. if _use_orjson or _use_ujson:
  133. return jsonify( {"message": "Hello, World!"} )
  134. return flask.jsonify(message="Hello, World!")
  135. @app.route("/query")
  136. def get_random_world():
  137. if _raw_orm:
  138. db = POOL.getconn()
  139. cursor = db.cursor()
  140. num_queries = get_num_queries()
  141. results = map(db_query, zip(itertools.repeat(cursor, num_queries), generate_ids(num_queries)))
  142. worlds = [ {"id": result[0], "randomNumber": result[1]} for result in results ]
  143. POOL.putconn(db)
  144. return jsonify(worlds)
  145. with orm.db_session(serializable=False):
  146. worlds = [World[ident].to_dict() for ident in generate_ids(get_num_queries())]
  147. return jsonify(worlds)
  148. @app.route("/db")
  149. def get_random_world_single():
  150. if _raw_orm:
  151. db = POOL.getconn()
  152. cursor = db.cursor()
  153. cursor.execute("EXECUTE read_stmt(%s)", generate_ids(1))
  154. result = cursor.fetchone()
  155. world = {"id": result[0], "randomNumber": result[1]}
  156. POOL.putconn(db)
  157. return jsonify(world)
  158. wid = randint(1, 10000)
  159. with orm.db_session(serializable=False):
  160. world = World[wid]
  161. return jsonify(world.to_dict())
  162. @app.route("/fortunes")
  163. def get_fortunes():
  164. if _raw_orm:
  165. db = POOL.getconn()
  166. cursor = db.cursor()
  167. cursor.execute("EXECUTE fortune")
  168. fortunes = list(cursor.fetchall())
  169. fortunes.append(ADDITIONAL_ROW)
  170. fortunes.sort(key=itemgetter(1))
  171. POOL.putconn(db)
  172. return flask.Response(FORTUNE_TEMPLATE.render(fortunes=fortunes))
  173. with orm.db_session(serializable=False):
  174. fortunes = list(orm.select(fortune for fortune in Fortune))
  175. tmp_fortune = namedtuple("Fortune", ["id", "message"])
  176. fortunes.append(
  177. tmp_fortune(id=0, message="Additional fortune added at request time.")
  178. )
  179. fortunes.sort(key=attrgetter("message"))
  180. return flask.render_template("fortunes.html", fortunes=fortunes)
  181. @app.route("/updates")
  182. def updates():
  183. if _raw_orm:
  184. db = POOL.getconn()
  185. cursor = db.cursor()
  186. num_queries = get_num_queries()
  187. ids = generate_ids(num_queries)
  188. update_values = generate_ids(num_queries)
  189. list(map(db_query, zip(itertools.repeat(cursor, num_queries), generate_ids(num_queries))))
  190. worlds = list(zip(ids, update_values))
  191. execute_batch(cursor, "EXECUTE write_stmt(%s, %s)", worlds)
  192. db.commit()
  193. POOL.putconn(db)
  194. data = [ {"id": ident, "randomNumber": update} for ident, update in worlds ]
  195. return jsonify(data)
  196. num_queries = get_num_queries()
  197. ids = generate_ids(num_queries)
  198. ids.sort()
  199. worlds = []
  200. with orm.db_session(serializable=False):
  201. for ident in ids:
  202. world = World[ident]
  203. world.randomNumber = randint(1, 10000)
  204. worlds.append({"id": world.id, "randomNumber": world.randomNumber})
  205. return jsonify(worlds)
  206. @app.route("/plaintext")
  207. def plaintext():
  208. response = flask.make_response(b"Hello, World!")
  209. response.content_type = "text/plain"
  210. return response
  211. # -----------------------------------------------------------------------------------
  212. if __name__ == "__main__":
  213. import optparse
  214. import logging
  215. import re
  216. parser = optparse.OptionParser("usage: %prog [options]", add_help_option=False)
  217. parser.add_option("-h", "--host", dest="host", default='0.0.0.0', type="string")
  218. parser.add_option("-p", "--port", dest="port", default=8080, type="int")
  219. parser.add_option("-s", "--server", dest="server", default="gunicorn", type="string")
  220. parser.add_option("-w", "--workers", dest="workers", default=0, type="int")
  221. parser.add_option("-k", "--keepalive", dest="keepalive", default=60, type="int")
  222. parser.add_option("-v", "--verbose", dest="verbose", default=0, type="int")
  223. (opt, args) = parser.parse_args()
  224. workers = _cpu_count
  225. if workers > 0:
  226. workers = opt.workers
  227. if _is_travis:
  228. workers = 2
  229. def run_app():
  230. global response_server
  231. global response_add_date
  232. if opt.server == "werkzeug":
  233. import werkzeug
  234. werkzeug.serving.WSGIRequestHandler.protocol_version = "HTTP/1.1"
  235. wzlog = logging.getLogger("werkzeug")
  236. wzlog.setLevel(logging.WARN)
  237. use_reloader = False # True = Use a reloader process to restart the server process when files are changed
  238. response_server = None
  239. response_add_date = False
  240. werkzeug.serving.run_simple(opt.host, opt.port, app, use_reloader=use_reloader)
  241. if opt.server == 'fastwsgi':
  242. import fastwsgi
  243. response_server = "FastWSGI"
  244. response_add_date = False
  245. fastwsgi.run(app, host=opt.host, port=opt.port, loglevel=opt.verbose)
  246. if opt.server == 'socketify':
  247. import socketify
  248. response_server = None
  249. response_add_date = False
  250. msg = "Listening on http://0.0.0.0:{port} now\n".format(port=opt.port)
  251. socketify.WSGI(app).listen(opt.port, lambda config: logging.info(msg)).run()
  252. def create_fork():
  253. n = os.fork()
  254. # n greater than 0 means parent process
  255. if not n > 0:
  256. run_app()
  257. # fork limiting the cpu count - 1
  258. for i in range(1, workers):
  259. create_fork()
  260. run_app() # run app on the main process too :)