app.py 6.4 KB


  1. import os
  2. import ujson
  3. import asyncpg
  4. import multiprocessing
  5. import random
  6. import blacksheep as bs
  7. import jinja2
  8. from email.utils import formatdate
  9. try:
  10. from ujson import dumps as jsonify
  11. except:
  12. from json import dumps as jsonify
  13. _is_travis = os.environ.get('TRAVIS') == 'true'
  14. _is_gunicorn = "gunicorn" in os.environ.get("SERVER_SOFTWARE", "")
  15. _cpu_count = multiprocessing.cpu_count()
  16. if _is_travis:
  17. _cpu_count = 2
  18. #from blacksheep.settings.json import json_settings
  19. #json_settings.use(dumps=jsonify)
  20. DBDRV = "postgres"
  21. DBHOST = "tfb-database"
  22. DBUSER = "benchmarkdbuser"
  23. DBPSWD = "benchmarkdbpass"
  24. READ_ROW_SQL = 'SELECT "id", "randomnumber" FROM "world" WHERE id = $1'
  25. WRITE_ROW_SQL = 'UPDATE "world" SET "randomnumber"=$1 WHERE id=$2'
  26. ADDITIONAL_ROW = [0, "Additional fortune added at request time."]
  27. MAX_POOL_SIZE = 1000 // multiprocessing.cpu_count()
  28. MIN_POOL_SIZE = max(int(MAX_POOL_SIZE / 2), 1)
  29. db_pool = None
  30. g_response_server = None
  31. g_response_add_date = False
  32. async def setup_db(app):
  33. global db_pool
  34. db_pool = await asyncpg.create_pool(
  35. user=os.getenv('PGUSER', DBUSER),
  36. password=os.getenv('PGPASS', DBPSWD),
  37. database='hello_world',
  38. host=DBHOST,
  39. port=5432,
  40. min_size=MIN_POOL_SIZE,
  41. max_size=MAX_POOL_SIZE,
  42. )
  43. def load_fortunes_template():
  44. path = os.path.join('templates', 'fortune.html')
  45. with open(path, 'r') as template_file:
  46. template_text = template_file.read()
  47. return jinja2.Template(template_text)
