server.php 6.2 KB


  1. <?php
  2. require __DIR__ . '/vendor/autoload.php';
  3. use Amp\Cluster\Cluster;
  4. use Amp\Coroutine;
  5. use Amp\Http\Server\Request;
  6. use Amp\Http\Server\RequestHandler;
  7. use Amp\Http\Server\Response;
  8. use Amp\Http\Server\Router;
  9. use Amp\Http\Server\Options;
  10. use Amp\Http\Server\Server;
  11. use Amp\Promise;
  12. use Amp\Success;
  13. use Monolog\Logger;
  14. define('DB_HOST', gethostbyname('tfb-database'));
  15. define('CONCURRENCY_LIMIT', 102400);
  16. Amp\Loop::run(function () {
  17. $sockets = yield [
  18. Cluster::listen('0.0.0.0:8080',
  19. (new Amp\Socket\BindContext)->withBacklog(CONCURRENCY_LIMIT)
  20. ->withReusePort()
  21. ->withTcpNoDelay()
  22. )
  23. ];
  24. $router = new Router;
  25. $config = Amp\Mysql\ConnectionConfig::fromString(
  26. 'host='.DB_HOST.' user=benchmarkdbuser password=benchmarkdbpass db=hello_world'
  27. );
  28. $connector = new Amp\Mysql\CancellableConnector(
  29. new class implements Amp\Socket\Connector {
  30. public function connect(
  31. string $uri,
  32. ?Amp\Socket\ConnectContext $context = null,
  33. ?Amp\CancellationToken $token = null
  34. ): Amp\Promise {
  35. $context = $context ?? new Amp\Socket\ConnectContext;
  36. $context = $context->withTcpNoDelay();
  37. return Amp\Socket\connector()->connect($uri, $context, $token);
  38. }
  39. }
  40. );
  41. $mysql = new Amp\Mysql\Pool($config, 512 * 50, 300, $connector);
  42. // Case 1 - JSON
  43. $router->addRoute('GET', '/json', new class implements RequestHandler {
  44. public function handleRequest(Request $request): Promise {
  45. return new Success(new Response(200, [
  46. 'Content-Type' => 'application/json',
  47. 'Server' => 'amphp/http-server',
  48. ], \json_encode([
  49. 'message' => 'Hello, World!',
  50. ])));
  51. }
  52. });
  53. // Case 2 - Single Query
  54. $router->addRoute('GET', '/db', new class ($mysql) implements RequestHandler {
  55. private $mysql;
  56. public function __construct($mysql) {
  57. $this->mysql = $mysql;
  58. }
  59. public function handleRequest(Request $request): Promise {
  60. return new Coroutine($this->doHandleRequest($request));
  61. }
  62. private function doHandleRequest($request) {
  63. $statement = yield $this->mysql->prepare('SELECT * FROM World WHERE id = ?');
  64. $result = yield $statement->execute([mt_rand(1, 10000)]);
  65. if (yield $result->advance()) {
  66. $item = $result->getCurrent();
  67. } else {
  68. $item = null;
  69. }
  70. return new Response(200, [
  71. 'Content-Type' => 'application/json',
  72. 'Server' => 'amphp/http-server',
  73. ], \json_encode($item));
  74. }
  75. });
  76. // Case 3 - Multiple Queries
  77. $router->addRoute('GET', '/queries', new class ($mysql) implements RequestHandler {
  78. private $mysql;
  79. public function __construct($mysql) {
  80. $this->mysql = $mysql;
  81. }
  82. public function handleRequest(Request $request): Promise {
  83. return new Coroutine($this->doHandleRequest($request));
  84. }
  85. private function doHandleRequest($request) {
  86. $query = $request->getUri()->getQuery();
  87. \parse_str($query, $queryParams);
  88. $queries = (int) ($queryParams['q'] ?? 1);
  89. if ($queries < 1) {
  90. $queries = 1;
  91. } elseif ($queries > 500) {
  92. $queries = 500;
  93. }
  94. $items = [];
  95. $statement = yield $this->mysql->prepare('SELECT * FROM World WHERE id = ?');
  96. while ($queries--) {
  97. $items[] = new Coroutine($this->execute($statement));
  98. }
  99. return new Response(200, [
  100. 'Content-Type' => 'application/json',
  101. 'Server' => 'amphp/http-server',
  102. ], \json_encode(yield $items));
  103. }
  104. private function execute($statement) {
  105. $result = yield $statement->execute([mt_rand(1, 10000)]);
  106. yield $result->advance();
  107. return $result->getCurrent();
  108. }
  109. });
  110. // Case 4 - Fortunes
  111. $router->addRoute('GET', '/fortunes', new class ($mysql) implements RequestHandler {
  112. private $mysql;
  113. public function __construct($mysql) {
  114. $this->mysql = $mysql;
  115. }
  116. public function handleRequest(Request $request): Promise {
  117. return new Coroutine($this->doHandleRequest($request));
  118. }
  119. private function doHandleRequest($request) {
  120. $result = yield $this->mysql->query('SELECT * FROM Fortune');
  121. $items = [];
  122. while (yield $result->advance()) {
  123. $item = $result->getCurrent();
  124. $items[$item['id']] = $item['message'];
  125. }
  126. $items[0] = 'Additional fortune added at request time.';
  127. \asort($items);
  128. \ob_start();
  129. require __DIR__ . '/fortunes.php';
  130. return new Response(200, [
  131. 'Content-Type' => 'text/html; charset=utf-8',
  132. 'Server' => 'amphp/http-server',
  133. ], \ob_get_clean());
  134. }
  135. });
  136. // Case 6 - Plaintext
  137. $router->addRoute('GET', '/plaintext', new class implements RequestHandler {
  138. public function handleRequest(Request $request): Promise {
  139. return new Success(new Response(200, [
  140. 'Content-Type' => 'text/plain',
  141. 'Server' => 'amphp/http-server',
  142. ], 'Hello, World!'));
  143. }
  144. });
  145. $logger = new Logger('Worker-' . Cluster::getId());
  146. $logger->pushHandler(Cluster::createLogHandler());
  147. $logger->info('Using ' . get_class(Amp\Loop::get()));
  148. $options = (new Options)
  149. ->withoutCompression()
  150. ->withConnectionLimit(CONCURRENCY_LIMIT)
  151. ->withConnectionsPerIpLimit(CONCURRENCY_LIMIT);
  152. $server = new Server($sockets, $router, $logger, $options);
  153. yield $server->start();
  154. Amp\Loop::onSignal(\SIGINT, function (string $watcherId) use ($server) {
  155. Amp\Loop::cancel($watcherId);
  156. yield $server->stop();
  157. });
  158. });