瀏覽代碼

- split hook to hook.go
- point log to use log from go-guerrilla/log package so the logger can be injected
- dashboard coverage increased to > 80%

flashmob 8 年之前
父節點
當前提交
7e86a4e19f
共有 7 個文件被更改,包括 190 次插入84 次删除
  1. 18 5
      dashboard/dashboard.go
  2. 108 16
      dashboard/dashboard_test.go
  3. 1 55
      dashboard/datastore.go
  4. 57 0
      dashboard/hook.go
  5. 1 2
      dashboard/http.go
  6. 4 5
      dashboard/session.go
  7. 1 1
      guerrilla.go

+ 18 - 5
dashboard/dashboard.go

@@ -8,11 +8,12 @@ import (
 
 	"sync"
 
-	log "github.com/Sirupsen/logrus"
 	_ "github.com/flashmob/go-guerrilla/dashboard/statik"
+	"github.com/flashmob/go-guerrilla/log"
 	"github.com/gorilla/mux"
 	"github.com/gorilla/websocket"
 	"github.com/rakyll/statik/fs"
+	"sync/atomic"
 )
 
 var (
@@ -27,6 +28,8 @@ var (
 	started sync.WaitGroup
 
 	s state
+
+	mainlogStore atomic.Value
 )
 
 type state int
@@ -60,9 +63,10 @@ type Config struct {
 }
 
 // Begin collecting data and listening for dashboard clients
-func Run(c *Config) {
+func Run(c *Config, l log.Logger) {
+	mainlogStore.Store(l)
 	statikFS, _ := fs.New()
-
+	//store = newDataStore()
 	applyConfig(c)
 	sessions = map[string]*session{}
 
@@ -80,11 +84,11 @@ func Run(c *Config) {
 
 	closer, err := ListenAndServeWithClose(c.ListenInterface, r)
 	if err != nil {
-		log.WithError(err).Error("Dashboard server failed to start")
+		mainlog().WithError(err).Error("Dashboard server failed to start")
 		started.Done()
 		return
 	}
-	log.Infof("started dashboard, listening on http [%s]", c.ListenInterface)
+	mainlog().Infof("started dashboard, listening on http [%s]", c.ListenInterface)
 	wg.Add(1)
 
 	go func() {
@@ -116,10 +120,19 @@ func Stop() {
 		stopRankingManager <- true
 		stopHttp <- true
 		wg.Wait()
+
 	}
 
 }
 
+func mainlog() log.Logger {
+	if v, ok := mainlogStore.Load().(log.Logger); ok {
+		return v
+	}
+	l, _ := log.GetLogger(log.OutputStderr.String(), log.InfoLevel.String())
+	return l
+}
+
 // Parses options in config and applies to global variables
 func applyConfig(c *Config) {
 	config = c

+ 108 - 16
dashboard/dashboard_test.go

@@ -2,16 +2,25 @@ package dashboard
 
 import (
 	"bufio"
+	"encoding/json"
+	"fmt"
+	"github.com/flashmob/go-guerrilla/log"
+	"github.com/gorilla/websocket"
+	"net/url"
 	"os"
+	"regexp"
+	"strings"
 	"sync"
 	"testing"
 	"time"
-	//"fmt"
-	"github.com/flashmob/go-guerrilla/log"
-	"regexp"
-	"strings"
 )
 
+var testlog log.Logger
+
+func init() {
+	testlog, _ = log.GetLogger(log.OutputOff.String(), log.InfoLevel.String())
+}
+
 func TestRunStop(t *testing.T) {
 
 	config := &Config{
@@ -26,7 +35,7 @@ func TestRunStop(t *testing.T) {
 
 	wg.Add(1)
 	go func() {
-		Run(config)
+		Run(config, testlog)
 		wg.Done()
 	}()
 	// give Run some time to start
@@ -54,7 +63,7 @@ func TestRunStopBadAddress(t *testing.T) {
 
 	wg.Add(1)
 	go func() {
-		Run(config)
+		Run(config, testlog)
 		wg.Done()
 	}()
 
@@ -82,12 +91,23 @@ func TestSimulationRun(t *testing.T) {
 
 	wg.Add(1)
 	go func() {
-		Run(config)
+		Run(config, testlog)
 		wg.Done()
 	}()
 	// give Run some time to start
 	time.Sleep(time.Second)
 
+	simulateEvents(t)
+
+	Stop()
+
+	// Wait for Run() to exit
+	wg.Wait()
+
+}
+
+func simulateEvents(t *testing.T) {
+
 	file, err := os.Open("simulation.log")
 
 	if err != nil {
@@ -101,8 +121,7 @@ func TestSimulationRun(t *testing.T) {
 
 	scanner.Split(bufio.ScanLines)
 
-	l, _ := log.GetLogger("stderr", "info")
-	l.AddHook(LogHook)
+	testlog.AddHook(LogHook)
 
 	// match with quotes or without, ie. time="..." or level=
 	r := regexp.MustCompile(`(.+?)=("[^"]*"|\S*)\s*`)
@@ -141,15 +160,9 @@ func TestSimulationRun(t *testing.T) {
 		time.Sleep(diff)              // wait so that we don't go too fast
 		simStart = simStart.Add(diff) // catch up
 
-		l.WithFields(fields).Info(msg)
+		testlog.WithFields(fields).Info(msg)
 
 	}
-
-	Stop()
-
-	// Wait for Run() to exit
-	wg.Wait()
-
 }
 
 // parseItem parses a log item, eg time="2017-03-24T11:55:44+11:00" will be:
@@ -168,3 +181,82 @@ func parseItem(item string) (key string, val string) {
 	val = strings.TrimSpace(val)
 	return
 }
+
+// Run a simulation from an already captured log
+// Then open a websocket and validate that we are getting some data from it
+func TestWebsocket(t *testing.T) {
+
+	config := &Config{
+		Enabled:               true,
+		ListenInterface:       "127.0.0.1:8082",
+		TickInterval:          "1s",
+		MaxWindow:             "24h",
+		RankingUpdateInterval: "6h",
+	}
+
+	var wg sync.WaitGroup
+
+	wg.Add(1)
+	go func() {
+		Run(config, testlog)
+		wg.Done()
+	}()
+
+	var simWg sync.WaitGroup
+	go func() {
+		simWg.Add(1)
+		simulateEvents(t)
+		simWg.Done()
+	}()
+
+	time.Sleep(time.Second)
+
+	// lets talk to the websocket
+	u := url.URL{Scheme: "ws", Host: "127.0.0.1:8082", Path: "/ws"}
+
+	c, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
+	if err != nil {
+		t.Error("cant conect':", err)
+		return
+	}
+
+	simWg.Add(1)
+	go func() {
+		defer func() {
+			simWg.Done()
+		}()
+		i := 0
+		for {
+			c.SetReadDeadline(time.Now().Add(time.Second + 5))
+			_, msg, err := c.ReadMessage()
+			if err != nil {
+				fmt.Println("socket err:", err)
+				t.Error("websocket failed to connect")
+				return
+			}
+			var objmap map[string]*json.RawMessage
+			json.Unmarshal(msg, &objmap)
+
+			if pl, ok := objmap["payload"]; ok {
+				df := &dataFrame{}
+				json.Unmarshal(*pl, &df)
+				if df.NClients.Y > 10 && len(df.TopHelo) > 10 && len(df.TopDomain) > 10 && len(df.TopIP) > 10 {
+					return
+				}
+			}
+			fmt.Println("recv:", string(msg))
+			i++
+			if i > 2 {
+				t.Error("Websocket did get find expected result")
+				return
+			}
+		}
+
+	}()
+	simWg.Wait() // wait for sim to exit, wait for websocket to finish reading
+	Stop()
+	// Wait for Run() to exit
+	wg.Wait()
+	c.Close()
+
+}

+ 1 - 55
dashboard/datastore.go

@@ -4,8 +4,6 @@ import (
 	"runtime"
 	"sync"
 	"time"
-
-	log "github.com/Sirupsen/logrus"
 )
 
 const (
@@ -216,7 +214,7 @@ func dataListener(interval time.Duration) {
 			runtime.ReadMemStats(memStats)
 			ramPoint := point{t, memStats.Alloc}
 			nClientPoint := point{t, store.nClients}
-			log.WithFields(map[string]interface{}{
+			mainlog().WithFields(map[string]interface{}{
 				"ram":     ramPoint.Y,
 				"clients": nClientPoint.Y,
 			}).Info("Logging analytics data")
@@ -259,55 +257,3 @@ type message struct {
 	Type    string      `json:"type"`
 	Payload interface{} `json:"payload"`
 }
-
-type logHook int
-
-func (h logHook) Levels() []log.Level {
-	return log.AllLevels
-}
-
-// Checks fired logs for information that is relevant to the dashboard
-func (h logHook) Fire(e *log.Entry) error {
-	event, ok := e.Data["event"].(string)
-	if !ok {
-		return nil
-	}
-
-	var helo, ip, domain string
-	if event == "mailfrom" {
-		helo, ok = e.Data["helo"].(string)
-		if !ok {
-			return nil
-		}
-		if len(helo) > 16 {
-			helo = helo[:16]
-		}
-		ip, ok = e.Data["address"].(string)
-		if !ok {
-			return nil
-		}
-		domain, ok = e.Data["domain"].(string)
-		if !ok {
-			return nil
-		}
-	}
-
-	switch event {
-	case "connect":
-		store.lock.Lock()
-		store.nClients++
-		store.lock.Unlock()
-	case "mailfrom":
-		store.newConns <- conn{
-			domain: domain,
-			helo:   helo,
-			ip:     ip,
-		}
-	case "disconnect":
-		log.Infof("disconnect in dashboard, nclients: %d", store.nClients)
-		store.lock.Lock()
-		store.nClients--
-		store.lock.Unlock()
-	}
-	return nil
-}

+ 57 - 0
dashboard/hook.go

@@ -0,0 +1,57 @@
+package dashboard
+
+import (
+	log "github.com/Sirupsen/logrus"
+)
+
+type logHook int
+
+func (h logHook) Levels() []log.Level {
+	return log.AllLevels
+}
+
+// Checks fired logs for information that is relevant to the dashboard
+func (h logHook) Fire(e *log.Entry) error {
+	event, ok := e.Data["event"].(string)
+	if !ok {
+		return nil
+	}
+
+	var helo, ip, domain string
+	if event == "mailfrom" {
+		helo, ok = e.Data["helo"].(string)
+		if !ok {
+			return nil
+		}
+		if len(helo) > 16 {
+			helo = helo[:16]
+		}
+		ip, ok = e.Data["address"].(string)
+		if !ok {
+			return nil
+		}
+		domain, ok = e.Data["domain"].(string)
+		if !ok {
+			return nil
+		}
+	}
+
+	switch event {
+	case "connect":
+		store.lock.Lock()
+		store.nClients++
+		store.lock.Unlock()
+	case "mailfrom":
+		store.newConns <- conn{
+			domain: domain,
+			helo:   helo,
+			ip:     ip,
+		}
+	case "disconnect":
+		mainlog().Infof("disconnect in dashboard, nclients: %d", store.nClients)
+		store.lock.Lock()
+		store.nClients--
+		store.lock.Unlock()
+	}
+	return nil
+}

+ 1 - 2
dashboard/http.go

@@ -1,7 +1,6 @@
 package dashboard
 
 import (
-	log "github.com/Sirupsen/logrus"
 	"io"
 	"net"
 	"net/http"
@@ -31,7 +30,7 @@ func ListenAndServeWithClose(addr string, handler http.Handler) (io.Closer, erro
 	go func() {
 		err := srv.Serve(tcpKeepAliveListener{listener.(*net.TCPListener)})
 		if err != nil {
-			log.Println("HTTP Server Error - ", err)
+			mainlog().Error("HTTP Server Error - ", err)
 		}
 	}()
 

+ 4 - 5
dashboard/session.go

@@ -4,7 +4,6 @@ import (
 	"math/rand"
 	"time"
 
-	log "github.com/Sirupsen/logrus"
 	"github.com/gorilla/websocket"
 )
 
@@ -39,11 +38,11 @@ func (s *session) receive() {
 		_, message, err := s.ws.ReadMessage()
 		if err != nil {
 			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) {
-				log.WithError(err).Error("Websocket closed unexpectedly")
+				mainlog().WithError(err).Error("Websocket closed unexpectedly")
 			}
 			break
 		}
-		log.Infof("Message: %s", string(message))
+		mainlog().Infof("Message: %s", string(message))
 	}
 }
 
@@ -66,13 +65,13 @@ transmit:
 
 			err := s.ws.WriteJSON(p)
 			if err != nil {
-				log.WithError(err).Debug("Failed to write next websocket message. Closing connection")
+				mainlog().WithError(err).Debug("Failed to write next websocket message. Closing connection")
 				break transmit
 			}
 		case <-ticker.C:
 			s.ws.SetWriteDeadline(time.Now().Add(writeWait))
 			if err := s.ws.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
-				log.WithError(err).Debug("Failed to write next websocket message. Closing connection")
+				mainlog().WithError(err).Debug("Failed to write next websocket message. Closing connection")
 				break transmit
 			}
 		}

+ 1 - 1
guerrilla.go

@@ -456,7 +456,7 @@ func (g *guerrilla) Start() error {
 	startWG.Wait()
 
 	if g.Config.Dashboard.Enabled {
-		go dashboard.Run(&g.Config.Dashboard)
+		go dashboard.Run(&g.Config.Dashboard, g.mainlog())
 	}
 
 	// close, then read any errors