app.py 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220
  1. import os
  2. import ujson
  3. import asyncpg
  4. import multiprocessing
  5. import random
  6. import blacksheep as bs
  7. import jinja2
  8. from email.utils import formatdate
  9. try:
  10. from ujson import dumps as jsonify
  11. except:
  12. from json import dumps as jsonify
  13. _is_travis = os.environ.get('TRAVIS') == 'true'
  14. _is_gunicorn = "gunicorn" in os.environ.get("SERVER_SOFTWARE", "")
  15. _cpu_count = multiprocessing.cpu_count()
  16. if _is_travis:
  17. _cpu_count = 2
  18. #from blacksheep.settings.json import json_settings
  19. #json_settings.use(dumps=jsonify)
  20. DBDRV = "postgres"
  21. DBHOST = "tfb-database"
  22. DBUSER = "benchmarkdbuser"
  23. DBPSWD = "benchmarkdbpass"
  24. READ_ROW_SQL = 'SELECT "id", "randomnumber" FROM "world" WHERE id = $1'
  25. WRITE_ROW_SQL = 'UPDATE "world" SET "randomnumber"=$1 WHERE id=$2'
  26. ADDITIONAL_ROW = [0, "Additional fortune added at request time."]
  27. MAX_POOL_SIZE = 1000 // multiprocessing.cpu_count()
  28. MIN_POOL_SIZE = max(int(MAX_POOL_SIZE / 2), 1)
  29. db_pool = None
  30. g_response_server = None
  31. g_response_add_date = False
  32. async def setup_db(app):
  33. global db_pool
  34. db_pool = await asyncpg.create_pool(
  35. user=os.getenv('PGUSER', DBUSER),
  36. password=os.getenv('PGPASS', DBPSWD),
  37. database='hello_world',
  38. host=DBHOST,
  39. port=5432,
  40. min_size=MIN_POOL_SIZE,
  41. max_size=MAX_POOL_SIZE,
  42. )
  43. def load_fortunes_template():
  44. path = os.path.join('templates', 'fortune.html')
  45. with open(path, 'r') as template_file:
  46. template_text = template_file.read()
  47. return jinja2.Template(template_text)
  48. fortune_template = load_fortunes_template()
  49. app = bs.Application()
  50. app.on_start += setup_db
  51. def get_num_queries(request):
  52. try:
  53. value = request.query.get('queries')
  54. if value is None:
  55. return 1
  56. query_count = int(value[0])
  57. except (KeyError, IndexError, ValueError):
  58. return 1
  59. if query_count < 1:
  60. return 1
  61. if query_count > 500:
  62. return 500
  63. return query_count
  64. # ------------------------------------------------------------------------------------------
  65. async def bs_middleware(request, handler):
  66. global g_response_server, g_response_add_date
  67. response = await handler(request)
  68. if g_response_server:
  69. response.headers[b'Server'] = g_response_server
  70. if g_response_add_date:
  71. response.headers[b'Date'] = formatdate(timeval=None, localtime=False, usegmt=True)
  72. return response
  73. @app.route('/json')
  74. async def json_test(request):
  75. return bs.json( {'message': 'Hello, world!'} )
  76. @app.route('/db')
  77. async def single_db_query_test(request):
  78. row_id = random.randint(1, 10000)
  79. async with db_pool.acquire() as db_conn:
  80. number = await db_conn.fetchval(READ_ROW_SQL, row_id)
  81. world = {'id': row_id, 'randomNumber': number}
  82. return bs.json(world)
  83. @app.route('/queries')
  84. async def multiple_db_queries_test(request):
  85. num_queries = get_num_queries(request)
  86. row_ids = random.sample(range(1, 10000), num_queries)
  87. worlds = [ ]
  88. async with db_pool.acquire() as db_conn:
  89. statement = await db_conn.prepare(READ_ROW_SQL)
  90. for row_id in row_ids:
  91. number = await statement.fetchval(row_id)
  92. worlds.append( {"id": row_id, "randomNumber": number} )
  93. return bs.json(worlds)
  94. @app.route('/fortunes')
  95. async def fortunes_test(request):
  96. async with db_pool.acquire() as db_conn:
  97. fortunes = await db_conn.fetch("SELECT * FROM Fortune")
  98. fortunes.append(ADDITIONAL_ROW)
  99. fortunes.sort(key = lambda row: row[1])
  100. data = fortune_template.render(fortunes=fortunes)
  101. return bs.html(data)
  102. @app.route('/updates')
  103. async def db_updates_test(request):
  104. num_queries = get_num_queries(request)
  105. ids = sorted(random.sample(range(1, 10000 + 1), num_queries))
  106. numbers = sorted(random.sample(range(1, 10000), num_queries))
  107. updates = list(zip(ids, numbers))
  108. worlds = [ {"id": row_id, "randomNumber": number} for row_id, number in updates ]
  109. async with db_pool.acquire() as db_conn:
  110. statement = await db_conn.prepare(READ_ROW_SQL)
  111. for row_id, _ in updates:
  112. await statement.fetchval(row_id)
  113. await db_conn.executemany(WRITE_ROW_SQL, updates)
  114. return bs.json(worlds)
  115. @app.route('/plaintext')
  116. async def plaintext_test(request):
  117. return bs.Response(200, content=bs.Content(b"text/plain", b'Hello, World!'))
  118. #return bs.text('Hello, World!')
  119. # -----------------------------------------------------------------------------------
  120. if __name__ == "__main__":
  121. import optparse
  122. import logging
  123. import re
  124. parser = optparse.OptionParser("usage: %prog [options]", add_help_option=False)
  125. parser.add_option("-h", "--host", dest="host", default='0.0.0.0', type="string")
  126. parser.add_option("-p", "--port", dest="port", default=8080, type="int")
  127. parser.add_option("-s", "--server", dest="server", default="uvicorn", type="string")
  128. parser.add_option("-w", "--workers", dest="workers", default=0, type="int")
  129. parser.add_option("-k", "--keepalive", dest="keepalive", default=60, type="int")
  130. parser.add_option("-v", "--verbose", dest="verbose", default=0, type="int")
  131. (opt, args) = parser.parse_args()
  132. workers = _cpu_count
  133. if workers > 0:
  134. workers = opt.workers
  135. if _is_travis:
  136. workers = 2
  137. def run_app():
  138. global g_response_server, g_response_add_date
  139. if opt.gateway == "uvicorn":
  140. import uvicorn
  141. log_level = logging.ERROR
  142. uvicorn.run(app, host=opt.host, port=opt.port, workers=1, loop="uvloop", log_level=log_level, access_log=False)
  143. if opt.server == 'fastwsgi':
  144. import fastwsgi
  145. from blacksheep.utils.aio import get_running_loop
  146. g_response_server = b'FastWSGI'
  147. app.middlewares.append(bs_middleware)
  148. loop = get_running_loop()
  149. loop.run_until_complete(app.start())
  150. fastwsgi.run(app, host=opt.host, port=opt.port, loglevel=opt.verbose)
  151. if opt.server == 'socketify':
  152. import socketify
  153. msg = "Listening on http://0.0.0.0:{port} now\n".format(port=opt.port)
  154. socketify.WSGI(app).listen(opt.port, lambda config: logging.info(msg)).run()
  155. def create_fork():
  156. n = os.fork()
  157. # n greater than 0 means parent process
  158. if not n > 0:
  159. run_app()
  160. # fork limiting the cpu count - 1
  161. for i in range(1, workers):
  162. create_fork()
  163. run_app() # run app on the main process too :)