dashboard.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. package dashboard
  2. import (
  3. "fmt"
  4. "math/rand"
  5. "net/http"
  6. "time"
  7. log "github.com/Sirupsen/logrus"
  8. _ "github.com/flashmob/go-guerrilla/dashboard/statik"
  9. "github.com/gorilla/mux"
  10. "github.com/gorilla/websocket"
  11. "github.com/rakyll/statik/fs"
  12. "sync"
  13. )
  14. var (
  15. config *Config
  16. sessions map[string]*session
  17. stopRankingManager chan bool = make(chan bool)
  18. stopDataListener chan bool = make(chan bool)
  19. stopHttp chan bool = make(chan bool)
  20. wg sync.WaitGroup
  21. started sync.WaitGroup
  22. s state
  23. )
  // state describes the lifecycle phase of the dashboard.
  type state int

  const (
  	// stateStopped is the zero value: the dashboard is not running.
  	stateStopped state = iota
  	// stateRunning is set by Run once the server and workers are up.
  	stateRunning
  )
  29. var upgrader = websocket.Upgrader{
  30. ReadBufferSize: 1024,
  31. WriteBufferSize: 1024,
  32. // TODO below for testing w/ webpack only, change before merging
  33. CheckOrigin: func(r *http.Request) bool { return true },
  34. }
  // Config holds the dashboard settings as read from JSON. The interval and
  // window fields are strings in the format accepted by time.ParseDuration.
  type Config struct {
  	// Enabled is the "is_enabled" switch for the dashboard.
  	Enabled bool `json:"is_enabled"`
  	// ListenInterface is the address the dashboard HTTP server listens on.
  	ListenInterface string `json:"listen_interface"`
  	// TickInterval is the interval at which we take a measurement and send
  	// a dataframe to the frontend.
  	TickInterval string `json:"tick_interval"`
  	// MaxWindow is the maximum span of time for which data is stored.
  	MaxWindow string `json:"max_window"`
  	// RankingUpdateInterval is the granularity at which rankings are
  	// aggregated.
  	RankingUpdateInterval string `json:"ranking_aggregation_interval"`
  	// UniqueHeloRatioMax is the ratio of unique HELOs to unique connections
  	// at which data collection stops, to prevent a memory exhaustion attack.
  	// It is a number between 0 and 1; set it above 1 to never stop
  	// collecting data. Default is 0.8.
  	UniqueHeloRatioMax float64 `json:"unique_helo_ratio"`
  }
  50. // Begin collecting data and listening for dashboard clients
  51. func Run(c *Config) {
  52. statikFS, _ := fs.New()
  53. applyConfig(c)
  54. sessions = map[string]*session{}
  55. r := mux.NewRouter()
  56. r.HandleFunc("/ws", webSocketHandler)
  57. r.PathPrefix("/").Handler(http.FileServer(statikFS))
  58. rand.Seed(time.Now().UnixNano())
  59. started.Add(1)
  60. defer func() {
  61. s = stateStopped
  62. }()
  63. closer, err := ListenAndServeWithClose(c.ListenInterface, r)
  64. if err != nil {
  65. log.WithError(err).Error("Dashboard server failed to start")
  66. started.Done()
  67. return
  68. }
  69. log.Infof("started dashboard, listening on http [%s]", c.ListenInterface)
  70. wg.Add(1)
  71. go func() {
  72. wg.Add(1)
  73. dataListener(tickInterval)
  74. wg.Done()
  75. }()
  76. go func() {
  77. wg.Add(1)
  78. store.rankingManager()
  79. wg.Done()
  80. }()
  81. s = stateRunning
  82. started.Done()
  83. select {
  84. case <-stopHttp:
  85. closer.Close()
  86. wg.Done()
  87. return
  88. }
  89. }
  90. func Stop() {
  91. started.Wait()
  92. if s == stateRunning {
  93. stopDataListener <- true
  94. stopRankingManager <- true
  95. stopHttp <- true
  96. wg.Wait()
  97. }
  98. }
  99. // Parses options in config and applies to global variables
  100. func applyConfig(c *Config) {
  101. config = c
  102. if len(config.MaxWindow) > 0 {
  103. mw, err := time.ParseDuration(config.MaxWindow)
  104. if err == nil {
  105. maxWindow = mw
  106. }
  107. }
  108. if len(config.RankingUpdateInterval) > 0 {
  109. rui, err := time.ParseDuration(config.RankingUpdateInterval)
  110. if err == nil {
  111. rankingUpdateInterval = rui
  112. }
  113. }
  114. if len(config.TickInterval) > 0 {
  115. ti, err := time.ParseDuration(config.TickInterval)
  116. if err == nil {
  117. tickInterval = ti
  118. }
  119. }
  120. if config.UniqueHeloRatioMax > 0 {
  121. uniqueHeloRatioMax = config.UniqueHeloRatioMax
  122. }
  123. maxTicks = int(maxWindow * tickInterval)
  124. nRankingBuffers = int(maxWindow / rankingUpdateInterval)
  125. }
  126. func webSocketHandler(w http.ResponseWriter, r *http.Request) {
  127. var sess *session
  128. cookie, err := r.Cookie("SID")
  129. fmt.Println("cookie", cookie, err.Error())
  130. if err != nil {
  131. // Haven't set this cookie yet.
  132. sess = startSession(w, r)
  133. } else {
  134. var sidExists bool
  135. sess, sidExists = sessions[cookie.Value]
  136. if !sidExists {
  137. // No SID cookie in our store, start a new session
  138. sess = startSession(w, r)
  139. }
  140. }
  141. conn, err := upgrader.Upgrade(w, r, nil)
  142. if err != nil {
  143. w.WriteHeader(http.StatusInternalServerError)
  144. return
  145. }
  146. sess.ws = conn
  147. c := make(chan *message)
  148. sess.send = c
  149. store.subscribe(sess.id, c)
  150. go sess.receive()
  151. go sess.transmit()
  152. go store.initSession(sess)
  153. }
  154. func startSession(w http.ResponseWriter, r *http.Request) *session {
  155. sessionID := newSessionID()
  156. cookie := &http.Cookie{
  157. Name: "SID",
  158. Value: sessionID,
  159. Path: "/",
  160. // Secure: true, // TODO re-add this when TLS is set up
  161. }
  162. sess := &session{
  163. id: sessionID,
  164. }
  165. http.SetCookie(w, cookie)
  166. sessions[sessionID] = sess
  167. return sess
  168. }