hello.go 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348
  1. package main
  2. import (
  3. "encoding/json"
  4. "flag"
  5. "fmt"
  6. "html/template"
  7. "log"
  8. "math/rand"
  9. "net"
  10. "os"
  11. "os/exec"
  12. "runtime"
  13. "sort"
  14. "time"
  15. "github.com/jackc/pgx"
  16. "github.com/valyala/fasthttp"
  17. "github.com/valyala/fasthttp/reuseport"
  18. )
  19. type Message struct {
  20. Message string `json:"message"`
  21. }
  22. type World struct {
  23. Id int32 `json:"id"`
  24. RandomNumber int32 `json:"randomNumber"`
  25. }
  26. type Fortune struct {
  27. Id int32 `json:"id"`
  28. Message string `json:"message"`
  29. }
  30. const (
  31. worldRowCount = 10000
  32. maxConnectionCount = 256
  33. )
  34. var (
  35. worldSelectStmt *pgx.PreparedStatement
  36. worldUpdateStmt *pgx.PreparedStatement
  37. fortuneSelectStmt *pgx.PreparedStatement
  38. )
  39. const helloWorldString = "Hello, World!"
  40. var (
  41. tmpl = template.Must(template.ParseFiles("templates/fortune.html"))
  42. db *pgx.ConnPool
  43. helloWorldBytes = []byte(helloWorldString)
  44. )
  45. var (
  46. listenAddr = flag.String("listenAddr", ":8080", "Address to listen to")
  47. prefork = flag.Bool("prefork", false, "use prefork")
  48. child = flag.Bool("child", false, "is child proc")
  49. )
  50. func main() {
  51. flag.Parse()
  52. var err error
  53. // initialize the connection pool
  54. dbConns := maxConnectionCount
  55. if *prefork {
  56. dbConns = (maxConnectionCount + runtime.NumCPU() - 1) / runtime.NumCPU()
  57. }
  58. if db, err = initDatabase("localhost", "benchmarkdbuser", "benchmarkdbpass", "hello_world", 5432, 2*dbConns); err != nil {
  59. log.Fatalf("Error opening database: %s", err)
  60. }
  61. s := &fasthttp.Server{
  62. Handler: mainHandler,
  63. Name: "fasthttp",
  64. }
  65. ln := getListener()
  66. if err = s.Serve(ln); err != nil {
  67. log.Fatalf("Error when serving incoming connections: %s", err)
  68. }
  69. }
  70. const maxConnDuration = time.Millisecond * 300
  71. func mainHandler(ctx *fasthttp.RequestCtx) {
  72. // Performance hack for prefork mode - periodically close keepalive
  73. // connections for evenly distributing connections among available
  74. // processes.
  75. if *prefork && time.Since(ctx.ConnTime()) > maxConnDuration {
  76. ctx.SetConnectionClose()
  77. }
  78. path := ctx.Path()
  79. switch string(path) {
  80. case "/plaintext":
  81. plaintextHandler(ctx)
  82. case "/json":
  83. jsonHandler(ctx)
  84. case "/db":
  85. dbHandler(ctx)
  86. case "/queries":
  87. queriesHandler(ctx)
  88. case "/fortune":
  89. fortuneHandler(ctx)
  90. case "/update":
  91. updateHandler(ctx)
  92. default:
  93. ctx.Error("unexpected path", fasthttp.StatusBadRequest)
  94. }
  95. }
  96. // Test 1: JSON serialization
  97. func jsonHandler(ctx *fasthttp.RequestCtx) {
  98. jsonMarshal(ctx, &Message{helloWorldString})
  99. }
  100. // Test 2: Single database query
  101. func dbHandler(ctx *fasthttp.RequestCtx) {
  102. var w World
  103. fetchRandomWorld(&w)
  104. jsonMarshal(ctx, &w)
  105. }
  106. // Test 3: Multiple database queries
  107. func queriesHandler(ctx *fasthttp.RequestCtx) {
  108. n := getQueriesCount(ctx)
  109. worlds := make([]World, n)
  110. for i := 0; i < n; i++ {
  111. fetchRandomWorld(&worlds[i])
  112. }
  113. jsonMarshal(ctx, worlds)
  114. }
  115. // Test 4: Fortunes
  116. func fortuneHandler(ctx *fasthttp.RequestCtx) {
  117. rows, err := db.Query("fortuneSelectStmt")
  118. if err != nil {
  119. log.Fatalf("Error selecting db data: %v", err)
  120. }
  121. fortunes := make([]Fortune, 0, 16)
  122. for rows.Next() {
  123. var f Fortune
  124. if err := rows.Scan(&f.Id, &f.Message); err != nil {
  125. log.Fatalf("Error scanning fortune row: %s", err)
  126. }
  127. fortunes = append(fortunes, f)
  128. }
  129. rows.Close()
  130. fortunes = append(fortunes, Fortune{Message: "Additional fortune added at request time."})
  131. sort.Sort(FortunesByMessage(fortunes))
  132. ctx.SetContentType("text/html; charset=utf-8")
  133. if err := tmpl.Execute(ctx, fortunes); err != nil {
  134. log.Fatalf("Error executing fortune: %s", err)
  135. }
  136. }
  137. // Test 5: Database updates
  138. func updateHandler(ctx *fasthttp.RequestCtx) {
  139. n := getQueriesCount(ctx)
  140. worlds := make([]World, n)
  141. for i := 0; i < n; i++ {
  142. w := &worlds[i]
  143. fetchRandomWorld(w)
  144. w.RandomNumber = int32(randomWorldNum())
  145. }
  146. // sorting is required for insert deadlock prevention.
  147. sort.Sort(WorldsByID(worlds))
  148. txn, err := db.Begin()
  149. if err != nil {
  150. log.Fatalf("Error starting transaction: %s", err)
  151. }
  152. // Disable synchronous commit for the current transaction
  153. // as a performance optimization.
  154. // See http://www.postgresql.org/docs/current/static/runtime-config-wal.html for details.
  155. // Below is the relevant quote from the docs:
  156. //
  157. // > It is therefore possible, and useful, to have some transactions
  158. // > commit synchronously and others asynchronously. For example,
  159. // > to make a single multistatement transaction commit asynchronously
  160. // > when the default is the opposite, issue
  161. // > SET LOCAL synchronous_commit TO OFF
  162. // > within the transaction.
  163. if _, err = txn.Exec("SET LOCAL synchronous_commit TO OFF"); err != nil {
  164. log.Fatalf("Error when disabling synchronous commit")
  165. }
  166. for i := 0; i < n; i++ {
  167. w := &worlds[i]
  168. if _, err = txn.Exec("worldUpdateStmt", w.RandomNumber, w.Id); err != nil {
  169. log.Fatalf("Error updating world row %d: %s", i, err)
  170. }
  171. }
  172. if err = txn.Commit(); err != nil {
  173. log.Fatalf("Error when commiting world rows: %s", err)
  174. }
  175. jsonMarshal(ctx, worlds)
  176. }
  177. // Test 6: Plaintext
  178. func plaintextHandler(ctx *fasthttp.RequestCtx) {
  179. ctx.Write(helloWorldBytes)
  180. }
  181. func jsonMarshal(ctx *fasthttp.RequestCtx, v interface{}) {
  182. ctx.SetContentType("application/json")
  183. if err := json.NewEncoder(ctx).Encode(v); err != nil {
  184. log.Fatalf("error in json.Encoder.Encode: %s", err)
  185. }
  186. }
  187. func fetchRandomWorld(w *World) {
  188. n := randomWorldNum()
  189. if err := db.QueryRow("worldSelectStmt", n).Scan(&w.Id, &w.RandomNumber); err != nil {
  190. log.Fatalf("Error scanning world row: %s", err)
  191. }
  192. }
  193. func randomWorldNum() int {
  194. return rand.Intn(worldRowCount) + 1
  195. }
  196. func getQueriesCount(ctx *fasthttp.RequestCtx) int {
  197. n := ctx.QueryArgs().GetUintOrZero("queries")
  198. if n < 1 {
  199. n = 1
  200. } else if n > 500 {
  201. n = 500
  202. }
  203. return n
  204. }
  205. type FortunesByMessage []Fortune
  206. func (s FortunesByMessage) Len() int { return len(s) }
  207. func (s FortunesByMessage) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
  208. func (s FortunesByMessage) Less(i, j int) bool { return s[i].Message < s[j].Message }
  209. type WorldsByID []World
  210. func (w WorldsByID) Len() int { return len(w) }
  211. func (w WorldsByID) Swap(i, j int) { w[i], w[j] = w[j], w[i] }
  212. func (w WorldsByID) Less(i, j int) bool { return w[i].Id < w[j].Id }
  213. func mustPrepare(db *pgx.Conn, name, query string) *pgx.PreparedStatement {
  214. stmt, err := db.Prepare(name, query)
  215. if err != nil {
  216. log.Fatalf("Error when preparing statement %q: %s", query, err)
  217. }
  218. return stmt
  219. }
  220. func getListener() net.Listener {
  221. if !*prefork {
  222. runtime.GOMAXPROCS(runtime.NumCPU())
  223. ln, err := net.Listen("tcp4", *listenAddr)
  224. if err != nil {
  225. log.Fatal(err)
  226. }
  227. return ln
  228. }
  229. if !*child {
  230. children := make([]*exec.Cmd, runtime.NumCPU())
  231. for i := range children {
  232. children[i] = exec.Command(os.Args[0], "-prefork", "-child")
  233. children[i].Stdout = os.Stdout
  234. children[i].Stderr = os.Stderr
  235. if err := children[i].Start(); err != nil {
  236. log.Fatal(err)
  237. }
  238. }
  239. for _, ch := range children {
  240. if err := ch.Wait(); err != nil {
  241. log.Print(err)
  242. }
  243. }
  244. os.Exit(0)
  245. panic("unreachable")
  246. }
  247. runtime.GOMAXPROCS(1)
  248. ln, err := reuseport.Listen("tcp4", *listenAddr)
  249. if err != nil {
  250. log.Fatal(err)
  251. }
  252. return ln
  253. }
  254. func initDatabase(dbHost string, dbUser string, dbPass string, dbName string, dbPort uint16, maxConnectionsInPool int) (*pgx.ConnPool, error) {
  255. var successOrFailure string = "OK"
  256. var config pgx.ConnPoolConfig
  257. config.Host = dbHost
  258. config.User = dbUser
  259. config.Password = dbPass
  260. config.Database = dbName
  261. config.Port = dbPort
  262. config.MaxConnections = maxConnectionsInPool
  263. config.AfterConnect = func(eachConn *pgx.Conn) error {
  264. worldSelectStmt = mustPrepare(eachConn, "worldSelectStmt", "SELECT id, randomNumber FROM World WHERE id = $1")
  265. worldUpdateStmt = mustPrepare(eachConn, "worldUpdateStmt", "UPDATE World SET randomNumber = $1 WHERE id = $2")
  266. fortuneSelectStmt = mustPrepare(eachConn, "fortuneSelectStmt", "SELECT id, message FROM Fortune")
  267. return nil
  268. }
  269. fmt.Println("--------------------------------------------------------------------------------------------")
  270. connPool, err := pgx.NewConnPool(config)
  271. if err != nil {
  272. successOrFailure = "FAILED"
  273. log.Println("Connecting to database ", dbName, " as user ", dbUser, " ", successOrFailure, ": \n ", err)
  274. } else {
  275. log.Println("Connecting to database ", dbName, " as user ", dbUser, ": ", successOrFailure)
  276. log.Println("Fetching one record to test if db connection is valid...")
  277. var w World
  278. n := randomWorldNum()
  279. if errPing := connPool.QueryRow("worldSelectStmt", n).Scan(&w.Id, &w.RandomNumber); errPing != nil {
  280. log.Fatalf("Error scanning world row: %s", errPing)
  281. }
  282. log.Println("OK")
  283. }
  284. fmt.Println("--------------------------------------------------------------------------------------------")
  285. return connPool, err
  286. }