app-asgi.py 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281
  1. import os
  2. import sys
  3. import asyncio
  4. import asyncpg
  5. import jinja2
  6. import random
  7. from operator import itemgetter
  8. from urllib.parse import parse_qs
  9. import asyncache
  10. import cachetools
  11. try:
  12. from ujson import dumps as jsonify
  13. except:
  14. from json import dumps as jsonify
  15. db_pool = None
  16. PG_POOL_SIZE = 4
  17. class NoResetConnection(asyncpg.Connection):
  18. __slots__ = ()
  19. def get_reset_query(self):
  20. return ""
  21. async def db_setup():
  22. global db_pool
  23. db_pool = await asyncpg.create_pool(
  24. user=os.getenv('PGUSER', 'benchmarkdbuser'),
  25. password=os.getenv('PGPASS', 'benchmarkdbpass'),
  26. database='hello_world',
  27. host='tfb-database',
  28. port=5432,
  29. min_size=PG_POOL_SIZE,
  30. max_size=PG_POOL_SIZE,
  31. connection_class=NoResetConnection,
  32. )
  33. READ_ROW_SQL = 'SELECT "randomnumber", "id" FROM "world" WHERE id = $1'
  34. WRITE_ROW_SQL = 'UPDATE "world" SET "randomnumber"=$1 WHERE id=$2'
  35. ADDITIONAL_ROW = [0, 'Additional fortune added at request time.']
  36. JSON_RESPONSE = {
  37. 'type': 'http.response.start',
  38. 'status': 200,
  39. 'headers': [
  40. [b'Content-Type', b'application/json'],
  41. ]
  42. }
  43. HTML_RESPONSE = {
  44. 'type': 'http.response.start',
  45. 'status': 200,
  46. 'headers': [
  47. [b'Content-Type', b'text/html; charset=utf-8'],
  48. ]
  49. }
  50. PLAINTEXT_RESPONSE = {
  51. 'type': 'http.response.start',
  52. 'status': 200,
  53. 'headers': [
  54. [b'Content-Type', b'text/plain; charset=utf-8'],
  55. ]
  56. }
  57. def get_num_queries(scope, name = b'queries'):
  58. try:
  59. query_string = scope['query_string']
  60. query_count = int(parse_qs(query_string)[name][0])
  61. except (KeyError, IndexError, ValueError):
  62. return 1
  63. if query_count < 1:
  64. return 1
  65. if query_count > 500:
  66. return 500
  67. return query_count
  68. async def json_serialization(scope, receive, send):
  69. content = jsonify( {'message': 'Hello, world!'} )
  70. await send(JSON_RESPONSE)
  71. await send({
  72. 'type': 'http.response.body',
  73. 'body': content.encode('utf8'),
  74. 'more_body': False
  75. })
  76. async def single_database_query(scope, receive, send):
  77. row_id = random.randint(1, 10000)
  78. db_conn = await db_pool.acquire()
  79. try:
  80. number = await db_conn.fetchval(READ_ROW_SQL, row_id)
  81. world = {'id': row_id, 'randomNumber': number}
  82. finally:
  83. await db_pool.release(db_conn)
  84. content = jsonify(world)
  85. await send(JSON_RESPONSE)
  86. await send({
  87. 'type': 'http.response.body',
  88. 'body': content.encode('utf8'),
  89. 'more_body': False
  90. })
  91. async def multiple_database_queries(scope, receive, send):
  92. num_queries = get_num_queries(scope)
  93. row_ids = random.sample(range(1, 10000), num_queries)
  94. worlds = [ ]
  95. async with db_pool.acquire() as db_conn:
  96. rows = await db_conn.fetchmany(READ_ROW_SQL, [ (v, ) for v in row_ids ] )
  97. worlds = [ { 'id': row_id, 'randomNumber': number[0] } for row_id, number in zip(row_ids, rows) ]
  98. content = jsonify(worlds)
  99. await send(JSON_RESPONSE)
  100. await send({
  101. 'type': 'http.response.body',
  102. 'body': content.encode('utf8'),
  103. 'more_body': False
  104. })
  105. _get_item1 = itemgetter(1)
  106. fortunes_template = None
  107. fn = os.path.join('templates', 'fortune.html')
  108. with open(fn, 'r') as file:
  109. text = file.read()
  110. fortunes_template = jinja2.Template(text)
  111. async def fortunes(scope, receive, send):
  112. db_conn = await db_pool.acquire()
  113. try:
  114. fortunes = await db_conn.fetch('SELECT * FROM Fortune')
  115. finally:
  116. await db_pool.release(db_conn)
  117. fortunes.append(ADDITIONAL_ROW)
  118. fortunes.sort(key = _get_item1)
  119. content = fortunes_template.render(fortunes=fortunes)
  120. await send(HTML_RESPONSE)
  121. await send({
  122. 'type': 'http.response.body',
  123. 'body': content.encode('utf8'),
  124. 'more_body': False
  125. })
  126. async def database_updates(scope, receive, send):
  127. num_queries = get_num_queries(scope)
  128. ids = sorted(random.sample(range(1, 10000 + 1), num_queries))
  129. numbers = sorted(random.sample(range(1, 10000), num_queries))
  130. updates = list(zip(ids, numbers))
  131. worlds = [ {"id": row_id, "randomNumber": number} for row_id, number in updates ]
  132. async with db_pool.acquire() as db_conn:
  133. await db_conn.executemany(READ_ROW_SQL, [ (i[0], ) for i in updates ] )
  134. await db_conn.executemany(WRITE_ROW_SQL, updates)
  135. content = jsonify(worlds)
  136. await send(JSON_RESPONSE)
  137. await send({
  138. 'type': 'http.response.body',
  139. 'body': content.encode('utf8'),
  140. 'more_body': False
  141. })
  142. from asyncache import cached
  143. from cachetools.keys import hashkey
  144. @cached(cache={}, key=lambda stmt, id: hashkey(id))
  145. async def get_cached_world(stmt, id):
  146. result = await stmt.fetchrow(id)
  147. return {'id': result[1], 'randomNumber': result[0]}
  148. async def cached_queries(scope, receive, send):
  149. count = get_num_queries(scope, b'count')
  150. row_ids = random.sample(range(1, 10000 + 1), count)
  151. db_conn = await db_pool.acquire()
  152. try:
  153. statement = await db_conn.prepare(READ_ROW_SQL)
  154. worlds = [ await get_cached_world(statement, id) for id in row_ids ]
  155. finally:
  156. await db_pool.release(db_conn)
  157. content = jsonify(worlds)
  158. await send(JSON_RESPONSE)
  159. await send({
  160. 'type': 'http.response.body',
  161. 'body': content.encode('utf8'),
  162. 'more_body': False
  163. })
  164. async def plaintext(scope, receive, send):
  165. await send(PLAINTEXT_RESPONSE)
  166. await send({
  167. 'type': 'http.response.body',
  168. 'body': b'Hello, world!',
  169. 'more_body': False
  170. })
  171. async def handle_404(scope, receive, send):
  172. await send(PLAINTEXT_RESPONSE)
  173. await send({
  174. 'type': 'http.response.body',
  175. 'body': b'Not found',
  176. 'more_body': False
  177. })
  178. routes = {
  179. '/json': json_serialization,
  180. '/db': single_database_query,
  181. '/queries': multiple_database_queries,
  182. '/fortunes': fortunes,
  183. '/updates': database_updates,
  184. '/cached-queries': cached_queries,
  185. '/plaintext': plaintext,
  186. }
  187. async def app(scope, receive, send):
  188. if scope['type'] == 'lifespan':
  189. global db_pool
  190. while True:
  191. message = await receive()
  192. if message['type'] == 'lifespan.startup':
  193. await db_setup()
  194. await send({'type': 'lifespan.startup.complete'})
  195. elif message['type'] == 'lifespan.shutdown':
  196. db_pool.close()
  197. await send({'type': 'lifespan.shutdown.complete'})
  198. return
  199. else:
  200. path = scope['path']
  201. app_handler = routes.get(path, handle_404)
  202. await app_handler(scope, receive, send)
  203. # -----------------------------------------------------------------------------------------------------
  204. if __name__ == "__main__":
  205. import multiprocessing
  206. import fastwsgi
  207. _is_travis = os.environ.get('TRAVIS') == 'true'
  208. workers = int(multiprocessing.cpu_count())
  209. if _is_travis:
  210. workers = 2
  211. host = '0.0.0.0'
  212. port = 3000
  213. def run_app():
  214. loop = asyncio.get_event_loop()
  215. loop.run_until_complete(db_setup())
  216. fastwsgi.server.backlog = 4096
  217. fastwsgi.run(app, host, port, loglevel=2)
  218. def create_fork():
  219. n = os.fork()
  220. # n greater than 0 means parent process
  221. if not n > 0:
  222. run_app()
  223. # fork limiting the cpu count - 1
  224. for i in range(1, workers):
  225. create_fork()
  226. run_app() # run app on the main process too :)