dashboard.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. package dashboard
  2. import (
  3. "fmt"
  4. "math/rand"
  5. "net/http"
  6. "time"
  7. "sync"
  8. _ "github.com/flashmob/go-guerrilla/dashboard/statik"
  9. "github.com/flashmob/go-guerrilla/log"
  10. "github.com/gorilla/mux"
  11. "github.com/gorilla/websocket"
  12. "github.com/rakyll/statik/fs"
  13. "sync/atomic"
  14. )
  15. var (
  16. config *Config
  17. sessions map[string]*session
  18. stopRankingManager chan bool = make(chan bool)
  19. stopDataListener chan bool = make(chan bool)
  20. stopHttp chan bool = make(chan bool)
  21. wg sync.WaitGroup
  22. started sync.WaitGroup
  23. s state
  24. mainlogStore atomic.Value
  25. )
  26. type state int
  27. const (
  28. stateStopped state = iota
  29. stateRunning
  30. )
  31. var upgrader = websocket.Upgrader{
  32. ReadBufferSize: 1024,
  33. WriteBufferSize: 1024,
  34. // TODO below for testing w/ webpack only, change before merging
  35. CheckOrigin: func(r *http.Request) bool { return true },
  36. }
  37. type Config struct {
  38. Enabled bool `json:"is_enabled"`
  39. ListenInterface string `json:"listen_interface"`
  40. // Interval at which we send measure and send dataframe to frontend
  41. TickInterval string `json:"tick_interval"`
  42. // Maximum interval for which we store data
  43. MaxWindow string `json:"max_window"`
  44. // Granularity for which rankings are aggregated
  45. RankingUpdateInterval string `json:"ranking_aggregation_interval"`
  46. // Determines at which ratio of unique HELOs to unique connections we
  47. // will stop collecting data to prevent memory exhaustion attack.
  48. // Number between 0-1, set to >1 if you never want to stop collecting data.
  49. // Default is 0.8
  50. UniqueHeloRatioMax float64 `json:"unique_helo_ratio"`
  51. }
  52. // Begin collecting data and listening for dashboard clients
  53. func Run(c *Config, l log.Logger) {
  54. mainlogStore.Store(l)
  55. statikFS, _ := fs.New()
  56. //store = newDataStore()
  57. applyConfig(c)
  58. sessions = map[string]*session{}
  59. r := mux.NewRouter()
  60. r.HandleFunc("/ws", webSocketHandler)
  61. r.PathPrefix("/").Handler(http.FileServer(statikFS))
  62. rand.Seed(time.Now().UnixNano())
  63. started.Add(1)
  64. defer func() {
  65. s = stateStopped
  66. }()
  67. closer, err := ListenAndServeWithClose(c.ListenInterface, r)
  68. if err != nil {
  69. mainlog().WithError(err).Error("Dashboard server failed to start")
  70. started.Done()
  71. return
  72. }
  73. mainlog().Infof("started dashboard, listening on http [%s]", c.ListenInterface)
  74. wg.Add(1)
  75. go func() {
  76. wg.Add(1)
  77. dataListener(tickInterval)
  78. wg.Done()
  79. }()
  80. go func() {
  81. wg.Add(1)
  82. store.rankingManager()
  83. wg.Done()
  84. }()
  85. s = stateRunning
  86. started.Done()
  87. select {
  88. case <-stopHttp:
  89. closer.Close()
  90. wg.Done()
  91. return
  92. }
  93. }
  94. func Stop() {
  95. started.Wait()
  96. if s == stateRunning {
  97. stopDataListener <- true
  98. stopRankingManager <- true
  99. stopHttp <- true
  100. wg.Wait()
  101. }
  102. }
  103. func mainlog() log.Logger {
  104. if v, ok := mainlogStore.Load().(log.Logger); ok {
  105. return v
  106. }
  107. l, _ := log.GetLogger(log.OutputStderr.String(), log.InfoLevel.String())
  108. return l
  109. }
  110. // Parses options in config and applies to global variables
  111. func applyConfig(c *Config) {
  112. config = c
  113. if len(config.MaxWindow) > 0 {
  114. mw, err := time.ParseDuration(config.MaxWindow)
  115. if err == nil {
  116. maxWindow = mw
  117. }
  118. }
  119. if len(config.RankingUpdateInterval) > 0 {
  120. rui, err := time.ParseDuration(config.RankingUpdateInterval)
  121. if err == nil {
  122. rankingUpdateInterval = rui
  123. }
  124. }
  125. if len(config.TickInterval) > 0 {
  126. ti, err := time.ParseDuration(config.TickInterval)
  127. if err == nil {
  128. tickInterval = ti
  129. }
  130. }
  131. if config.UniqueHeloRatioMax > 0 {
  132. uniqueHeloRatioMax = config.UniqueHeloRatioMax
  133. }
  134. maxTicks = int(maxWindow * tickInterval)
  135. nRankingBuffers = int(maxWindow / rankingUpdateInterval)
  136. }
  137. func webSocketHandler(w http.ResponseWriter, r *http.Request) {
  138. var sess *session
  139. cookie, err := r.Cookie("SID")
  140. fmt.Println("cookie", cookie, err.Error())
  141. if err != nil {
  142. // Haven't set this cookie yet.
  143. sess = startSession(w, r)
  144. } else {
  145. var sidExists bool
  146. sess, sidExists = sessions[cookie.Value]
  147. if !sidExists {
  148. // No SID cookie in our store, start a new session
  149. sess = startSession(w, r)
  150. }
  151. }
  152. conn, err := upgrader.Upgrade(w, r, nil)
  153. if err != nil {
  154. w.WriteHeader(http.StatusInternalServerError)
  155. return
  156. }
  157. sess.ws = conn
  158. c := make(chan *message)
  159. sess.send = c
  160. store.subscribe(sess.id, c)
  161. go sess.receive()
  162. go sess.transmit()
  163. go store.initSession(sess)
  164. }
  165. func startSession(w http.ResponseWriter, r *http.Request) *session {
  166. sessionID := newSessionID()
  167. cookie := &http.Cookie{
  168. Name: "SID",
  169. Value: sessionID,
  170. Path: "/",
  171. // Secure: true, // TODO re-add this when TLS is set up
  172. }
  173. sess := &session{
  174. id: sessionID,
  175. }
  176. http.SetCookie(w, cookie)
  177. sessions[sessionID] = sess
  178. return sess
  179. }