123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334 |
- use v5.40;
- use warnings;
- use Feersum::Runner;
- use EV; use AnyEvent;
- use DBI 'SQL_INTEGER';
- use DBD::Pg ':async';
- use Scalar::Util 'weaken';
- use List::Util qw'min max pairmap';
- use JSON::XS;
- use Text::Xslate;
- use LMDB_File qw':flags :error';
- use constant {
- host_port => $ENV{host_port} || '0.0.0.0:8080',
- debug => $ENV{debug} // 0,
- db => lc($ENV{db} || 'postgres'), # postgres / mysql / maria (will use for constant folding)
- db_name => $ENV{db_name} || 'hello_world',
- db_host => $ENV{db_host} || 'tfb-database',
- db_port => $ENV{db_port},
- db_user => $ENV{db_user} || 'benchmarkdbuser',
- db_pass => $ENV{db_pass} || 'benchmarkdbpass',
- empty => [], o => +{},
- reconnect_interval => 60,
- max_db_connections => 512,
- max_update_tries => 3
- };
- use constant max_batch_update_size => 1; # db eq 'postgres' ? 5 : 10; # rule of thumb
- use constant server => qw'Server Feersum';
- use constant {
- text => [server, qw'Content-Type text/plain'],
- json => [server, qw'Content-Type application/json'],
- html => [server, 'Content-Type', 'text/html; charset=utf-8'],
- nocontent => [server],
- };
- my @dsn = (
- (sprintf 'dbi:%s:port=%d;host=%s;database=%s;',
- (db eq 'mysql' ? ('mysql', db_port // 3306) :
- db eq 'maria' ? ('MariaDB', db_port // 3306) :
- db eq 'postgres' ? ('Pg', db_port // 5432)
- : die 'unknown db'), db_host, db_name),
- db_user, db_pass,
- +{qw'AutoCommit 1 RaiseError 0 PrintError 1',
- (db eq 'maria' ? (qw'mariadb_server_prepare 1 mariadb_ssl 0') :
- db eq 'mysql' ? (qw'mysql_server_prepare 1 mysql_ssl 0 mysql_get_server_pubkey 1') :
- db eq 'postgres' ? (qw'pg_server_prepare 1 sslmode 0') : ())}
- );
- chomp(my $cpus = `nproc`);
- say "$cpus cpus available" if debug;
- my $pool_size = int max_db_connections / $cpus; # number of db connections in each worker
- my $js = JSON::XS->new;
- my $html = render();
- cache('init');
- my %prepare = (
- world => ['select randomNumber, id from World where id = ?', SQL_INTEGER],
- fortune => ['select id, message from Fortune'],
- update1 => ['update World set randomNumber = ? where id = ?', (SQL_INTEGER) x 2],
- (map {
- 'update'.$_ =>
- [sprintf(
- (db eq 'mysql' || db eq 'maria') ? 'with t(v,i) as (values %s) update World w join t on t.i = w.id set w.randomNumber = t.v' :
- db eq 'postgres' ? 'with t(v,i) as (values %s) update World w set randomNumber = t.v from t where t.i = w.id' : undef,
- (join ',', ((db eq 'mysql' || db eq 'maria') ? 'row(?,?)' : '(?,?)') x $_)
- ), (SQL_INTEGER) x ($_ * 2)]
- } 2..max_batch_update_size)
- );
- my ($pool, $cache);
- my $w = EV::fork sub { # child init
- $pool = db_pool($pool_size, \@dsn, \%prepare); # db connection pool in each worker
- $cache = cache('use'); # cache
- };
- my %route = controllers();
- my $runner = Feersum::Runner->new(
- pre_fork => $cpus,
- quiet => !debug, keepalive => 1,
- max_connection_reqs => 1000,
- read_timeout => 60,
- listen => [host_port]
- )->run(sub ($h) { ($route{$h->path} // $route{404})->($h) });
- sub controllers {(
- '/plaintext', sub ($h) { $h->send_response(200, text, \'Hello, World!') },
- '/json', sub ($h) { $h->send_response(200, json, \$js->encode(+{ message => 'Hello, World!' })) },
- (map +('/db', $_, '/queries', $_, '/updates', $_ ), sub ($h) {
- my ($n) = (my $q = $h->query // '') =~ m/queries=(\d+)/a;
- $n = max(1, min($n//1, 500));
- my ($cv, @rs) = (AE::cv);
- my $on_done = sub { $h->send_response(200, json, \$js->encode($q ? \@rs : ($rs[0] // o))) };
- $cv->begin(
- $h->path ne '/updates'
- ? $on_done # select
- : sub { # update
- if (@rs) {
- my ($i, $j) = (0, 0);
- my $cv = AE::cv;
- $cv->begin($on_done);
- while () {
- $j = min($i + max_batch_update_size - 1, $#rs);
- say "$i $j" if debug;
- $cv->begin;
- $_->{randomNumber} = int(rand 10000) + 1 for @rs[$i..$j];
- my $tries = max_update_tries;
- my $st = 'update'.($j - $i + 1);
- my $args = [map @$_{qw/randomNumber id/}, @rs[$i..$j]];
- my $update = sub ($rv = undef, $sth_or_e = undef) {
- $cv->end, return if $rv;
- say 'retryin update on '.$sth_or_e if $tries < max_update_tries;
- say 'fail to update on '.max_update_tries.' tries ' and $cv->end unless $tries--;
- db_execute($pool, $st, $args, __SUB__);
- };
- $update->();
- $i += max_batch_update_size;
- last if $i >= @rs;
- }
- $cv->end;
- } else { $on_done->() }
- }
- );
- for (1..$n) {
- my $id = int(rand 10000) + 1;
- $cv->begin;
- db_execute($pool, world => [$id], sub ($rows, $sth) {
- push @rs, @{$sth->fetchall_arrayref(+{ randomNumber => 1, id => 1 })} if $rows > 0;
- $cv->end
- });
- }
- $cv->end
- }),
- '/fortunes' => sub ($h) {
- db_execute($pool, fortune => empty, sub ($rows, $sth) {
- $h->send_response(200, html, \$html->render('fortune.tx', +{ rows => [
- sort { $a->[1] cmp $b->[1] }
- @{$sth->fetchall_arrayref},
- [0, 'Additional fortune added at request time.']
- ]}))
- });
- },
- '/cached-queries' => sub ($h) {
- my ($n) = (my $q = $h->query // '') =~ m/count=(\d+)/a;
- $n = max(1, min($n//1, 500));
- my @rs = map +{ id => $_ , randomNumber => $cache->($_) }, map int(rand 10000) + 1, 1..$n;
- $h->send_response(200, json, \$js->encode(\@rs));
- },
- '/' => sub ($h) { $h->send_response(204, nocontent, empty) },
- 404 => sub ($h) { $h->send_response(404, nocontent, empty) }
- )}
- sub render {
- my $t = Text::Xslate->new(path => +{
- (my $file = 'fortune.tx') => <<~\html =~ s/(?<=[\r\n])\s+//sgr
- <!DOCTYPE html>
- <html>
- <head><title>Fortunes</title></head>
- <body>
- <table>
- <tr><th>id</th><th>message</th></tr>
- : for $rows -> $i {
- <tr><td><: $i.0 :></td><td><: $i.1 :></td></tr>
- : }
- </table>
- </body>
- </html>
- html
- });
- $t->load_file($file);
- $t
- }
- sub cache ($type = 'init') {
- my $path = '/dev/shm/feersum';
- say "clearing $path" and unlink glob "$path*" if $type eq 'init' && -e $path;
- my $env = LMDB::Env->new($path, +{
- mapsize => 1024*512,
- flags => MDB_WRITEMAP|MDB_NOSYNC|MDB_NOMETASYNC|MDB_NOTLS|MDB_NOSUBDIR|MDB_NORDAHEAD
- }) or die $LMDB_File::last_err;
- if ($type eq 'init') {
- die unless defined(my $tx = $env->BeginTxn);
- my $handle = $tx->open(undef, MDB_CREATE|MDB_INTEGERKEY);
- my $dbh = DBI->connect(@dsn);
- $tx->put($handle, $_->[0], pack S => $_->[1]) for @{$dbh->selectall_arrayref('select id, randomNumber from World')};
- $tx->commit;
- $dbh->disconnect;
- say 'cache populated' if debug;
- return;
- }
- my $tx = $env->BeginTxn(MDB_RDONLY);
- my $handle = $tx->open(undef, MDB_INTEGERKEY);
- sub ($k) { $tx->renew; $tx->get($handle, $k, my $v); unpack S => $v }
- }
- sub db_pool ($size, $dsn, $prepare = undef) {
- my %pool = (slot => [], active => +{}, free => [], pending => [], prepare => $prepare);
- db_connect(\%pool, $_, $dsn) for 0 .. $size - 1;
- \%pool
- }
- sub db_connect ($pool, $id, $dsn) {
- say "db[$id] connection.." if debug;
- my $dbh = DBI->connect(@$dsn);
- unless ($dbh) {
- warn sprintf 'err: %s. will try reconnect %d sec', $DBI::errstr, reconnect_interval;
- $pool->{slot}[$id] = AE::timer +reconnect_interval, 0, sub { db_connect($pool, $id, $dsn) }; # try later
- return
- }
- my $fd = db eq 'maria' ? $dbh->mariadb_sockfd : db eq 'mysql' ? $dbh->mysql_fd : db eq 'postgres' ? $dbh->{pg_socket} : undef;
- open my $fh, "<&=", $fd or die $!; # dup handle
- state $st_opt = +{
- db eq 'maria' ? (mariadb_async => 1) :
- db eq 'mysql' ? (async => 1) :
- db eq 'postgres' ? (pg_async => PG_ASYNC + PG_OLDQUERY_CANCEL) : ()
- };
- my %conn = (
- id => $id, db => $dbh, fd => $fd, fh => $fh, dsn => $dsn,
- st => +{ $pool->{prepare} ? (pairmap {
- my $sth = $dbh->prepare($b->[0], $st_opt);
- $sth->bind_param($_, undef, $b->[$_]) for 1..$#$b;
- ($a, $sth)
- } %{$pool->{prepare}}) : () },
- connected => 1,
- );
- $conn{w} = EV::io $fh, EV::READ, sub {
- my $e;
- { ;
- $e = 'inactive', last unless defined(my $st = $conn{active});
- if ($st) { # executed st
- $e = 'nost', last unless my $sth = $conn{st}{$st};
- $e = 'unready', last unless
- db eq 'maria' ? $sth->mariadb_async_ready :
- db eq 'mysql' ? $sth->mysql_async_ready :
- db eq 'postgres' ? $sth->pg_ready : undef;
- $e = 'noresult', $sth->finish unless defined(
- my $rows =
- db eq 'maria' ? $sth->mariadb_async_result :
- db eq 'mysql' ? $sth->mysql_async_result :
- db eq 'postgres' ? $sth->pg_result : undef
- );
- say "db[$id $fd] calling cb: ".$st if debug;
- if (my $cb = $conn{cb}) { $cb->($rows, $e // $sth) }
- else { say "db[$id $fd] no handler for response with $rows rows" }
- $sth->finish unless $e;
- } else { # db do
- $e = 'nodb', last unless my $dbh = $conn{db};
- $e = 'unready', last unless
- db eq 'maria' ? $dbh->mariadb_async_ready :
- db eq 'mysql' ? $dbh->mysql_async_ready :
- db eq 'postgres' ? $dbh->pg_ready : undef;
- $e = 'noresult' unless defined(
- my $rv =
- db eq 'maria' ? $dbh->mariadb_async_result :
- db eq 'mysql' ? $dbh->mysql_async_result :
- db eq 'postgres' ? $dbh->pg_result : undef
- );
- say "db[$id $fd] calling cb: db do query" if debug;
- if (my $cb = $conn{cb}) { $cb->($rv, $e) }
- else { say "db[$id $fd] no handler response with $rv return" }
- }
- say "db[$id $fd] error: $e " if debug && $e;
- say "db[$id $fd] finish" if debug;
- delete $conn{active};
- delete $pool->{active}{$id};
- push @{$pool->{free}}, \%conn;
- if (defined(my $pending = shift @{$pool->{pending}})) {
- my $code = shift @$pending;
- $code->($pool, splice @$pending)
- }
- return
- }
- say "db[$id $fd] $e" if debug;
- if (eof($fh) || (my $inactive = $e eq 'inactive')) {
- say "db[$id $fd] disconnected" if debug;
- delete @conn{qw/w connected/};
- $conn{db}->disconnect if $inactive;
- $conn{cb}->(-1, undef) if $conn{st} && $conn{active} && $conn{cb};
- db_connect($pool, $id, $dsn); # reconnect
- } else {
- say "db[$id $fd] stalled?";
- }
- };
- say "db[$id $fd] connected" if debug;
- $pool->{slot}[$id] = \%conn;
- weaken(my $weak = $pool->{slot}[$id]);
- push @{$pool->{free}}, $weak;
- if (defined(my $pending = shift @{$pool->{pending}})) {
- my $code = shift @$pending;
- $code->($pool, splice @$pending)
- }
- }
- sub db_execute ($pool, $st, $args, $cb) {
- say 'db executing..' if debug;
- while (my $conn = shift @{$pool->{free}}) {
- (debug and say 'skip unconnected'), next unless defined($conn) && $conn->{connected};
- say 'on connection..'.$conn->{id} if debug;
- if ($conn->{st}{$st}->execute(@$args)) {
- (@$conn{qw/cb active/}, $pool->{active}{$conn->{id}}) = ($cb, $st, 1);
- return;
- } else {
- say 'error: ', $conn->{st}{$st}->errstr;
- db_connect($pool, @$conn{qw/id dsn/}); # reconnect
- next;
- }
- }
- say '..put to pending..' if debug;
- push @{$pool->{pending}}, [__SUB__, $st, $args, $cb];
- }
- sub db_do ($pool, $query, $args, $cb) {
- say 'db doing..' if debug;
- state $db_opt = +{
- db eq 'maria' ? (mariadb_async => 1) :
- db eq 'mysql' ? (async => 1) :
- db eq 'postgres' ? (pg_async => PG_ASYNC + PG_OLDQUERY_CANCEL) : ()
- };
- while (my $conn = shift @{$pool->{free}}) {
- (debug and say 'skip unconnected'), next unless defined($conn) && $conn->{connected};
- say 'on connection..'.$conn->{id} if debug;
- if ($conn->{db}->do($query, $db_opt, defined($args) ? @$args : ())) {
- (@$conn{qw/cb active/}, $pool->{active}{$conn->{id}}) = ($cb, 0, 1);
- return;
- } else {
- say 'error: ', $conn->{db}->errstr;
- db_connect($pool, @$conn{qw/id dsn/}); # reconnect
- next;
- }
- }
- say '..put to pending..' if debug;
- push @{$pool->{pending}}, [__SUB__, $query, $args, $cb];
- }
|