Browse Source

get websockets working reliably across reloads, add unsubscribe method to datastore

Jordan Schalm 8 years ago
parent
commit
89060684fd
4 changed files with 36 additions and 27 deletions
  1. 21 2
      dashboard/dashboard.go
  2. 10 23
      dashboard/datastore.go
  3. 1 0
      dashboard/html/index.html
  4. 4 2
      dashboard/session.go

+ 21 - 2
dashboard/dashboard.go

@@ -42,6 +42,7 @@ func Run(c *Config) {
 	r := mux.NewRouter()
 	r.HandleFunc("/", indexHandler)
 	r.HandleFunc("/login", loginHandler)
+	r.HandleFunc("/logout", logoutHandler)
 	r.HandleFunc("/ws", webSocketHandler)
 
 	rand.Seed(time.Now().UnixNano())
@@ -93,6 +94,24 @@ func loginHandler(w http.ResponseWriter, r *http.Request) {
 	}
 }
 
+func logoutHandler(w http.ResponseWriter, r *http.Request) {
+	switch r.Method {
+	case "POST":
+		sess := getSession(r)
+		if sess == nil {
+			w.WriteHeader(http.StatusForbidden)
+			return
+		}
+
+		store.unsubscribe(sess.id)
+		sess.expires = time.Now()
+		http.Redirect(w, r, "/", http.StatusSeeOther)
+
+	default:
+		w.WriteHeader(http.StatusMethodNotAllowed)
+	}
+}
+
 func webSocketHandler(w http.ResponseWriter, r *http.Request) {
 	if !isLoggedIn(r) {
 		w.WriteHeader(http.StatusForbidden)
@@ -109,7 +128,7 @@ func webSocketHandler(w http.ResponseWriter, r *http.Request) {
 	sess.ws = conn
 	c := make(chan *point)
 	sess.send = c
-	store.subscribe(c)
+	store.subscribe(sess.id, c)
 	go sess.receive()
 	go sess.transmit()
 }
@@ -121,7 +140,7 @@ func startSession(w http.ResponseWriter, r *http.Request) error {
 		Name:  "SID",
 		Value: sessionID,
 		Path:  "/",
-		// Secure: true,
+		// Secure: true, // TODO re-add this when TLS is set up
 	}
 
 	sess := &session{

+ 10 - 23
dashboard/datastore.go

@@ -13,12 +13,13 @@ const (
 
 type dataStore struct {
 	ram  []*point
-	subs []chan<- *point
+	subs map[string]chan<- *point
 }
 
 func newDataStore() *dataStore {
 	return &dataStore{
-		ram: make([]*point, 0, maxTicks),
+		ram:  make([]*point, 0, maxTicks),
+		subs: make(map[string]chan<- *point),
 	}
 }
 
@@ -31,34 +32,20 @@ func (ds *dataStore) addPoint(p *point) {
 	ds.notify(p)
 }
 
-func (ds *dataStore) subscribe(c chan<- *point) {
-	ds.subs = append(ds.subs, c)
+func (ds *dataStore) subscribe(id string, c chan<- *point) {
+	ds.subs[id] = c
+}
+
+func (ds *dataStore) unsubscribe(id string) {
+	delete(ds.subs, id)
 }
 
 func (ds *dataStore) notify(p *point) {
-	var toUnsubscribe []int
-	for i, c := range ds.subs {
+	for _, c := range ds.subs {
 		select {
 		case c <- p:
 		default:
-			close(c)
-			toUnsubscribe = append(toUnsubscribe, i)
-		}
-	}
-
-	if len(toUnsubscribe) > 0 {
-		newSubs := ds.subs[:0]
-		for i, c := range ds.subs {
-			if i != toUnsubscribe[0] {
-				newSubs = append(newSubs, c)
-			} else {
-				toUnsubscribe = toUnsubscribe[1:]
-				if len(toUnsubscribe) == 0 {
-					break
-				}
-			}
 		}
-		ds.subs = newSubs
 	}
 }
 

+ 1 - 0
dashboard/html/index.html

@@ -7,6 +7,7 @@
 		<canvas id="ram-graph" width="500" height="200"></canvas>
 	</body>
 	<script src="https://cdnjs.cloudflare.com/ajax/libs/smoothie/1.27.0/smoothie.min.js"></script>
+	<script src="https://cdnjs.cloudflare.com/ajax/libs/Chart.js/2.4.0/Chart.min.js"></script>
 	<script>
 	var conn = new WebSocket('ws://localhost:8080/ws');
 	var smoothie = new SmoothieChart();

+ 4 - 2
dashboard/session.go

@@ -20,7 +20,9 @@ var idCharset = []byte("qwertyuiopasdfghjklzxcvbnmQWERTYUIOPASDFGHJKLZXCVBNM1234
 type session struct {
 	start, expires time.Time
 	id             string
-	ws             *websocket.Conn
+	// Whether we have a valid
+	alive bool
+	ws    *websocket.Conn
 	// Messages to send over the websocket are received on this channel
 	send <-chan *point
 }
@@ -44,8 +46,8 @@ func (s *session) receive() {
 		if err != nil {
 			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) {
 				log.WithError(err).Error("Websocket closed unexpectedly")
-				break
 			}
+			break
 		}
 		log.Infof("Message: %s", string(message))
 	}