Browse Source

Send actions over websockets directly as redux actions
New connections to dashboard page see all dashboard history rather than just new data

Jordan Schalm 8 years ago
parent
commit
c4fea8385e

+ 2 - 2
dashboard/dashboard.go

@@ -82,13 +82,13 @@ func webSocketHandler(w http.ResponseWriter, r *http.Request) {
 		return
 		return
 	}
 	}
 	sess.ws = conn
 	sess.ws = conn
-	c := make(chan *dataFrame)
+	c := make(chan *message)
 	sess.send = c
 	sess.send = c
 	// TODO send store contents at connection time
 	// TODO send store contents at connection time
 	store.subscribe(sess.id, c)
 	store.subscribe(sess.id, c)
-	store.initSession(sess)
 	go sess.receive()
 	go sess.receive()
 	go sess.transmit()
 	go sess.transmit()
+	go store.initSession(sess)
 }
 }
 
 
 func startSession(w http.ResponseWriter, r *http.Request) *session {
 func startSession(w http.ResponseWriter, r *http.Request) *session {

+ 28 - 10
dashboard/datastore.go

@@ -12,6 +12,8 @@ const (
 	tickInterval = time.Second * 5
 	tickInterval = time.Second * 5
 	maxWindow    = time.Hour * 24
 	maxWindow    = time.Hour * 24
 	maxTicks     = int(maxWindow / tickInterval)
 	maxTicks     = int(maxWindow / tickInterval)
+	INIT_MESSAGE = "INIT"
+	TICK_MESSAGE = "TICK"
 )
 )
 
 
 // Log for sending client events from the server to the dashboard.
 // Log for sending client events from the server to the dashboard.
@@ -28,11 +30,11 @@ type dataStore struct {
 	nClientTicks []point
 	nClientTicks []point
 	// Up-to-date number of clients
 	// Up-to-date number of clients
 	nClients uint64
 	nClients uint64
-	subs     map[string]chan<- *dataFrame
+	subs     map[string]chan<- *message
 }
 }
 
 
 func newDataStore() *dataStore {
 func newDataStore() *dataStore {
-	subs := make(map[string]chan<- *dataFrame)
+	subs := make(map[string]chan<- *message)
 	return &dataStore{
 	return &dataStore{
 		ramTicks:     make([]point, 0, maxTicks),
 		ramTicks:     make([]point, 0, maxTicks),
 		nClientTicks: make([]point, 0, maxTicks),
 		nClientTicks: make([]point, 0, maxTicks),
@@ -56,7 +58,7 @@ func (ds *dataStore) addNClientPoint(p point) {
 	}
 	}
 }
 }
 
 
-func (ds *dataStore) subscribe(id string, c chan<- *dataFrame) {
+func (ds *dataStore) subscribe(id string, c chan<- *message) {
 	ds.subs[id] = c
 	ds.subs[id] = c
 }
 }
 
 
@@ -64,17 +66,20 @@ func (ds *dataStore) unsubscribe(id string) {
 	delete(ds.subs, id)
 	delete(ds.subs, id)
 }
 }
 
 
-func (ds *dataStore) notify(p *dataFrame) {
+func (ds *dataStore) notify(m *message) {
 	for _, c := range ds.subs {
 	for _, c := range ds.subs {
 		select {
 		select {
-		case c <- p:
+		case c <- m:
 		default:
 		default:
 		}
 		}
 	}
 	}
 }
 }
 
 
 func (ds *dataStore) initSession(sess *session) {
 func (ds *dataStore) initSession(sess *session) {
-	// TODO implement
+	store.subs[sess.id] <- &message{INIT_MESSAGE, initFrame{
+		Ram:      store.ramTicks,
+		NClients: store.nClientTicks,
+	}}
 }
 }
 
 
 type point struct {
 type point struct {
@@ -94,24 +99,37 @@ func dataListener(interval time.Duration) {
 		log.Info("datastore:89", ramPoint, nClientPoint)
 		log.Info("datastore:89", ramPoint, nClientPoint)
 		store.addRAMPoint(ramPoint)
 		store.addRAMPoint(ramPoint)
 		store.addNClientPoint(nClientPoint)
 		store.addNClientPoint(nClientPoint)
-		store.notify(&dataFrame{
+		store.notify(&message{TICK_MESSAGE, dataFrame{
 			Ram:      ramPoint,
 			Ram:      ramPoint,
 			NClients: nClientPoint,
 			NClients: nClientPoint,
-		})
+		}})
 	}
 	}
 }
 }
 
 
 type dataFrame struct {
 type dataFrame struct {
 	Ram      point `json:"ram"`
 	Ram      point `json:"ram"`
-	NClients point `json:"n_clients"`
+	NClients point `json:"nClients"`
 	// top5Helo []string // TODO add for aggregation
 	// top5Helo []string // TODO add for aggregation
 	// top5IP   []string
 	// top5IP   []string
 }
 }
 
 