# Template is loaded once at import time; fortunes_test renders it per request.
fortune_template = load_fortunes_template()
# Application object that the @app.route decorators below register handlers on.
app = bs.Application()
# Register setup_db on the application's on_start hook; it assigns the global db_pool.
app.on_start += setup_db
  51. def get_num_queries(request):
  52. try:
  53. value = request.query.get('queries')
  54. if value is None:
  55. return 1
  56. query_count = int(value[0])
  57. except (KeyError, IndexError, ValueError):
  58. return 1
  59. if query_count < 1:
  60. return 1
  61. if query_count > 500:
  62. return 500
  63. return query_count
  64. # ------------------------------------------------------------------------------------------
  65. async def bs_middleware(request, handler):
  66. global g_response_server, g_response_add_date
  67. response = await handler(request)
  68. if g_response_server:
  69. response.headers[b'Server'] = g_response_server
  70. if g_response_add_date:
  71. response.headers[b'Date'] = formatdate(timeval=None, localtime=False, usegmt=True)
  72. return response
  73. @app.route('/json')
  74. async def json_test(request):
  75. return bs.json( {'message': 'Hello, world!'} )
  76. @app.route('/db')
  77. async def single_db_query_test(request):
  78. row_id = random.randint(1, 10000)
  79. async with db_pool.acquire() as db_conn:
  80. number = await db_conn.fetchval(READ_ROW_SQL, row_id)
  81. world = {'id': row_id, 'randomNumber': number}
  82. return bs.json(world)
  83. @app.route('/queries')
  84. async def multiple_db_queries_test(request):
  85. num_queries = get_num_queries(request)
  86. row_ids = random.sample(range(1, 10000), num_queries)
  87. worlds = [ ]
  88. async with db_pool.acquire() as db_conn:
  89. statement = await db_conn.prepare(READ_ROW_SQL)
  90. for row_id in row_ids:
  91. number = await statement.fetchval(row_id)
  92. worlds.append( {"id": row_id, "randomNumber": number} )
  93. return bs.json(worlds)
  94. @app.route('/fortunes')
  95. async def fortunes_test(request):
  96. async with db_pool.acquire() as db_conn:
  97. fortunes = await db_conn.fetch("SELECT * FROM Fortune")
  98. fortunes.append(ADDITIONAL_ROW)
  99. fortunes.sort(key = lambda row: row[1])
  100. data = fortune_template.render(fortunes=fortunes)
  101. return bs.html(data)
  102. @app.route('/updates')
  103. async def db_updates_test(request):
  104. num_queries = get_num_queries(request)
  105. ids = sorted(random.sample(range(1, 10000 + 1), num_queries))
  106. numbers = sorted(random.sample(range(1, 10000), num_queries))
  107. updates = list(zip(ids, numbers))
  108. worlds = [ {"id": row_id, "randomNumber": number} for row_id, number in updates ]
  109. async with db_pool.acquire() as db_conn:
  110. statement = await db_conn.prepare(READ_ROW_SQL)
  111. for row_id, _ in updates:
  112. await statement.fetchval(row_id)
  113. await db_conn.executemany(WRITE_ROW_SQL, updates)
  114. return bs.json(worlds)
  115. @app.route('/plaintext')
  116. async def plaintext_test(request):
  117. return bs.Response(200, content=bs.Content(b"text/plain", b'Hello, World!'))
  118. #return bs.text('Hello, World!')
  119. # -----------------------------------------------------------------------------------
  120. if __name__ == "__main__":
  121. import optparse
  122. import logging
  123. import re
  124. parser = optparse.OptionParser("usage: %prog [options]", add_help_option=False)
  125. parser.add_option("-h", "--host", dest="host", default='0.0.0.0', type="string")
  126. parser.add_option("-p", "--port", dest="port", default=8080, type="int")
  127. parser.add_option("-s", "--server", dest="server", default="uvicorn", type="string")
  128. parser.add_option("-w", "--workers", dest="workers", default=0, type="int")
  129. parser.add_option("-k", "--keepalive", dest="keepalive", default=60, type="int")
  130. parser.add_option("-v", "--verbose", dest="verbose", default=0, type="int")
  131. (opt, args) = parser.parse_args()
  132. workers = _cpu_count
  133. if workers > 0:
  134. workers = opt.workers
  135. if _is_travis:
  136. workers = 2
  137. def run_app():
  138. global g_response_server, g_response_add_date
  139. if opt.gateway == "uvicorn":
  140. import uvicorn
  141. log_level = logging.ERROR
  142. uvicorn.run(app, host=opt.host, port=opt.port, workers=1, loop="uvloop", log_level=log_level, access_log=False)
  143. if opt.server == 'fastwsgi':
  144. import fastwsgi
  145. from blacksheep.utils.aio import get_running_loop
  146. g_response_server = b'FastWSGI'
  147. app.middlewares.append(bs_middleware)
  148. loop = get_running_loop()
  149. loop.run_until_complete(app.start())
  150. fastwsgi.run(app, host=opt.host, port=opt.port, loglevel=opt.verbose)
  151. if opt.server == 'socketify':
  152. import socketify
  153. msg = "Listening on http://0.0.0.0:{port} now\n".format(port=opt.port)
  154. socketify.WSGI(app).listen(opt.port, lambda config: logging.info(msg)).run()
  155. def create_fork():
  156. n = os.fork()
  157. # n greater than 0 means parent process
  158. if not n > 0:
  159. run_app()
  160. # fork limiting the cpu count - 1
  161. for i in range(1, workers):
  162. create_fork()
  163. run_app() # run app on the main process too :)