app.pl 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334
  1. use v5.40;
  2. use warnings;
  3. use Feersum::Runner;
  4. use EV; use AnyEvent;
  5. use DBI 'SQL_INTEGER';
  6. use DBD::Pg ':async';
  7. use Scalar::Util 'weaken';
  8. use List::Util qw'min max pairmap';
  9. use JSON::XS;
  10. use Text::Xslate;
  11. use LMDB_File qw':flags :error';
  12. use constant {
  13. host_port => $ENV{host_port} || '0.0.0.0:8080',
  14. debug => $ENV{debug} // 0,
  15. db => lc($ENV{db} || 'postgres'), # postgres / mysql / maria (will use for constant folding)
  16. db_name => $ENV{db_name} || 'hello_world',
  17. db_host => $ENV{db_host} || 'tfb-database',
  18. db_port => $ENV{db_port},
  19. db_user => $ENV{db_user} || 'benchmarkdbuser',
  20. db_pass => $ENV{db_pass} || 'benchmarkdbpass',
  21. empty => [], o => +{},
  22. reconnect_interval => 60,
  23. max_db_connections => 512,
  24. max_update_tries => 3
  25. };
  26. use constant max_batch_update_size => 1; # db eq 'postgres' ? 5 : 10; # rule of thumb
  27. use constant server => qw'Server Feersum';
  28. use constant {
  29. text => [server, qw'Content-Type text/plain'],
  30. json => [server, qw'Content-Type application/json'],
  31. html => [server, 'Content-Type', 'text/html; charset=utf-8'],
  32. nocontent => [server],
  33. };
  34. my @dsn = (
  35. (sprintf 'dbi:%s:port=%d;host=%s;database=%s;',
  36. (db eq 'mysql' ? ('mysql', db_port // 3306) :
  37. db eq 'maria' ? ('MariaDB', db_port // 3306) :
  38. db eq 'postgres' ? ('Pg', db_port // 5432)
  39. : die 'unknown db'), db_host, db_name),
  40. db_user, db_pass,
  41. +{qw'AutoCommit 1 RaiseError 0 PrintError 1',
  42. (db eq 'maria' ? (qw'mariadb_server_prepare 1 mariadb_ssl 0') :
  43. db eq 'mysql' ? (qw'mysql_server_prepare 1 mysql_ssl 0 mysql_get_server_pubkey 1') :
  44. db eq 'postgres' ? (qw'pg_server_prepare 1 sslmode 0') : ())}
  45. );
  46. chomp(my $cpus = `nproc`);
  47. say "$cpus cpus available" if debug;
  48. my $pool_size = int max_db_connections / $cpus; # number of db connections in each worker
  49. my $js = JSON::XS->new;
  50. my $html = render();
  51. cache('init');
  52. my %prepare = (
  53. world => ['select randomNumber, id from World where id = ?', SQL_INTEGER],
  54. fortune => ['select id, message from Fortune'],
  55. update1 => ['update World set randomNumber = ? where id = ?', (SQL_INTEGER) x 2],
  56. (map {
  57. 'update'.$_ =>
  58. [sprintf(
  59. (db eq 'mysql' || db eq 'maria') ? 'with t(v,i) as (values %s) update World w join t on t.i = w.id set w.randomNumber = t.v' :
  60. db eq 'postgres' ? 'with t(v,i) as (values %s) update World w set randomNumber = t.v from t where t.i = w.id' : undef,
  61. (join ',', ((db eq 'mysql' || db eq 'maria') ? 'row(?,?)' : '(?,?)') x $_)
  62. ), (SQL_INTEGER) x ($_ * 2)]
  63. } 2..max_batch_update_size)
  64. );
  65. my ($pool, $cache);
  66. my $w = EV::fork sub { # child init
  67. $pool = db_pool($pool_size, \@dsn, \%prepare); # db connection pool in each worker
  68. $cache = cache('use'); # cache
  69. };
  70. my %route = controllers();
  71. my $runner = Feersum::Runner->new(
  72. pre_fork => $cpus,
  73. quiet => !debug, keepalive => 1,
  74. max_connection_reqs => 1000,
  75. read_timeout => 60,
  76. listen => [host_port]
  77. )->run(sub ($h) { ($route{$h->path} // $route{404})->($h) });
  78. sub controllers {(
  79. '/plaintext', sub ($h) { $h->send_response(200, text, \'Hello, World!') },
  80. '/json', sub ($h) { $h->send_response(200, json, \$js->encode(+{ message => 'Hello, World!' })) },
  81. (map +('/db', $_, '/queries', $_, '/updates', $_ ), sub ($h) {
  82. my ($n) = (my $q = $h->query // '') =~ m/queries=(\d+)/a;
  83. $n = max(1, min($n//1, 500));
  84. my ($cv, @rs) = (AE::cv);
  85. my $on_done = sub { $h->send_response(200, json, \$js->encode($q ? \@rs : ($rs[0] // o))) };
  86. $cv->begin(
  87. $h->path ne '/updates'
  88. ? $on_done # select
  89. : sub { # update
  90. if (@rs) {
  91. my ($i, $j) = (0, 0);
  92. my $cv = AE::cv;
  93. $cv->begin($on_done);
  94. while () {
  95. $j = min($i + max_batch_update_size - 1, $#rs);
  96. say "$i $j" if debug;
  97. $cv->begin;
  98. $_->{randomNumber} = int(rand 10000) + 1 for @rs[$i..$j];
  99. my $tries = max_update_tries;
  100. my $st = 'update'.($j - $i + 1);
  101. my $args = [map @$_{qw/randomNumber id/}, @rs[$i..$j]];
  102. my $update = sub ($rv = undef, $sth_or_e = undef) {
  103. $cv->end, return if $rv;
  104. say 'retryin update on '.$sth_or_e if $tries < max_update_tries;
  105. say 'fail to update on '.max_update_tries.' tries ' and $cv->end unless $tries--;
  106. db_execute($pool, $st, $args, __SUB__);
  107. };
  108. $update->();
  109. $i += max_batch_update_size;
  110. last if $i >= @rs;
  111. }
  112. $cv->end;
  113. } else { $on_done->() }
  114. }
  115. );
  116. for (1..$n) {
  117. my $id = int(rand 10000) + 1;
  118. $cv->begin;
  119. db_execute($pool, world => [$id], sub ($rows, $sth) {
  120. push @rs, @{$sth->fetchall_arrayref(+{ randomNumber => 1, id => 1 })} if $rows > 0;
  121. $cv->end
  122. });
  123. }
  124. $cv->end
  125. }),
  126. '/fortunes' => sub ($h) {
  127. db_execute($pool, fortune => empty, sub ($rows, $sth) {
  128. $h->send_response(200, html, \$html->render('fortune.tx', +{ rows => [
  129. sort { $a->[1] cmp $b->[1] }
  130. @{$sth->fetchall_arrayref},
  131. [0, 'Additional fortune added at request time.']
  132. ]}))
  133. });
  134. },
  135. '/cached-queries' => sub ($h) {
  136. my ($n) = (my $q = $h->query // '') =~ m/count=(\d+)/a;
  137. $n = max(1, min($n//1, 500));
  138. my @rs = map +{ id => $_ , randomNumber => $cache->($_) }, map int(rand 10000) + 1, 1..$n;
  139. $h->send_response(200, json, \$js->encode(\@rs));
  140. },
  141. '/' => sub ($h) { $h->send_response(204, nocontent, empty) },
  142. 404 => sub ($h) { $h->send_response(404, nocontent, empty) }
  143. )}
  144. sub render {
  145. my $t = Text::Xslate->new(path => +{
  146. (my $file = 'fortune.tx') => <<~\html =~ s/(?<=[\r\n])\s+//sgr
  147. <!DOCTYPE html>
  148. <html>
  149. <head><title>Fortunes</title></head>
  150. <body>
  151. <table>
  152. <tr><th>id</th><th>message</th></tr>
  153. : for $rows -> $i {
  154. <tr><td><: $i.0 :></td><td><: $i.1 :></td></tr>
  155. : }
  156. </table>
  157. </body>
  158. </html>
  159. html
  160. });
  161. $t->load_file($file);
  162. $t
  163. }
  164. sub cache ($type = 'init') {
  165. my $path = '/dev/shm/feersum';
  166. say "clearing $path" and unlink glob "$path*" if $type eq 'init' && -e $path;
  167. my $env = LMDB::Env->new($path, +{
  168. mapsize => 1024*512,
  169. flags => MDB_WRITEMAP|MDB_NOSYNC|MDB_NOMETASYNC|MDB_NOTLS|MDB_NOSUBDIR|MDB_NORDAHEAD
  170. }) or die $LMDB_File::last_err;
  171. if ($type eq 'init') {
  172. die unless defined(my $tx = $env->BeginTxn);
  173. my $handle = $tx->open(undef, MDB_CREATE|MDB_INTEGERKEY);
  174. my $dbh = DBI->connect(@dsn);
  175. $tx->put($handle, $_->[0], pack S => $_->[1]) for @{$dbh->selectall_arrayref('select id, randomNumber from World')};
  176. $tx->commit;
  177. $dbh->disconnect;
  178. say 'cache populated' if debug;
  179. return;
  180. }
  181. my $tx = $env->BeginTxn(MDB_RDONLY);
  182. my $handle = $tx->open(undef, MDB_INTEGERKEY);
  183. sub ($k) { $tx->renew; $tx->get($handle, $k, my $v); unpack S => $v }
  184. }
  185. sub db_pool ($size, $dsn, $prepare = undef) {
  186. my %pool = (slot => [], active => +{}, free => [], pending => [], prepare => $prepare);
  187. db_connect(\%pool, $_, $dsn) for 0 .. $size - 1;
  188. \%pool
  189. }
  190. sub db_connect ($pool, $id, $dsn) {
  191. say "db[$id] connection.." if debug;
  192. my $dbh = DBI->connect(@$dsn);
  193. unless ($dbh) {
  194. warn sprintf 'err: %s. will try reconnect %d sec', $DBI::errstr, reconnect_interval;
  195. $pool->{slot}[$id] = AE::timer +reconnect_interval, 0, sub { db_connect($pool, $id, $dsn) }; # try later
  196. return
  197. }
  198. my $fd = db eq 'maria' ? $dbh->mariadb_sockfd : db eq 'mysql' ? $dbh->mysql_fd : db eq 'postgres' ? $dbh->{pg_socket} : undef;
  199. open my $fh, "<&=", $fd or die $!; # dup handle
  200. state $st_opt = +{
  201. db eq 'maria' ? (mariadb_async => 1) :
  202. db eq 'mysql' ? (async => 1) :
  203. db eq 'postgres' ? (pg_async => PG_ASYNC + PG_OLDQUERY_CANCEL) : ()
  204. };
  205. my %conn = (
  206. id => $id, db => $dbh, fd => $fd, fh => $fh, dsn => $dsn,
  207. st => +{ $pool->{prepare} ? (pairmap {
  208. my $sth = $dbh->prepare($b->[0], $st_opt);
  209. $sth->bind_param($_, undef, $b->[$_]) for 1..$#$b;
  210. ($a, $sth)
  211. } %{$pool->{prepare}}) : () },
  212. connected => 1,
  213. );
  214. $conn{w} = EV::io $fh, EV::READ, sub {
  215. my $e;
  216. { ;
  217. $e = 'inactive', last unless defined(my $st = $conn{active});
  218. if ($st) { # executed st
  219. $e = 'nost', last unless my $sth = $conn{st}{$st};
  220. $e = 'unready', last unless
  221. db eq 'maria' ? $sth->mariadb_async_ready :
  222. db eq 'mysql' ? $sth->mysql_async_ready :
  223. db eq 'postgres' ? $sth->pg_ready : undef;
  224. $e = 'noresult', $sth->finish unless defined(
  225. my $rows =
  226. db eq 'maria' ? $sth->mariadb_async_result :
  227. db eq 'mysql' ? $sth->mysql_async_result :
  228. db eq 'postgres' ? $sth->pg_result : undef
  229. );
  230. say "db[$id $fd] calling cb: ".$st if debug;
  231. if (my $cb = $conn{cb}) { $cb->($rows, $e // $sth) }
  232. else { say "db[$id $fd] no handler for response with $rows rows" }
  233. $sth->finish unless $e;
  234. } else { # db do
  235. $e = 'nodb', last unless my $dbh = $conn{db};
  236. $e = 'unready', last unless
  237. db eq 'maria' ? $dbh->mariadb_async_ready :
  238. db eq 'mysql' ? $dbh->mysql_async_ready :
  239. db eq 'postgres' ? $dbh->pg_ready : undef;
  240. $e = 'noresult' unless defined(
  241. my $rv =
  242. db eq 'maria' ? $dbh->mariadb_async_result :
  243. db eq 'mysql' ? $dbh->mysql_async_result :
  244. db eq 'postgres' ? $dbh->pg_result : undef
  245. );
  246. say "db[$id $fd] calling cb: db do query" if debug;
  247. if (my $cb = $conn{cb}) { $cb->($rv, $e) }
  248. else { say "db[$id $fd] no handler response with $rv return" }
  249. }
  250. say "db[$id $fd] error: $e " if debug && $e;
  251. say "db[$id $fd] finish" if debug;
  252. delete $conn{active};
  253. delete $pool->{active}{$id};
  254. push @{$pool->{free}}, \%conn;
  255. if (defined(my $pending = shift @{$pool->{pending}})) {
  256. my $code = shift @$pending;
  257. $code->($pool, splice @$pending)
  258. }
  259. return
  260. }
  261. say "db[$id $fd] $e" if debug;
  262. if (eof($fh) || (my $inactive = $e eq 'inactive')) {
  263. say "db[$id $fd] disconnected" if debug;
  264. delete @conn{qw/w connected/};
  265. $conn{db}->disconnect if $inactive;
  266. $conn{cb}->(-1, undef) if $conn{st} && $conn{active} && $conn{cb};
  267. db_connect($pool, $id, $dsn); # reconnect
  268. } else {
  269. say "db[$id $fd] stalled?";
  270. }
  271. };
  272. say "db[$id $fd] connected" if debug;
  273. $pool->{slot}[$id] = \%conn;
  274. weaken(my $weak = $pool->{slot}[$id]);
  275. push @{$pool->{free}}, $weak;
  276. if (defined(my $pending = shift @{$pool->{pending}})) {
  277. my $code = shift @$pending;
  278. $code->($pool, splice @$pending)
  279. }
  280. }
  281. sub db_execute ($pool, $st, $args, $cb) {
  282. say 'db executing..' if debug;
  283. while (my $conn = shift @{$pool->{free}}) {
  284. (debug and say 'skip unconnected'), next unless defined($conn) && $conn->{connected};
  285. say 'on connection..'.$conn->{id} if debug;
  286. if ($conn->{st}{$st}->execute(@$args)) {
  287. (@$conn{qw/cb active/}, $pool->{active}{$conn->{id}}) = ($cb, $st, 1);
  288. return;
  289. } else {
  290. say 'error: ', $conn->{st}{$st}->errstr;
  291. db_connect($pool, @$conn{qw/id dsn/}); # reconnect
  292. next;
  293. }
  294. }
  295. say '..put to pending..' if debug;
  296. push @{$pool->{pending}}, [__SUB__, $st, $args, $cb];
  297. }
  298. sub db_do ($pool, $query, $args, $cb) {
  299. say 'db doing..' if debug;
  300. state $db_opt = +{
  301. db eq 'maria' ? (mariadb_async => 1) :
  302. db eq 'mysql' ? (async => 1) :
  303. db eq 'postgres' ? (pg_async => PG_ASYNC + PG_OLDQUERY_CANCEL) : ()
  304. };
  305. while (my $conn = shift @{$pool->{free}}) {
  306. (debug and say 'skip unconnected'), next unless defined($conn) && $conn->{connected};
  307. say 'on connection..'.$conn->{id} if debug;
  308. if ($conn->{db}->do($query, $db_opt, defined($args) ? @$args : ())) {
  309. (@$conn{qw/cb active/}, $pool->{active}{$conn->{id}}) = ($cb, 0, 1);
  310. return;
  311. } else {
  312. say 'error: ', $conn->{db}->errstr;
  313. db_connect($pool, @$conn{qw/id dsn/}); # reconnect
  314. next;
  315. }
  316. }
  317. say '..put to pending..' if debug;
  318. push @{$pool->{pending}}, [__SUB__, $query, $args, $cb];
  319. }