app-asgi.py 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280
  1. import os
  2. import sys
  3. import asyncio
  4. import asyncpg
  5. import jinja2
  6. import random
  7. from operator import itemgetter
  8. from urllib.parse import parse_qs
  9. import asyncache
  10. import cachetools
  11. try:
  12. from ujson import dumps as jsonify
  13. except:
  14. from json import dumps as jsonify
  15. db_pool = None
  16. async def db_setup():
  17. global db_pool
  18. db_pool = await asyncpg.create_pool(
  19. user=os.getenv('PGUSER', 'benchmarkdbuser'),
  20. password=os.getenv('PGPASS', 'benchmarkdbpass'),
  21. database='hello_world',
  22. host='tfb-database',
  23. port=5432
  24. )
  25. READ_ROW_SQL = 'SELECT "randomnumber", "id" FROM "world" WHERE id = $1'
  26. WRITE_ROW_SQL = 'UPDATE "world" SET "randomnumber"=$1 WHERE id=$2'
  27. ADDITIONAL_ROW = [0, 'Additional fortune added at request time.']
  28. JSON_RESPONSE = {
  29. 'type': 'http.response.start',
  30. 'status': 200,
  31. 'headers': [
  32. [b'Content-Type', b'application/json'],
  33. ]
  34. }
  35. HTML_RESPONSE = {
  36. 'type': 'http.response.start',
  37. 'status': 200,
  38. 'headers': [
  39. [b'Content-Type', b'text/html; charset=utf-8'],
  40. ]
  41. }
  42. PLAINTEXT_RESPONSE = {
  43. 'type': 'http.response.start',
  44. 'status': 200,
  45. 'headers': [
  46. [b'Content-Type', b'text/plain; charset=utf-8'],
  47. ]
  48. }
  49. def get_num_queries(scope, name = b'queries'):
  50. try:
  51. query_string = scope['query_string']
  52. query_count = int(parse_qs(query_string)[name][0])
  53. except (KeyError, IndexError, ValueError):
  54. return 1
  55. if query_count < 1:
  56. return 1
  57. if query_count > 500:
  58. return 500
  59. return query_count
  60. async def json_serialization(scope, receive, send):
  61. content = jsonify( {'message': 'Hello, world!'} )
  62. await send(JSON_RESPONSE)
  63. await send({
  64. 'type': 'http.response.body',
  65. 'body': content.encode('utf8'),
  66. 'more_body': False
  67. })
  68. async def single_database_query(scope, receive, send):
  69. row_id = random.randint(1, 10000)
  70. db_conn = await db_pool.acquire()
  71. try:
  72. number = await db_conn.fetchval(READ_ROW_SQL, row_id)
  73. world = {'id': row_id, 'randomNumber': number}
  74. finally:
  75. await db_pool.release(db_conn)
  76. content = jsonify(world)
  77. await send(JSON_RESPONSE)
  78. await send({
  79. 'type': 'http.response.body',
  80. 'body': content.encode('utf8'),
  81. 'more_body': False
  82. })
  83. async def multiple_database_queries(scope, receive, send):
  84. num_queries = get_num_queries(scope)
  85. row_ids = random.sample(range(1, 10000), num_queries)
  86. worlds = [ ]
  87. db_conn = await db_pool.acquire()
  88. try:
  89. statement = await db_conn.prepare(READ_ROW_SQL)
  90. for row_id in row_ids:
  91. number = await statement.fetchval(row_id)
  92. worlds.append( {'id': row_id, 'randomNumber': number} )
  93. finally:
  94. await db_pool.release(db_conn)
  95. content = jsonify(worlds)
  96. await send(JSON_RESPONSE)
  97. await send({
  98. 'type': 'http.response.body',
  99. 'body': content.encode('utf8'),
  100. 'more_body': False
  101. })
  102. _get_item1 = itemgetter(1)
  103. fortunes_template = None
  104. fn = os.path.join('templates', 'fortune.html')
  105. with open(fn, 'r') as file:
  106. text = file.read()
  107. fortunes_template = jinja2.Template(text)
  108. async def fortunes(scope, receive, send):
  109. db_conn = await db_pool.acquire()
  110. try:
  111. fortunes = await db_conn.fetch('SELECT * FROM Fortune')
  112. finally:
  113. await db_pool.release(db_conn)
  114. fortunes.append(ADDITIONAL_ROW)
  115. fortunes.sort(key = _get_item1)
  116. content = fortunes_template.render(fortunes=fortunes)
  117. await send(HTML_RESPONSE)
  118. await send({
  119. 'type': 'http.response.body',
  120. 'body': content.encode('utf8'),
  121. 'more_body': False
  122. })
  123. async def database_updates(scope, receive, send):
  124. num_queries = get_num_queries(scope)
  125. ids = sorted(random.sample(range(1, 10000 + 1), num_queries))
  126. numbers = sorted(random.sample(range(1, 10000), num_queries))
  127. updates = list(zip(ids, numbers))
  128. worlds = [ {"id": row_id, "randomNumber": number} for row_id, number in updates ]
  129. db_conn = await db_pool.acquire()
  130. try:
  131. statement = await db_conn.prepare(READ_ROW_SQL)
  132. for row_id, _ in updates:
  133. await statement.fetchval(row_id)
  134. await db_conn.executemany(WRITE_ROW_SQL, updates)
  135. finally:
  136. await db_pool.release(db_conn)
  137. content = jsonify(worlds)
  138. await send(JSON_RESPONSE)
  139. await send({
  140. 'type': 'http.response.body',
  141. 'body': content.encode('utf8'),
  142. 'more_body': False
  143. })
  144. from asyncache import cached
  145. from cachetools.keys import hashkey
  146. @cached(cache={}, key=lambda stmt, id: hashkey(id))
  147. async def get_cached_world(stmt, id):
  148. result = await stmt.fetchrow(id)
  149. return {'id': result[1], 'randomNumber': result[0]}
  150. async def cached_queries(scope, receive, send):
  151. count = get_num_queries(scope, b'count')
  152. row_ids = random.sample(range(1, 10000 + 1), count)
  153. db_conn = await db_pool.acquire()
  154. try:
  155. statement = await db_conn.prepare(READ_ROW_SQL)
  156. worlds = [ await get_cached_world(statement, id) for id in row_ids ]
  157. finally:
  158. await db_pool.release(db_conn)
  159. content = jsonify(worlds)
  160. await send(JSON_RESPONSE)
  161. await send({
  162. 'type': 'http.response.body',
  163. 'body': content.encode('utf8'),
  164. 'more_body': False
  165. })
  166. async def plaintext(scope, receive, send):
  167. await send(PLAINTEXT_RESPONSE)
  168. await send({
  169. 'type': 'http.response.body',
  170. 'body': b'Hello, world!',
  171. 'more_body': False
  172. })
  173. async def handle_404(scope, receive, send):
  174. await send(PLAINTEXT_RESPONSE)
  175. await send({
  176. 'type': 'http.response.body',
  177. 'body': b'Not found',
  178. 'more_body': False
  179. })
  180. routes = {
  181. '/json': json_serialization,
  182. '/db': single_database_query,
  183. '/queries': multiple_database_queries,
  184. '/fortunes': fortunes,
  185. '/updates': database_updates,
  186. '/cached-queries': cached_queries,
  187. '/plaintext': plaintext,
  188. }
  189. async def app(scope, receive, send):
  190. if scope['type'] == 'lifespan':
  191. global db_pool
  192. while True:
  193. message = await receive()
  194. if message['type'] == 'lifespan.startup':
  195. await db_setup()
  196. await send({'type': 'lifespan.startup.complete'})
  197. elif message['type'] == 'lifespan.shutdown':
  198. db_pool.close()
  199. await send({'type': 'lifespan.shutdown.complete'})
  200. return
  201. else:
  202. path = scope['path']
  203. app_handler = routes.get(path, handle_404)
  204. await app_handler(scope, receive, send)
  205. # -----------------------------------------------------------------------------------------------------
  206. if __name__ == "__main__":
  207. import multiprocessing
  208. import fastwsgi
  209. _is_travis = os.environ.get('TRAVIS') == 'true'
  210. workers = int(multiprocessing.cpu_count())
  211. if _is_travis:
  212. workers = 2
  213. host = '0.0.0.0'
  214. port = 3000
  215. def run_app():
  216. loop = asyncio.get_event_loop()
  217. loop.run_until_complete(db_setup())
  218. fastwsgi.server.backlog = 4096
  219. fastwsgi.run(app, host, port, loglevel=2)
  220. def create_fork():
  221. n = os.fork()
  222. # n greater than 0 means parent process
  223. if not n > 0:
  224. run_app()
  225. # fork limiting the cpu count - 1
  226. for i in range(1, workers):
  227. create_fork()
  228. run_app() # run app on the main process too :)