Browse Source

- add Stop function
- Start blocks until Stop() is called (using a non-blocking version of ListenAndServe)

flashmob 8 years ago
parent
commit
8fcb6c2be8
4 changed files with 151 additions and 20 deletions
  1. 41 4
      dashboard/dashboard.go
  2. 31 0
      dashboard/dashboard_test.go
  3. 25 16
      dashboard/datastore.go
  4. 54 0
      dashboard/http.go

+ 41 - 4
dashboard/dashboard.go

@@ -11,11 +11,18 @@ import (
 	"github.com/gorilla/mux"
 	"github.com/gorilla/mux"
 	"github.com/gorilla/websocket"
 	"github.com/gorilla/websocket"
 	"github.com/rakyll/statik/fs"
 	"github.com/rakyll/statik/fs"
+	"sync"
 )
 )
 
 
 var (
 var (
 	config   *Config
 	config   *Config
 	sessions map[string]*session
 	sessions map[string]*session
+
+	stopRankingManager chan bool = make(chan bool)
+	stopDataListener   chan bool = make(chan bool)
+	stopHttp           chan bool = make(chan bool)
+
+	wg sync.WaitGroup
 )
 )
 
 
 var upgrader = websocket.Upgrader{
 var upgrader = websocket.Upgrader{
@@ -54,11 +61,41 @@ func Run(c *Config) {
 
 
 	rand.Seed(time.Now().UnixNano())
 	rand.Seed(time.Now().UnixNano())
 
 
-	go dataListener(tickInterval)
-	go store.rankingManager()
+	go func() {
+		wg.Add(1)
+		dataListener(tickInterval)
+		wg.Done()
+	}()
+	go func() {
+		wg.Add(1)
+		store.rankingManager()
+		wg.Done()
+	}()
+
+	closer, err := ListenAndServeWithClose(c.ListenInterface, r)
+	if err != nil {
+		stopDataListener <- true
+		stopRankingManager <- true
+		log.WithError(err).Error("Dashboard server failed to start")
+		return
+	}
+	log.Infof("started dashboard, listening on http [%s]", c.ListenInterface)
+	wg.Add(1)
+	for {
+		select {
+		case <-stopHttp:
+			closer.Close()
+			wg.Done()
+			return
+		}
+	}
+}
 
 
-	err := http.ListenAndServe(c.ListenInterface, r)
-	log.WithError(err).Error("Dashboard server failed to start")
+func Stop() {
+	stopDataListener <- true
+	stopRankingManager <- true
+	stopHttp <- true
+	wg.Wait()
 }
 }
 
 
 // Parses options in config and applies to global variables
 // Parses options in config and applies to global variables

+ 31 - 0
dashboard/dashboard_test.go

@@ -0,0 +1,31 @@
+package dashboard
+
+import (
+	"sync"
+	"testing"
+)
+
+func TestRunStop(t *testing.T) {
+
+	config := &Config{
+		Enabled:               true,
+		ListenInterface:       ":8081",
+		TickInterval:          "5s",
+		MaxWindow:             "24h",
+		RankingUpdateInterval: "6h",
+	}
+
+	var wg sync.WaitGroup
+
+	wg.Add(1)
+	go func() {
+		Run(config)
+		wg.Done()
+	}()
+
+	Stop()
+
+	// Wait for Run() to exit
+	wg.Wait()
+
+}

+ 25 - 16
dashboard/datastore.go

@@ -117,7 +117,11 @@ func (ds *dataStore) rankingManager() {
 				[]map[string]int{map[string]int{}},
 				[]map[string]int{map[string]int{}},
 				ds.topHelo[:len(ds.topIP)-1]...)
 				ds.topHelo[:len(ds.topIP)-1]...)
 			ds.lock.Unlock()
 			ds.lock.Unlock()
+
+		case <-stopRankingManager:
+			return
 		}
 		}
+
 	}
 	}
 }
 }
 
 
@@ -205,22 +209,27 @@ func dataListener(interval time.Duration) {
 	memStats := &runtime.MemStats{}
 	memStats := &runtime.MemStats{}
 
 
 	for {
 	for {
-		t := <-ticker
-		runtime.ReadMemStats(memStats)
-		ramPoint := point{t, memStats.Alloc}
-		nClientPoint := point{t, store.nClients}
-		log.WithFields(map[string]interface{}{
-			"ram":     ramPoint.Y,
-			"clients": nClientPoint.Y,
-		}).Info("Logging analytics data")
-
-		store.addRAMPoint(ramPoint)
-		store.addNClientPoint(nClientPoint)
-		store.notify(&message{tickMessageType, dataFrame{
-			Ram:      ramPoint,
-			NClients: nClientPoint,
-			ranking:  store.aggregateRankings(),
-		}})
+		select {
+		case t := <-ticker:
+			runtime.ReadMemStats(memStats)
+			ramPoint := point{t, memStats.Alloc}
+			nClientPoint := point{t, store.nClients}
+			log.WithFields(map[string]interface{}{
+				"ram":     ramPoint.Y,
+				"clients": nClientPoint.Y,
+			}).Info("Logging analytics data")
+
+			store.addRAMPoint(ramPoint)
+			store.addNClientPoint(nClientPoint)
+			store.notify(&message{tickMessageType, dataFrame{
+				Ram:      ramPoint,
+				NClients: nClientPoint,
+				ranking:  store.aggregateRankings(),
+			}})
+		case <-stopDataListener:
+			return
+		}
+
 	}
 	}
 }
 }
 
 

+ 54 - 0
dashboard/http.go

@@ -0,0 +1,54 @@
+package dashboard
+
+import (
+	log "github.com/Sirupsen/logrus"
+	"io"
+	"net"
+	"net/http"
+	"time"
+)
+
+// ListenAndServeWithClose is a non-blocking listen and serve returning a closer
+func ListenAndServeWithClose(addr string, handler http.Handler) (io.Closer, error) {
+
+	var (
+		listener  net.Listener
+		srvCloser io.Closer
+		err       error
+	)
+
+	srv := &http.Server{Addr: addr, Handler: handler}
+
+	if addr == "" {
+		addr = ":http"
+	}
+
+	listener, err = net.Listen("tcp", addr)
+	if err != nil {
+		return nil, err
+	}
+
+	go func() {
+		err := srv.Serve(tcpKeepAliveListener{listener.(*net.TCPListener)})
+		if err != nil {
+			log.Println("HTTP Server Error - ", err)
+		}
+	}()
+
+	srvCloser = listener
+	return srvCloser, nil
+}
+
+type tcpKeepAliveListener struct {
+	*net.TCPListener
+}
+
+func (ln tcpKeepAliveListener) Accept() (c net.Conn, err error) {
+	tc, err := ln.AcceptTCP()
+	if err != nil {
+		return
+	}
+	tc.SetKeepAlive(true)
+	tc.SetKeepAlivePeriod(3 * time.Minute)
+	return tc, nil
+}