hello.go 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355
  1. package main
  2. import (
  3. "encoding/json"
  4. "flag"
  5. "fmt"
  6. "html/template"
  7. "log"
  8. "math/rand"
  9. "net"
  10. "os"
  11. "os/exec"
  12. "runtime"
  13. "sort"
  14. "sync/atomic"
  15. "time"
  16. "github.com/jackc/pgx"
  17. "github.com/valyala/fasthttp"
  18. "github.com/valyala/fasthttp/reuseport"
  19. )
  20. type Message struct {
  21. Message string `json:"message"`
  22. }
  23. type World struct {
  24. Id int32 `json:"id"`
  25. RandomNumber int32 `json:"randomNumber"`
  26. }
  27. type Fortune struct {
  28. Id int32 `json:"id"`
  29. Message string `json:"message"`
  30. }
  31. const (
  32. worldRowCount = 10000
  33. maxConnectionCount = 256
  34. )
  35. var (
  36. worldSelectStmt *pgx.PreparedStatement
  37. worldUpdateStmt *pgx.PreparedStatement
  38. fortuneSelectStmt *pgx.PreparedStatement
  39. )
  40. const helloWorldString = "Hello, World!"
  41. var (
  42. tmpl = template.Must(template.ParseFiles("templates/fortune.html"))
  43. db *pgx.ConnPool
  44. helloWorldBytes = []byte(helloWorldString)
  45. )
  46. var (
  47. listenAddr = flag.String("listenAddr", ":8080", "Address to listen to")
  48. prefork = flag.Bool("prefork", false, "use prefork")
  49. child = flag.Bool("child", false, "is child proc")
  50. )
  51. func main() {
  52. flag.Parse()
  53. var err error
  54. // initialize the connection pool
  55. dbConns := maxConnectionCount
  56. if *prefork {
  57. dbConns = (maxConnectionCount + runtime.NumCPU() - 1) / runtime.NumCPU()
  58. }
  59. if db, err = initDatabase("localhost", "benchmarkdbuser", "benchmarkdbpass", "hello_world", 5432, dbConns); err != nil {
  60. log.Fatalf("Error opening database: %s", err)
  61. }
  62. s := &fasthttp.Server{
  63. Handler: mainHandler,
  64. Name: "fasthttp",
  65. }
  66. ln := getListener()
  67. if err = s.Serve(ln); err != nil {
  68. log.Fatalf("Error when serving incoming connections: %s", err)
  69. }
  70. }
  71. const maxConnDuration = time.Millisecond * 200
  72. var connDurationJitter uint64
  73. func mainHandler(ctx *fasthttp.RequestCtx) {
  74. // Performance hack for prefork mode - periodically close keepalive
  75. // connections for evenly distributing connections among available
  76. // processes.
  77. if *prefork {
  78. maxDuration := maxConnDuration + time.Millisecond*time.Duration(atomic.LoadUint64(&connDurationJitter))
  79. if time.Since(ctx.ConnTime()) > maxDuration {
  80. atomic.StoreUint64(&connDurationJitter, uint64(rand.Intn(100)))
  81. ctx.SetConnectionClose()
  82. }
  83. }
  84. path := ctx.Path()
  85. switch string(path) {
  86. case "/plaintext":
  87. plaintextHandler(ctx)
  88. case "/json":
  89. jsonHandler(ctx)
  90. case "/db":
  91. dbHandler(ctx)
  92. case "/queries":
  93. queriesHandler(ctx)
  94. case "/fortune":
  95. fortuneHandler(ctx)
  96. case "/update":
  97. updateHandler(ctx)
  98. default:
  99. ctx.Error("unexpected path", fasthttp.StatusBadRequest)
  100. }
  101. }
  102. // Test 1: JSON serialization
  103. func jsonHandler(ctx *fasthttp.RequestCtx) {
  104. jsonMarshal(ctx, &Message{helloWorldString})
  105. }
  106. // Test 2: Single database query
  107. func dbHandler(ctx *fasthttp.RequestCtx) {
  108. var w World
  109. fetchRandomWorld(&w)
  110. jsonMarshal(ctx, &w)
  111. }
  112. // Test 3: Multiple database queries
  113. func queriesHandler(ctx *fasthttp.RequestCtx) {
  114. n := getQueriesCount(ctx)
  115. worlds := make([]World, n)
  116. for i := 0; i < n; i++ {
  117. fetchRandomWorld(&worlds[i])
  118. }
  119. jsonMarshal(ctx, worlds)
  120. }
  121. // Test 4: Fortunes
  122. func fortuneHandler(ctx *fasthttp.RequestCtx) {
  123. rows, err := db.Query("fortuneSelectStmt")
  124. if err != nil {
  125. log.Fatalf("Error selecting db data: %v", err)
  126. }
  127. fortunes := make([]Fortune, 0, 16)
  128. for rows.Next() {
  129. var f Fortune
  130. if err := rows.Scan(&f.Id, &f.Message); err != nil {
  131. log.Fatalf("Error scanning fortune row: %s", err)
  132. }
  133. fortunes = append(fortunes, f)
  134. }
  135. rows.Close()
  136. fortunes = append(fortunes, Fortune{Message: "Additional fortune added at request time."})
  137. sort.Sort(FortunesByMessage(fortunes))
  138. ctx.SetContentType("text/html; charset=utf-8")
  139. if err := tmpl.Execute(ctx, fortunes); err != nil {
  140. log.Fatalf("Error executing fortune: %s", err)
  141. }
  142. }
  143. // Test 5: Database updates
  144. func updateHandler(ctx *fasthttp.RequestCtx) {
  145. n := getQueriesCount(ctx)
  146. worlds := make([]World, n)
  147. for i := 0; i < n; i++ {
  148. w := &worlds[i]
  149. fetchRandomWorld(w)
  150. w.RandomNumber = int32(randomWorldNum())
  151. }
  152. // sorting is required for insert deadlock prevention.
  153. sort.Sort(WorldsByID(worlds))
  154. txn, err := db.Begin()
  155. if err != nil {
  156. log.Fatalf("Error starting transaction: %s", err)
  157. }
  158. // Disable synchronous commit for the current transaction
  159. // as a performance optimization.
  160. // See http://www.postgresql.org/docs/current/static/runtime-config-wal.html for details.
  161. // Below is the relevant quote from the docs:
  162. //
  163. // > It is therefore possible, and useful, to have some transactions
  164. // > commit synchronously and others asynchronously. For example,
  165. // > to make a single multistatement transaction commit asynchronously
  166. // > when the default is the opposite, issue
  167. // > SET LOCAL synchronous_commit TO OFF
  168. // > within the transaction.
  169. if _, err = txn.Exec("SET LOCAL synchronous_commit TO OFF"); err != nil {
  170. log.Fatalf("Error when disabling synchronous commit")
  171. }
  172. for i := 0; i < n; i++ {
  173. w := &worlds[i]
  174. if _, err = txn.Exec("worldUpdateStmt", w.RandomNumber, w.Id); err != nil {
  175. log.Fatalf("Error updating world row %d: %s", i, err)
  176. }
  177. }
  178. if err = txn.Commit(); err != nil {
  179. log.Fatalf("Error when commiting world rows: %s", err)
  180. }
  181. jsonMarshal(ctx, worlds)
  182. }
  183. // Test 6: Plaintext
  184. func plaintextHandler(ctx *fasthttp.RequestCtx) {
  185. ctx.Write(helloWorldBytes)
  186. }
  187. func jsonMarshal(ctx *fasthttp.RequestCtx, v interface{}) {
  188. ctx.SetContentType("application/json")
  189. if err := json.NewEncoder(ctx).Encode(v); err != nil {
  190. log.Fatalf("error in json.Encoder.Encode: %s", err)
  191. }
  192. }
  193. func fetchRandomWorld(w *World) {
  194. n := randomWorldNum()
  195. if err := db.QueryRow("worldSelectStmt", n).Scan(&w.Id, &w.RandomNumber); err != nil {
  196. log.Fatalf("Error scanning world row: %s", err)
  197. }
  198. }
  199. func randomWorldNum() int {
  200. return rand.Intn(worldRowCount) + 1
  201. }
  202. func getQueriesCount(ctx *fasthttp.RequestCtx) int {
  203. n := ctx.QueryArgs().GetUintOrZero("queries")
  204. if n < 1 {
  205. n = 1
  206. } else if n > 500 {
  207. n = 500
  208. }
  209. return n
  210. }
  211. type FortunesByMessage []Fortune
  212. func (s FortunesByMessage) Len() int { return len(s) }
  213. func (s FortunesByMessage) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
  214. func (s FortunesByMessage) Less(i, j int) bool { return s[i].Message < s[j].Message }
  215. type WorldsByID []World
  216. func (w WorldsByID) Len() int { return len(w) }
  217. func (w WorldsByID) Swap(i, j int) { w[i], w[j] = w[j], w[i] }
  218. func (w WorldsByID) Less(i, j int) bool { return w[i].Id < w[j].Id }
  219. func mustPrepare(db *pgx.Conn, name, query string) *pgx.PreparedStatement {
  220. stmt, err := db.Prepare(name, query)
  221. if err != nil {
  222. log.Fatalf("Error when preparing statement %q: %s", query, err)
  223. }
  224. return stmt
  225. }
  226. func getListener() net.Listener {
  227. if !*prefork {
  228. runtime.GOMAXPROCS(runtime.NumCPU())
  229. ln, err := net.Listen("tcp4", *listenAddr)
  230. if err != nil {
  231. log.Fatal(err)
  232. }
  233. return ln
  234. }
  235. if !*child {
  236. children := make([]*exec.Cmd, runtime.NumCPU())
  237. for i := range children {
  238. children[i] = exec.Command(os.Args[0], "-prefork", "-child")
  239. children[i].Stdout = os.Stdout
  240. children[i].Stderr = os.Stderr
  241. if err := children[i].Start(); err != nil {
  242. log.Fatal(err)
  243. }
  244. }
  245. for _, ch := range children {
  246. if err := ch.Wait(); err != nil {
  247. log.Print(err)
  248. }
  249. }
  250. os.Exit(0)
  251. panic("unreachable")
  252. }
  253. runtime.GOMAXPROCS(1)
  254. ln, err := reuseport.Listen("tcp4", *listenAddr)
  255. if err != nil {
  256. log.Fatal(err)
  257. }
  258. return ln
  259. }
  260. func initDatabase(dbHost string, dbUser string, dbPass string, dbName string, dbPort uint16, maxConnectionsInPool int) (*pgx.ConnPool, error) {
  261. var successOrFailure string = "OK"
  262. var config pgx.ConnPoolConfig
  263. config.Host = dbHost
  264. config.User = dbUser
  265. config.Password = dbPass
  266. config.Database = dbName
  267. config.Port = dbPort
  268. config.MaxConnections = maxConnectionsInPool
  269. config.AfterConnect = func(eachConn *pgx.Conn) error {
  270. worldSelectStmt = mustPrepare(eachConn, "worldSelectStmt", "SELECT id, randomNumber FROM World WHERE id = $1")
  271. worldUpdateStmt = mustPrepare(eachConn, "worldUpdateStmt", "UPDATE World SET randomNumber = $1 WHERE id = $2")
  272. fortuneSelectStmt = mustPrepare(eachConn, "fortuneSelectStmt", "SELECT id, message FROM Fortune")
  273. return nil
  274. }
  275. fmt.Println("--------------------------------------------------------------------------------------------")
  276. connPool, err := pgx.NewConnPool(config)
  277. if err != nil {
  278. successOrFailure = "FAILED"
  279. log.Println("Connecting to database ", dbName, " as user ", dbUser, " ", successOrFailure, ": \n ", err)
  280. } else {
  281. log.Println("Connecting to database ", dbName, " as user ", dbUser, ": ", successOrFailure)
  282. log.Println("Fetching one record to test if db connection is valid...")
  283. var w World
  284. n := randomWorldNum()
  285. if errPing := connPool.QueryRow("worldSelectStmt", n).Scan(&w.Id, &w.RandomNumber); errPing != nil {
  286. log.Fatalf("Error scanning world row: %s", errPing)
  287. }
  288. log.Println("OK")
  289. }
  290. fmt.Println("--------------------------------------------------------------------------------------------")
  291. return connPool, err
  292. }