123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200 |
- package dashboard
- import (
- "fmt"
- "math/rand"
- "net/http"
- "time"
- log "github.com/Sirupsen/logrus"
- _ "github.com/flashmob/go-guerrilla/dashboard/statik"
- "github.com/gorilla/mux"
- "github.com/gorilla/websocket"
- "github.com/rakyll/statik/fs"
- "sync"
- )
- var (
- config *Config
- sessions map[string]*session
- stopRankingManager chan bool = make(chan bool)
- stopDataListener chan bool = make(chan bool)
- stopHttp chan bool = make(chan bool)
- wg sync.WaitGroup
- started sync.WaitGroup
- s state
- )
- type state int
- const (
- stateStopped state = iota
- stateRunning
- )
- var upgrader = websocket.Upgrader{
- ReadBufferSize: 1024,
- WriteBufferSize: 1024,
- // TODO below for testing w/ webpack only, change before merging
- CheckOrigin: func(r *http.Request) bool { return true },
- }
- type Config struct {
- Enabled bool `json:"is_enabled"`
- ListenInterface string `json:"listen_interface"`
- // Interval at which we send measure and send dataframe to frontend
- TickInterval string `json:"tick_interval"`
- // Maximum interval for which we store data
- MaxWindow string `json:"max_window"`
- // Granularity for which rankings are aggregated
- RankingUpdateInterval string `json:"ranking_aggregation_interval"`
- // Determines at which ratio of unique HELOs to unique connections we
- // will stop collecting data to prevent memory exhaustion attack.
- // Number between 0-1, set to >1 if you never want to stop collecting data.
- // Default is 0.8
- UniqueHeloRatioMax float64 `json:"unique_helo_ratio"`
- }
- // Begin collecting data and listening for dashboard clients
- func Run(c *Config) {
- statikFS, _ := fs.New()
- applyConfig(c)
- sessions = map[string]*session{}
- r := mux.NewRouter()
- r.HandleFunc("/ws", webSocketHandler)
- r.PathPrefix("/").Handler(http.FileServer(statikFS))
- rand.Seed(time.Now().UnixNano())
- started.Add(1)
- defer func() {
- s = stateStopped
- }()
- closer, err := ListenAndServeWithClose(c.ListenInterface, r)
- if err != nil {
- log.WithError(err).Error("Dashboard server failed to start")
- started.Done()
- return
- }
- log.Infof("started dashboard, listening on http [%s]", c.ListenInterface)
- wg.Add(1)
- go func() {
- wg.Add(1)
- dataListener(tickInterval)
- wg.Done()
- }()
- go func() {
- wg.Add(1)
- store.rankingManager()
- wg.Done()
- }()
- s = stateRunning
- started.Done()
- select {
- case <-stopHttp:
- closer.Close()
- wg.Done()
- return
- }
- }
- func Stop() {
- started.Wait()
- if s == stateRunning {
- stopDataListener <- true
- stopRankingManager <- true
- stopHttp <- true
- wg.Wait()
- }
- }
- // Parses options in config and applies to global variables
- func applyConfig(c *Config) {
- config = c
- if len(config.MaxWindow) > 0 {
- mw, err := time.ParseDuration(config.MaxWindow)
- if err == nil {
- maxWindow = mw
- }
- }
- if len(config.RankingUpdateInterval) > 0 {
- rui, err := time.ParseDuration(config.RankingUpdateInterval)
- if err == nil {
- rankingUpdateInterval = rui
- }
- }
- if len(config.TickInterval) > 0 {
- ti, err := time.ParseDuration(config.TickInterval)
- if err == nil {
- tickInterval = ti
- }
- }
- if config.UniqueHeloRatioMax > 0 {
- uniqueHeloRatioMax = config.UniqueHeloRatioMax
- }
- maxTicks = int(maxWindow * tickInterval)
- nRankingBuffers = int(maxWindow / rankingUpdateInterval)
- }
- func webSocketHandler(w http.ResponseWriter, r *http.Request) {
- var sess *session
- cookie, err := r.Cookie("SID")
- fmt.Println("cookie", cookie, err.Error())
- if err != nil {
- // Haven't set this cookie yet.
- sess = startSession(w, r)
- } else {
- var sidExists bool
- sess, sidExists = sessions[cookie.Value]
- if !sidExists {
- // No SID cookie in our store, start a new session
- sess = startSession(w, r)
- }
- }
- conn, err := upgrader.Upgrade(w, r, nil)
- if err != nil {
- w.WriteHeader(http.StatusInternalServerError)
- return
- }
- sess.ws = conn
- c := make(chan *message)
- sess.send = c
- store.subscribe(sess.id, c)
- go sess.receive()
- go sess.transmit()
- go store.initSession(sess)
- }
- func startSession(w http.ResponseWriter, r *http.Request) *session {
- sessionID := newSessionID()
- cookie := &http.Cookie{
- Name: "SID",
- Value: sessionID,
- Path: "/",
- // Secure: true, // TODO re-add this when TLS is set up
- }
- sess := &session{
- id: sessionID,
- }
- http.SetCookie(w, cookie)
- sessions[sessionID] = sess
- return sess
- }
|