server.go 6.4 KB


  1. //go:generate qtc -dir=src/templates
  2. package main
  3. import (
  4. "database/sql"
  5. "encoding/json"
  6. "flag"
  7. "log"
  8. "math/rand"
  9. "net"
  10. "os"
  11. "os/exec"
  12. "runtime"
  13. "sort"
  14. "sync"
  15. _ "github.com/go-sql-driver/mysql"
  16. "github.com/valyala/fasthttp"
  17. "github.com/valyala/fasthttp/reuseport"
  18. "templates"
  19. )
  20. type JSONResponse struct {
  21. Message string `json:"message"`
  22. }
  23. type World struct {
  24. Id uint16 `json:"id"`
  25. RandomNumber uint16 `json:"randomNumber"`
  26. }
  27. const (
  28. connectionString = "benchmarkdbuser:benchmarkdbpass@tcp(localhost:3306)/hello_world"
  29. worldRowCount = 10000
  30. maxConnectionCount = 40
  31. )
  32. var (
  33. worldSelectStmt *sql.Stmt
  34. worldUpdateStmt *sql.Stmt
  35. fortuneSelectStmt *sql.Stmt
  36. )
  37. const helloWorldString = "Hello, World!"
  38. var (
  39. db *sql.DB
  40. helloWorldBytes = []byte(helloWorldString)
  41. )
  42. var (
  43. listenAddr = flag.String("listenAddr", ":8080", "Address to listen to")
  44. prefork = flag.Bool("prefork", false, "use prefork")
  45. child = flag.Bool("child", false, "is child proc")
  46. )
  47. func main() {
  48. flag.Parse()
  49. var err error
  50. if db, err = sql.Open("mysql", connectionString); err != nil {
  51. log.Fatalf("Error opening database: %s", err)
  52. }
  53. if err = db.Ping(); err != nil {
  54. log.Fatalf("Cannot connect to db: %s", err)
  55. }
  56. dbConnCount := maxConnectionCount
  57. if *prefork {
  58. dbConnCount = (dbConnCount + runtime.NumCPU() - 1) / runtime.NumCPU()
  59. }
  60. db.SetMaxIdleConns(dbConnCount)
  61. db.SetMaxOpenConns(dbConnCount)
  62. worldSelectStmt = mustPrepare(db, "SELECT id, randomNumber FROM World WHERE id = ?")
  63. worldUpdateStmt = mustPrepare(db, "UPDATE World SET randomNumber = ? WHERE id = ?")
  64. fortuneSelectStmt = mustPrepare(db, "SELECT id, message FROM Fortune")
  65. s := &fasthttp.Server{
  66. Handler: mainHandler,
  67. Name: "fasthttp",
  68. }
  69. ln := getListener()
  70. if err = s.Serve(ln); err != nil {
  71. log.Fatalf("Error when serving incoming connections: %s", err)
  72. }
  73. }
  74. func mainHandler(ctx *fasthttp.RequestCtx) {
  75. path := ctx.Path()
  76. switch string(path) {
  77. case "/plaintext":
  78. plaintextHandler(ctx)
  79. case "/json":
  80. jsonHandler(ctx)
  81. case "/db":
  82. dbHandler(ctx)
  83. case "/queries":
  84. queriesHandler(ctx)
  85. case "/fortune":
  86. fortuneHandler(ctx)
  87. case "/update":
  88. updateHandler(ctx)
  89. default:
  90. ctx.Error("unexpected path", fasthttp.StatusBadRequest)
  91. }
  92. }
  93. // Test 1: JSON serialization
  94. func jsonHandler(ctx *fasthttp.RequestCtx) {
  95. r := jsonResponsePool.Get().(*JSONResponse)
  96. r.Message = helloWorldString
  97. jsonMarshal(ctx, r)
  98. jsonResponsePool.Put(r)
  99. }
  100. var jsonResponsePool = &sync.Pool{
  101. New: func() interface{} {
  102. return &JSONResponse{}
  103. },
  104. }
  105. // Test 2: Single database query
  106. func dbHandler(ctx *fasthttp.RequestCtx) {
  107. var w World
  108. fetchRandomWorld(&w)
  109. jsonMarshal(ctx, &w)
  110. }
  111. // Test 3: Multiple database queries
  112. func queriesHandler(ctx *fasthttp.RequestCtx) {
  113. n := getQueriesCount(ctx)
  114. worlds := make([]World, n)
  115. for i := 0; i < n; i++ {
  116. fetchRandomWorld(&worlds[i])
  117. }
  118. jsonMarshal(ctx, worlds)
  119. }
  120. // Test 4: Fortunes
  121. func fortuneHandler(ctx *fasthttp.RequestCtx) {
  122. rows, err := fortuneSelectStmt.Query()
  123. if err != nil {
  124. log.Fatalf("Error selecting db data: %v", err)
  125. }
  126. fortunes := make([]templates.Fortune, 0, 16)
  127. for rows.Next() {
  128. var f templates.Fortune
  129. if err := rows.Scan(&f.ID, &f.Message); err != nil {
  130. log.Fatalf("Error scanning fortune row: %s", err)
  131. }
  132. fortunes = append(fortunes, f)
  133. }
  134. rows.Close()
  135. fortunes = append(fortunes, templates.Fortune{Message: "Additional fortune added at request time."})
  136. sort.Sort(FortunesByMessage(fortunes))
  137. ctx.SetContentType("text/html; charset=utf-8")
  138. templates.WriteFortunePage(ctx, fortunes)
  139. }
  140. // Test 5: Database updates
  141. func updateHandler(ctx *fasthttp.RequestCtx) {
  142. n := getQueriesCount(ctx)
  143. worlds := make([]World, n)
  144. for i := 0; i < n; i++ {
  145. w := &worlds[i]
  146. fetchRandomWorld(w)
  147. w.RandomNumber = uint16(randomWorldNum())
  148. }
  149. // sorting is required for insert deadlock prevention.
  150. sort.Sort(WorldsByID(worlds))
  151. txn, err := db.Begin()
  152. if err != nil {
  153. log.Fatalf("Error starting transaction: %s", err)
  154. }
  155. stmt := txn.Stmt(worldUpdateStmt)
  156. for i := 0; i < n; i++ {
  157. w := &worlds[i]
  158. if _, err := stmt.Exec(w.RandomNumber, w.Id); err != nil {
  159. log.Fatalf("Error updating world row %d: %s", i, err)
  160. }
  161. }
  162. if err = txn.Commit(); err != nil {
  163. log.Fatalf("Error when commiting world rows: %s", err)
  164. }
  165. jsonMarshal(ctx, worlds)
  166. }
  167. // Test 6: Plaintext
  168. func plaintextHandler(ctx *fasthttp.RequestCtx) {
  169. ctx.Write(helloWorldBytes)
  170. }
  171. func jsonMarshal(ctx *fasthttp.RequestCtx, v interface{}) {
  172. ctx.SetContentType("application/json")
  173. if err := json.NewEncoder(ctx).Encode(v); err != nil {
  174. log.Fatalf("error in json.Encoder.Encode: %s", err)
  175. }
  176. }
  177. func fetchRandomWorld(w *World) {
  178. n := randomWorldNum()
  179. if err := worldSelectStmt.QueryRow(n).Scan(&w.Id, &w.RandomNumber); err != nil {
  180. log.Fatalf("Error scanning world row: %s", err)
  181. }
  182. }
  183. func randomWorldNum() int {
  184. return rand.Intn(worldRowCount) + 1
  185. }
  186. func getQueriesCount(ctx *fasthttp.RequestCtx) int {
  187. n := ctx.QueryArgs().GetUintOrZero("queries")
  188. if n < 1 {
  189. n = 1
  190. } else if n > 500 {
  191. n = 500
  192. }
  193. return n
  194. }
  195. type FortunesByMessage []templates.Fortune
  196. func (s FortunesByMessage) Len() int { return len(s) }
  197. func (s FortunesByMessage) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
  198. func (s FortunesByMessage) Less(i, j int) bool { return s[i].Message < s[j].Message }
  199. type WorldsByID []World
  200. func (w WorldsByID) Len() int { return len(w) }
  201. func (w WorldsByID) Swap(i, j int) { w[i], w[j] = w[j], w[i] }
  202. func (w WorldsByID) Less(i, j int) bool { return w[i].Id < w[j].Id }
  203. func mustPrepare(db *sql.DB, query string) *sql.Stmt {
  204. stmt, err := db.Prepare(query)
  205. if err != nil {
  206. log.Fatalf("Error when preparing statement %q: %s", query, err)
  207. }
  208. return stmt
  209. }
  210. func getListener() net.Listener {
  211. if !*prefork {
  212. runtime.GOMAXPROCS(runtime.NumCPU())
  213. ln, err := net.Listen("tcp4", *listenAddr)
  214. if err != nil {
  215. log.Fatal(err)
  216. }
  217. return ln
  218. }
  219. if !*child {
  220. children := make([]*exec.Cmd, runtime.NumCPU())
  221. for i := range children {
  222. children[i] = exec.Command(os.Args[0], "-prefork", "-child")
  223. children[i].Stdout = os.Stdout
  224. children[i].Stderr = os.Stderr
  225. if err := children[i].Start(); err != nil {
  226. log.Fatal(err)
  227. }
  228. }
  229. for _, ch := range children {
  230. if err := ch.Wait(); err != nil {
  231. log.Print(err)
  232. }
  233. }
  234. os.Exit(0)
  235. panic("unreachable")
  236. }
  237. runtime.GOMAXPROCS(1)
  238. ln, err := reuseport.Listen("tcp4", *listenAddr)
  239. if err != nil {
  240. log.Fatal(err)
  241. }
  242. return ln
  243. }