+type initFrame struct {
+	Ram      []point `json:"ram"`
+	NClients []point `json:"nClients"`
+	// top5Helo []string // TODO add for aggregation
+	// top5IP   []string
+}
+
+// Format of messages to be sent over WebSocket
+type message struct {
+	Type    string      `json:"type"`
+	Payload interface{} `json:"payload"`
+}
+
 type logHook int
 type logHook int
 
 
 func (h logHook) Levels() []log.Level {
 func (h logHook) Levels() []log.Level {
-	return []log.Level{log.InfoLevel}
+	return log.AllLevels
 }
 }
 
 
 func (h logHook) Fire(e *log.Entry) error {
 func (h logHook) Fire(e *log.Entry) error {

+ 4 - 3
dashboard/js/src/components/App.js

@@ -1,6 +1,7 @@
 import React, { Component } from 'react';
 import React, { Component } from 'react';
 import {connect} from 'react-redux';
 import {connect} from 'react-redux';
 import {init, tick} from '../action-creators';
 import {init, tick} from '../action-creators';
+import ActionTypes from '../action-types';
 import LineChart from './LineChart';
 import LineChart from './LineChart';
 
 
 const styles = {
 const styles = {
@@ -25,9 +26,9 @@ class App extends Component {
 		ws.onopen = event => console.log(event);
 		ws.onopen = event => console.log(event);
 		ws.onclose = event => console.log(event);
 		ws.onclose = event => console.log(event);
 		ws.onmessage = event => {
 		ws.onmessage = event => {
-			const data = JSON.parse(event.data);
-			console.log(data);
-			props.dispatch(tick(data));
+			const message = JSON.parse(event.data);
+			console.log(message);
+			props.dispatch(message);
 		};
 		};
 
 
 		this.state = {ws};
 		this.state = {ws};

+ 1 - 0
dashboard/js/src/reducer.js

@@ -18,6 +18,7 @@ const initialState = Immutable.Map({
 });
 });
 
 
 const reducer = (state = initialState, {type, payload}) => {
 const reducer = (state = initialState, {type, payload}) => {
+	console.log(type, payload);
 	let newState = state;
 	let newState = state;
 
 
 	switch (type) {
 	switch (type) {

+ 2 - 7
dashboard/session.go

@@ -1,7 +1,6 @@
 package dashboard
 package dashboard
 
 
 import (
 import (
-	"encoding/json"
 	"math/rand"
 	"math/rand"
 	"time"
 	"time"
 
 
@@ -23,7 +22,7 @@ type session struct {
 	id string
 	id string
 	ws *websocket.Conn
 	ws *websocket.Conn
 	// Messages to send over the websocket are received on this channel
 	// Messages to send over the websocket are received on this channel
-	send <-chan *dataFrame
+	send <-chan *message
 }
 }
 
 
 // Receives messages from the websocket connection associated with a session
 // Receives messages from the websocket connection associated with a session
@@ -59,17 +58,13 @@ transmit:
 	for {
 	for {
 		select {
 		select {
 		case p, ok := <-s.send:
 		case p, ok := <-s.send:
-			data, err := json.Marshal(p)
-			log.Info("session:63", string(data), err)
-			log.Info("session:64", p.NClients, p.Ram)
 			s.ws.SetWriteDeadline(time.Now().Add(writeWait))
 			s.ws.SetWriteDeadline(time.Now().Add(writeWait))
 			if !ok {
 			if !ok {
 				s.ws.WriteMessage(websocket.CloseMessage, []byte{})
 				s.ws.WriteMessage(websocket.CloseMessage, []byte{})
 				break transmit
 				break transmit
 			}
 			}
 
 
-			err = s.ws.WriteJSON(p)
-			log.Info("session:67", err)
+			err := s.ws.WriteJSON(p)
 			if err != nil {
 			if err != nil {
 				log.WithError(err).Debug("Failed to write next websocket message. Closing connection")
 				log.WithError(err).Debug("Failed to write next websocket message. Closing connection")
 				break transmit
 				break transmit

+ 2 - 1
tests/.gitignore

@@ -1 +1,2 @@
-*.pem
+*.pem
+testlog