server.go 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319
  1. //go:generate qtc -dir=src/templates
  2. package main
  3. import (
  4. "encoding/json"
  5. "flag"
  6. "fmt"
  7. "log"
  8. "math/rand"
  9. "net"
  10. "os"
  11. "os/exec"
  12. "runtime"
  13. "sort"
  14. "sync"
  15. "github.com/jackc/pgx"
  16. "github.com/valyala/fasthttp"
  17. "github.com/valyala/fasthttp/reuseport"
  18. "templates"
  19. )
  20. type JSONResponse struct {
  21. Message string `json:"message"`
  22. }
  23. type World struct {
  24. Id int32 `json:"id"`
  25. RandomNumber int32 `json:"randomNumber"`
  26. }
  27. const (
  28. worldRowCount = 10000
  29. maxConnectionCount = 40
  30. )
  31. var (
  32. worldSelectStmt *pgx.PreparedStatement
  33. worldUpdateStmt *pgx.PreparedStatement
  34. fortuneSelectStmt *pgx.PreparedStatement
  35. db *pgx.ConnPool
  36. )
  37. var (
  38. listenAddr = flag.String("listenAddr", ":8080", "Address to listen to")
  39. prefork = flag.Bool("prefork", false, "use prefork")
  40. child = flag.Bool("child", false, "is child proc")
  41. )
  42. func main() {
  43. flag.Parse()
  44. var err error
  45. // initialize the connection pool
  46. dbConns := maxConnectionCount
  47. if *prefork {
  48. dbConns = (maxConnectionCount + runtime.NumCPU() - 1) / runtime.NumCPU()
  49. }
  50. if db, err = initDatabase("localhost", "benchmarkdbuser", "benchmarkdbpass", "hello_world", 5432, dbConns); err != nil {
  51. log.Fatalf("Error opening database: %s", err)
  52. }
  53. s := &fasthttp.Server{
  54. Handler: mainHandler,
  55. Name: "go",
  56. }
  57. ln := getListener()
  58. if err = s.Serve(ln); err != nil {
  59. log.Fatalf("Error when serving incoming connections: %s", err)
  60. }
  61. }
  62. func mainHandler(ctx *fasthttp.RequestCtx) {
  63. path := ctx.Path()
  64. switch string(path) {
  65. case "/plaintext":
  66. plaintextHandler(ctx)
  67. case "/json":
  68. jsonHandler(ctx)
  69. case "/db":
  70. dbHandler(ctx)
  71. case "/queries":
  72. queriesHandler(ctx)
  73. case "/fortune":
  74. fortuneHandler(ctx)
  75. case "/update":
  76. updateHandler(ctx)
  77. default:
  78. ctx.Error("unexpected path", fasthttp.StatusBadRequest)
  79. }
  80. }
  81. // Test 1: JSON serialization
  82. func jsonHandler(ctx *fasthttp.RequestCtx) {
  83. r := jsonResponsePool.Get().(*JSONResponse)
  84. r.Message = "Hello, World!"
  85. jsonMarshal(ctx, r)
  86. jsonResponsePool.Put(r)
  87. }
  88. var jsonResponsePool = &sync.Pool{
  89. New: func() interface{} {
  90. return &JSONResponse{}
  91. },
  92. }
  93. // Test 2: Single database query
  94. func dbHandler(ctx *fasthttp.RequestCtx) {
  95. var w World
  96. fetchRandomWorld(&w)
  97. jsonMarshal(ctx, &w)
  98. }
  99. // Test 3: Multiple database queries
  100. func queriesHandler(ctx *fasthttp.RequestCtx) {
  101. n := getQueriesCount(ctx)
  102. worlds := make([]World, n)
  103. for i := 0; i < n; i++ {
  104. fetchRandomWorld(&worlds[i])
  105. }
  106. jsonMarshal(ctx, worlds)
  107. }
  108. // Test 4: Fortunes
  109. func fortuneHandler(ctx *fasthttp.RequestCtx) {
  110. rows, err := db.Query("fortuneSelectStmt")
  111. if err != nil {
  112. log.Fatalf("Error selecting db data: %v", err)
  113. }
  114. fortunes := make([]templates.Fortune, 0, 16)
  115. for rows.Next() {
  116. var f templates.Fortune
  117. if err := rows.Scan(&f.ID, &f.Message); err != nil {
  118. log.Fatalf("Error scanning fortune row: %s", err)
  119. }
  120. fortunes = append(fortunes, f)
  121. }
  122. rows.Close()
  123. fortunes = append(fortunes, templates.Fortune{Message: "Additional fortune added at request time."})
  124. sort.Sort(FortunesByMessage(fortunes))
  125. ctx.SetContentType("text/html; charset=utf-8")
  126. templates.WriteFortunePage(ctx, fortunes)
  127. }
  128. // Test 5: Database updates
  129. func updateHandler(ctx *fasthttp.RequestCtx) {
  130. n := getQueriesCount(ctx)
  131. worlds := make([]World, n)
  132. for i := 0; i < n; i++ {
  133. w := &worlds[i]
  134. fetchRandomWorld(w)
  135. w.RandomNumber = int32(randomWorldNum())
  136. }
  137. // sorting is required for insert deadlock prevention.
  138. sort.Sort(WorldsByID(worlds))
  139. txn, err := db.Begin()
  140. if err != nil {
  141. log.Fatalf("Error starting transaction: %s", err)
  142. }
  143. for i := 0; i < n; i++ {
  144. w := &worlds[i]
  145. if _, err = txn.Exec("worldUpdateStmt", w.RandomNumber, w.Id); err != nil {
  146. log.Fatalf("Error updating world row %d: %s", i, err)
  147. }
  148. }
  149. if err = txn.Commit(); err != nil {
  150. log.Fatalf("Error when commiting world rows: %s", err)
  151. }
  152. jsonMarshal(ctx, worlds)
  153. }
  154. // Test 6: Plaintext
  155. func plaintextHandler(ctx *fasthttp.RequestCtx) {
  156. ctx.SetContentType("text/plain")
  157. ctx.WriteString("Hello, World!")
  158. }
  159. func jsonMarshal(ctx *fasthttp.RequestCtx, v interface{}) {
  160. ctx.SetContentType("application/json")
  161. if err := json.NewEncoder(ctx).Encode(v); err != nil {
  162. log.Fatalf("error in json.Encoder.Encode: %s", err)
  163. }
  164. }
  165. func fetchRandomWorld(w *World) {
  166. n := randomWorldNum()
  167. if err := db.QueryRow("worldSelectStmt", n).Scan(&w.Id, &w.RandomNumber); err != nil {
  168. log.Fatalf("Error scanning world row: %s", err)
  169. }
  170. }
  171. func randomWorldNum() int {
  172. return rand.Intn(worldRowCount) + 1
  173. }
  174. func getQueriesCount(ctx *fasthttp.RequestCtx) int {
  175. n := ctx.QueryArgs().GetUintOrZero("queries")
  176. if n < 1 {
  177. n = 1
  178. } else if n > 500 {
  179. n = 500
  180. }
  181. return n
  182. }
  183. type FortunesByMessage []templates.Fortune
  184. func (s FortunesByMessage) Len() int { return len(s) }
  185. func (s FortunesByMessage) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
  186. func (s FortunesByMessage) Less(i, j int) bool { return s[i].Message < s[j].Message }
  187. type WorldsByID []World
  188. func (w WorldsByID) Len() int { return len(w) }
  189. func (w WorldsByID) Swap(i, j int) { w[i], w[j] = w[j], w[i] }
  190. func (w WorldsByID) Less(i, j int) bool { return w[i].Id < w[j].Id }
  191. func mustPrepare(db *pgx.Conn, name, query string) *pgx.PreparedStatement {
  192. stmt, err := db.Prepare(name, query)
  193. if err != nil {
  194. log.Fatalf("Error when preparing statement %q: %s", query, err)
  195. }
  196. return stmt
  197. }
  198. func getListener() net.Listener {
  199. if !*prefork {
  200. runtime.GOMAXPROCS(runtime.NumCPU())
  201. ln, err := net.Listen("tcp4", *listenAddr)
  202. if err != nil {
  203. log.Fatal(err)
  204. }
  205. return ln
  206. }
  207. if !*child {
  208. children := make([]*exec.Cmd, runtime.NumCPU())
  209. for i := range children {
  210. children[i] = exec.Command(os.Args[0], "-prefork", "-child")
  211. children[i].Stdout = os.Stdout
  212. children[i].Stderr = os.Stderr
  213. if err := children[i].Start(); err != nil {
  214. log.Fatal(err)
  215. }
  216. }
  217. for _, ch := range children {
  218. if err := ch.Wait(); err != nil {
  219. log.Print(err)
  220. }
  221. }
  222. os.Exit(0)
  223. panic("unreachable")
  224. }
  225. runtime.GOMAXPROCS(1)
  226. ln, err := reuseport.Listen("tcp4", *listenAddr)
  227. if err != nil {
  228. log.Fatal(err)
  229. }
  230. return ln
  231. }
  232. func initDatabase(dbHost string, dbUser string, dbPass string, dbName string, dbPort uint16, maxConnectionsInPool int) (*pgx.ConnPool, error) {
  233. var successOrFailure string = "OK"
  234. var config pgx.ConnPoolConfig
  235. config.Host = dbHost
  236. config.User = dbUser
  237. config.Password = dbPass
  238. config.Database = dbName
  239. config.Port = dbPort
  240. config.MaxConnections = maxConnectionsInPool
  241. config.AfterConnect = func(conn *pgx.Conn) error {
  242. worldSelectStmt = mustPrepare(conn, "worldSelectStmt", "SELECT id, randomNumber FROM World WHERE id = $1")
  243. worldUpdateStmt = mustPrepare(conn, "worldUpdateStmt", "UPDATE World SET randomNumber = $1 WHERE id = $2")
  244. fortuneSelectStmt = mustPrepare(conn, "fortuneSelectStmt", "SELECT id, message FROM Fortune")
  245. return nil
  246. }
  247. fmt.Println("--------------------------------------------------------------------------------------------")
  248. connPool, err := pgx.NewConnPool(config)
  249. if err != nil {
  250. successOrFailure = "FAILED"
  251. log.Println("Connecting to database ", dbName, " as user ", dbUser, " ", successOrFailure, ": \n ", err)
  252. } else {
  253. log.Println("Connecting to database ", dbName, " as user ", dbUser, ": ", successOrFailure)
  254. log.Println("Fetching one record to test if db connection is valid...")
  255. var w World
  256. n := randomWorldNum()
  257. if errPing := connPool.QueryRow("worldSelectStmt", n).Scan(&w.Id, &w.RandomNumber); errPing != nil {
  258. log.Fatalf("Error scanning world row: %s", errPing)
  259. }
  260. log.Println("OK")
  261. }
  262. fmt.Println("--------------------------------------------------------------------------------------------")
  263. return connPool, err
  264. }