app.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  1. import asyncio
  2. import asyncpg
  3. import jinja2
  4. import os
  5. import ujson as json
  6. from functools import partial
  7. from random import randint
  8. from operator import itemgetter
  9. from urllib import parse
  10. async def setup():
  11. global pool
  12. pool = await asyncpg.create_pool(
  13. user=os.getenv('PGUSER', 'benchmarkdbuser'),
  14. password=os.getenv('PGPASS', 'benchmarkdbpass'),
  15. database='hello_world',
  16. host=os.getenv('DBHOST', 'localhost'),
  17. port=5432
  18. )
  19. pool = None
  20. additional = [0, 'Additional fortune added at request time.']
  21. key = itemgetter(1)
  22. template = None
  23. path = os.path.join('templates', 'fortune.html')
  24. with open(path, 'r') as template_file:
  25. template_text = template_file.read()
  26. template = jinja2.Template(template_text)
  27. loop = asyncio.get_event_loop()
  28. loop.run_until_complete(setup())
  29. def get_query_count(query_string):
  30. # helper to deal with the querystring passed in
  31. queries = parse.parse_qs(query_string).get(b'queries', [None])[0]
  32. if queries:
  33. try:
  34. query_count = int(queries)
  35. if query_count < 1:
  36. return 1
  37. if query_count > 500:
  38. return 500
  39. return query_count
  40. except ValueError:
  41. pass
  42. return 1
  43. random_int = partial(randint, 1, 10000)
  44. async def json_endpoint(message, channels):
  45. content = json.dumps({'message': 'Hello, world!'}).encode('utf-8')
  46. await channels['reply'].send({
  47. 'status': 200,
  48. 'headers': [
  49. [b'content-type', b'application/json'],
  50. ],
  51. 'content': content
  52. })
  53. async def fortunes_endpoint(message, channels):
  54. connection = await pool.acquire()
  55. try:
  56. fortunes = await connection.fetch('SELECT * FROM Fortune')
  57. fortunes.append(additional)
  58. fortunes.sort(key=key)
  59. content = template.render(fortunes=fortunes).encode('utf-8')
  60. await channels['reply'].send({
  61. 'status': 200,
  62. 'headers': [
  63. [b'content-type', b'text/html; charset=utf-8'],
  64. ],
  65. 'content': content
  66. })
  67. finally:
  68. await pool.release(connection)
  69. async def plaintext_endpoint(message, channels):
  70. await channels['reply'].send({
  71. 'status': 200,
  72. 'headers': [
  73. [b'content-type', b'text/plain'],
  74. ],
  75. 'content': b'Hello, world!'
  76. })
  77. async def handle_404(message, channels):
  78. await channels['reply'].send({
  79. 'status': 404,
  80. 'headers': [
  81. [b'content-type', b'text/plain'],
  82. ],
  83. 'content': b'Not found'
  84. })
  85. async def db_endpoint(message, channels):
  86. """Test Type 2: Single database object"""
  87. async with pool.acquire() as connection:
  88. row = await connection.fetchrow('SELECT id, "randomnumber" FROM "world" WHERE id = ' + str(random_int()))
  89. world = {'id': row[0], 'randomNumber': row[1]}
  90. await channels['reply'].send({
  91. 'status': 200,
  92. 'headers': [
  93. [b'content-type', b'application/json'],
  94. ],
  95. 'content': json.dumps(world).encode('utf-8')
  96. })
  97. async def queries_endpoint(message, channels):
  98. """Test Type 3: Multiple database queries"""
  99. queries = get_query_count(message.get('query_string', {}))
  100. async with pool.acquire() as connection:
  101. worlds = []
  102. for i in range(queries):
  103. sql = 'SELECT id, "randomnumber" FROM "world" WHERE id = ' + str(random_int())
  104. row = await connection.fetchrow(sql)
  105. worlds.append({'id': row[0], 'randomNumber': row[1]})
  106. await channels['reply'].send({
  107. 'status': 200,
  108. 'headers': [
  109. [b'content-type', b'application/json'],
  110. ],
  111. 'content': json.dumps(worlds).encode('utf-8')
  112. })
  113. async def updates_endpoint(message, channels):
  114. """Test 5: Database Updates"""
  115. queries = get_query_count(message.get('query_string', {}))
  116. async with pool.acquire() as connection:
  117. worlds = []
  118. for i in range(queries):
  119. row = await connection.fetchrow('SELECT id FROM "world" WHERE id=' + str(random_int()))
  120. worlds.append({'id': row[0], 'randomNumber': random_int()})
  121. await connection.execute('UPDATE "world" SET "randomnumber"=%s WHERE id=%s' % (random_int(), row[0]))
  122. await channels['reply'].send({
  123. 'status': 200,
  124. 'headers': [
  125. [b'content-type', b'application/json'],
  126. ],
  127. 'content': json.dumps(worlds).encode('utf-8')
  128. })
  129. routes = {
  130. '/json': json_endpoint,
  131. '/fortunes': fortunes_endpoint,
  132. '/plaintext': plaintext_endpoint,
  133. '/db': db_endpoint,
  134. '/queries': queries_endpoint,
  135. '/updates': updates_endpoint,
  136. }
  137. async def main(message, channels):
  138. path = message['path']
  139. await routes.get(path, handle_404)(message, channels)