Jelajahi Sumber

add meta packet statistics (#230)

This change add more metrics around "meta" (non "message" type packets).
For lighthouse packets, we also record statistics around the specific
lighthouse meta type.

We don't keep statistics for the "message" type so that we don't slow
down the fast path (and you can just look at metrics on the tun
interface to find that information).
Wade Simmons 5 tahun lalu
induk
melakukan
b37a91cfbc
12 mengubah file dengan 186 tambahan dan 16 penghapusan
  1. 2 2
      connection_manager_test.go
  2. 9 0
      examples/config.yml
  3. 2 0
      handshake_ix.go
  4. 7 0
      handshake_manager.go
  5. 9 0
      hostmap.go
  6. 7 2
      inside.go
  7. 5 6
      interface.go
  8. 27 1
      lighthouse.go
  9. 2 2
      lighthouse_test.go
  10. 12 0
      main.go
  11. 97 0
      message_metrics.go
  12. 7 3
      outside.go

+ 2 - 2
connection_manager_test.go

@@ -28,7 +28,7 @@ func Test_NewConnectionManagerTest(t *testing.T) {
 		rawCertificateNoKey: []byte{},
 		rawCertificateNoKey: []byte{},
 	}
 	}
 
 
-	lh := NewLightHouse(false, 0, []uint32{}, 1000, 0, &udpConn{}, false, 1)
+	lh := NewLightHouse(false, 0, []uint32{}, 1000, 0, &udpConn{}, false, 1, false)
 	ifce := &Interface{
 	ifce := &Interface{
 		hostMap:          hostMap,
 		hostMap:          hostMap,
 		inside:           &Tun{},
 		inside:           &Tun{},
@@ -91,7 +91,7 @@ func Test_NewConnectionManagerTest2(t *testing.T) {
 		rawCertificateNoKey: []byte{},
 		rawCertificateNoKey: []byte{},
 	}
 	}
 
 
-	lh := NewLightHouse(false, 0, []uint32{}, 1000, 0, &udpConn{}, false, 1)
+	lh := NewLightHouse(false, 0, []uint32{}, 1000, 0, &udpConn{}, false, 1, false)
 	ifce := &Interface{
 	ifce := &Interface{
 		hostMap:          hostMap,
 		hostMap:          hostMap,
 		inside:           &Tun{},
 		inside:           &Tun{},

+ 9 - 0
examples/config.yml

@@ -177,6 +177,15 @@ logging:
   #subsystem: nebula
   #subsystem: nebula
   #interval: 10s
   #interval: 10s
 
 
+  # enables counter metrics for meta packets
+  #   e.g.: `messages.tx.handshake`
+  # NOTE: `message.{tx,rx}.recv_error` is always emitted
+  #message_metrics: false
+
+  # enables detailed counter metrics for lighthouse packets
+  #   e.g.: `lighthouse.rx.HostQuery`
+  #lighthouse_metrics: false
+
 # Handshake Manger Settings
 # Handshake Manger Settings
 #handshakes:
 #handshakes:
   # Total time to try a handshake = sequence of `try_interval * retries`
   # Total time to try a handshake = sequence of `try_interval * retries`

+ 2 - 0
handshake_ix.go

@@ -98,6 +98,7 @@ func ixHandshakeStage1(f *Interface, addr *udpAddr, hostinfo *HostInfo, packet [
 		hostinfo, _ := f.handshakeManager.pendingHostMap.QueryReverseIndex(hs.Details.InitiatorIndex)
 		hostinfo, _ := f.handshakeManager.pendingHostMap.QueryReverseIndex(hs.Details.InitiatorIndex)
 		if hostinfo != nil && bytes.Equal(hostinfo.HandshakePacket[0], packet[HeaderLen:]) {
 		if hostinfo != nil && bytes.Equal(hostinfo.HandshakePacket[0], packet[HeaderLen:]) {
 			if msg, ok := hostinfo.HandshakePacket[2]; ok {
 			if msg, ok := hostinfo.HandshakePacket[2]; ok {
+				f.messageMetrics.Tx(handshake, NebulaMessageSubType(msg[1]), 1)
 				err := f.outside.WriteTo(msg, addr)
 				err := f.outside.WriteTo(msg, addr)
 				if err != nil {
 				if err != nil {
 					l.WithField("vpnIp", IntIp(hostinfo.hostId)).WithField("udpAddr", addr).
 					l.WithField("vpnIp", IntIp(hostinfo.hostId)).WithField("udpAddr", addr).
@@ -191,6 +192,7 @@ func ixHandshakeStage1(f *Interface, addr *udpAddr, hostinfo *HostInfo, packet [
 			hostinfo.HandshakePacket[2] = make([]byte, len(msg))
 			hostinfo.HandshakePacket[2] = make([]byte, len(msg))
 			copy(hostinfo.HandshakePacket[2], msg)
 			copy(hostinfo.HandshakePacket[2], msg)
 
 
+			f.messageMetrics.Tx(handshake, NebulaMessageSubType(msg[1]), 1)
 			err := f.outside.WriteTo(msg, addr)
 			err := f.outside.WriteTo(msg, addr)
 			if err != nil {
 			if err != nil {
 				l.WithField("vpnIp", IntIp(vpnIP)).WithField("udpAddr", addr).
 				l.WithField("vpnIp", IntIp(vpnIP)).WithField("udpAddr", addr).

+ 7 - 0
handshake_manager.go

@@ -31,6 +31,8 @@ type HandshakeConfig struct {
 	tryInterval  time.Duration
 	tryInterval  time.Duration
 	retries      int
 	retries      int
 	waitRotation int
 	waitRotation int
+
+	messageMetrics *MessageMetrics
 }
 }
 
 
 type HandshakeManager struct {
 type HandshakeManager struct {
@@ -42,6 +44,8 @@ type HandshakeManager struct {
 
 
 	OutboundHandshakeTimer *SystemTimerWheel
 	OutboundHandshakeTimer *SystemTimerWheel
 	InboundHandshakeTimer  *SystemTimerWheel
 	InboundHandshakeTimer  *SystemTimerWheel
+
+	messageMetrics *MessageMetrics
 }
 }
 
 
 func NewHandshakeManager(tunCidr *net.IPNet, preferredRanges []*net.IPNet, mainHostMap *HostMap, lightHouse *LightHouse, outside *udpConn, config HandshakeConfig) *HandshakeManager {
 func NewHandshakeManager(tunCidr *net.IPNet, preferredRanges []*net.IPNet, mainHostMap *HostMap, lightHouse *LightHouse, outside *udpConn, config HandshakeConfig) *HandshakeManager {
@@ -55,6 +59,8 @@ func NewHandshakeManager(tunCidr *net.IPNet, preferredRanges []*net.IPNet, mainH
 
 
 		OutboundHandshakeTimer: NewSystemTimerWheel(config.tryInterval, config.tryInterval*time.Duration(config.retries)),
 		OutboundHandshakeTimer: NewSystemTimerWheel(config.tryInterval, config.tryInterval*time.Duration(config.retries)),
 		InboundHandshakeTimer:  NewSystemTimerWheel(config.tryInterval, config.tryInterval*time.Duration(config.retries)),
 		InboundHandshakeTimer:  NewSystemTimerWheel(config.tryInterval, config.tryInterval*time.Duration(config.retries)),
+
+		messageMetrics: config.messageMetrics,
 	}
 	}
 }
 }
 
 
@@ -111,6 +117,7 @@ func (c *HandshakeManager) NextOutboundHandshakeTimerTick(now time.Time, f EncWr
 
 
 			// Ensure the handshake is ready to avoid a race in timer tick and stage 0 handshake generation
 			// Ensure the handshake is ready to avoid a race in timer tick and stage 0 handshake generation
 			if hostinfo.HandshakeReady && hostinfo.remote != nil {
 			if hostinfo.HandshakeReady && hostinfo.remote != nil {
+				c.messageMetrics.Tx(handshake, NebulaMessageSubType(hostinfo.HandshakePacket[0][1]), 1)
 				err := c.outside.WriteTo(hostinfo.HandshakePacket[0], hostinfo.remote)
 				err := c.outside.WriteTo(hostinfo.HandshakePacket[0], hostinfo.remote)
 				if err != nil {
 				if err != nil {
 					hostinfo.logger().WithField("udpAddr", hostinfo.remote).
 					hostinfo.logger().WithField("udpAddr", hostinfo.remote).

+ 9 - 0
hostmap.go

@@ -30,6 +30,7 @@ type HostMap struct {
 	vpnCIDR         *net.IPNet
 	vpnCIDR         *net.IPNet
 	defaultRoute    uint32
 	defaultRoute    uint32
 	unsafeRoutes    *CIDRTree
 	unsafeRoutes    *CIDRTree
+	metricsEnabled  bool
 }
 }
 
 
 type HostInfo struct {
 type HostInfo struct {
@@ -384,8 +385,16 @@ func (hm *HostMap) PunchList() []*udpAddr {
 }
 }
 
 
 func (hm *HostMap) Punchy(conn *udpConn) {
 func (hm *HostMap) Punchy(conn *udpConn) {
+	var metricsTxPunchy metrics.Counter
+	if hm.metricsEnabled {
+		metricsTxPunchy = metrics.GetOrRegisterCounter("messages.tx.punchy", nil)
+	} else {
+		metricsTxPunchy = metrics.NilCounter{}
+	}
+
 	for {
 	for {
 		for _, addr := range hm.PunchList() {
 		for _, addr := range hm.PunchList() {
+			metricsTxPunchy.Inc(1)
 			conn.WriteTo([]byte{1}, addr)
 			conn.WriteTo([]byte{1}, addr)
 		}
 		}
 		time.Sleep(time.Second * 30)
 		time.Sleep(time.Second * 30)

+ 7 - 2
inside.go

@@ -46,7 +46,7 @@ func (f *Interface) consumeInsidePacket(packet []byte, fwPacket *FirewallPacket,
 
 
 	dropReason := f.firewall.Drop(packet, *fwPacket, false, hostinfo, trustedCAs)
 	dropReason := f.firewall.Drop(packet, *fwPacket, false, hostinfo, trustedCAs)
 	if dropReason == nil {
 	if dropReason == nil {
-		f.send(message, 0, ci, hostinfo, hostinfo.remote, packet, nb, out)
+		f.sendNoMetrics(message, 0, ci, hostinfo, hostinfo.remote, packet, nb, out)
 		if f.lightHouse != nil && *ci.messageCounter%5000 == 0 {
 		if f.lightHouse != nil && *ci.messageCounter%5000 == 0 {
 			f.lightHouse.Query(fwPacket.RemoteIP, f)
 			f.lightHouse.Query(fwPacket.RemoteIP, f)
 		}
 		}
@@ -118,7 +118,7 @@ func (f *Interface) sendMessageNow(t NebulaMessageType, st NebulaMessageSubType,
 		return
 		return
 	}
 	}
 
 
-	f.send(message, st, hostInfo.ConnectionState, hostInfo, hostInfo.remote, p, nb, out)
+	f.sendNoMetrics(message, st, hostInfo.ConnectionState, hostInfo, hostInfo.remote, p, nb, out)
 	if f.lightHouse != nil && *hostInfo.ConnectionState.messageCounter%5000 == 0 {
 	if f.lightHouse != nil && *hostInfo.ConnectionState.messageCounter%5000 == 0 {
 		f.lightHouse.Query(fp.RemoteIP, f)
 		f.lightHouse.Query(fp.RemoteIP, f)
 	}
 	}
@@ -175,6 +175,11 @@ func (f *Interface) sendMessageToAll(t NebulaMessageType, st NebulaMessageSubTyp
 }
 }
 
 
 func (f *Interface) send(t NebulaMessageType, st NebulaMessageSubType, ci *ConnectionState, hostinfo *HostInfo, remote *udpAddr, p, nb, out []byte) {
 func (f *Interface) send(t NebulaMessageType, st NebulaMessageSubType, ci *ConnectionState, hostinfo *HostInfo, remote *udpAddr, p, nb, out []byte) {
+	f.messageMetrics.Tx(t, st, 1)
+	f.sendNoMetrics(t, st, ci, hostinfo, remote, p, nb, out)
+}
+
+func (f *Interface) sendNoMetrics(t NebulaMessageType, st NebulaMessageSubType, ci *ConnectionState, hostinfo *HostInfo, remote *udpAddr, p, nb, out []byte) {
 	if ci.eKey == nil {
 	if ci.eKey == nil {
 		//TODO: log warning
 		//TODO: log warning
 		return
 		return

+ 5 - 6
interface.go

@@ -25,6 +25,7 @@ type InterfaceConfig struct {
 	DropLocalBroadcast      bool
 	DropLocalBroadcast      bool
 	DropMulticast           bool
 	DropMulticast           bool
 	UDPBatchSize            int
 	UDPBatchSize            int
+	MessageMetrics          *MessageMetrics
 }
 }
 
 
 type Interface struct {
 type Interface struct {
@@ -45,9 +46,8 @@ type Interface struct {
 	udpBatchSize       int
 	udpBatchSize       int
 	version            string
 	version            string
 
 
-	metricRxRecvError metrics.Counter
-	metricTxRecvError metrics.Counter
-	metricHandshakes  metrics.Histogram
+	metricHandshakes metrics.Histogram
+	messageMetrics   *MessageMetrics
 }
 }
 
 
 func NewInterface(c *InterfaceConfig) (*Interface, error) {
 func NewInterface(c *InterfaceConfig) (*Interface, error) {
@@ -80,9 +80,8 @@ func NewInterface(c *InterfaceConfig) (*Interface, error) {
 		dropMulticast:      c.DropMulticast,
 		dropMulticast:      c.DropMulticast,
 		udpBatchSize:       c.UDPBatchSize,
 		udpBatchSize:       c.UDPBatchSize,
 
 
-		metricRxRecvError: metrics.GetOrRegisterCounter("messages.rx.recv_error", nil),
-		metricTxRecvError: metrics.GetOrRegisterCounter("messages.tx.recv_error", nil),
-		metricHandshakes:  metrics.GetOrRegisterHistogram("handshakes", nil, metrics.NewExpDecaySample(1028, 0.015)),
+		metricHandshakes: metrics.GetOrRegisterHistogram("handshakes", nil, metrics.NewExpDecaySample(1028, 0.015)),
+		messageMetrics:   c.MessageMetrics,
 	}
 	}
 
 
 	ifce.connectionManager = newConnectionManager(ifce, c.checkInterval, c.pendingDeletionInterval)
 	ifce.connectionManager = newConnectionManager(ifce, c.checkInterval, c.pendingDeletionInterval)

+ 27 - 1
lighthouse.go

@@ -7,6 +7,7 @@ import (
 	"time"
 	"time"
 
 
 	"github.com/golang/protobuf/proto"
 	"github.com/golang/protobuf/proto"
+	"github.com/rcrowley/go-metrics"
 	"github.com/slackhq/nebula/cert"
 	"github.com/slackhq/nebula/cert"
 )
 )
 
 
@@ -37,6 +38,9 @@ type LightHouse struct {
 	nebulaPort  int
 	nebulaPort  int
 	punchBack   bool
 	punchBack   bool
 	punchDelay  time.Duration
 	punchDelay  time.Duration
+
+	metrics           *MessageMetrics
+	metricHolepunchTx metrics.Counter
 }
 }
 
 
 type EncWriter interface {
 type EncWriter interface {
@@ -44,7 +48,7 @@ type EncWriter interface {
 	SendMessageToAll(t NebulaMessageType, st NebulaMessageSubType, vpnIp uint32, p, nb, out []byte)
 	SendMessageToAll(t NebulaMessageType, st NebulaMessageSubType, vpnIp uint32, p, nb, out []byte)
 }
 }
 
 
-func NewLightHouse(amLighthouse bool, myIp uint32, ips []uint32, interval int, nebulaPort int, pc *udpConn, punchBack bool, punchDelay time.Duration) *LightHouse {
+func NewLightHouse(amLighthouse bool, myIp uint32, ips []uint32, interval int, nebulaPort int, pc *udpConn, punchBack bool, punchDelay time.Duration, metricsEnabled bool) *LightHouse {
 	h := LightHouse{
 	h := LightHouse{
 		amLighthouse: amLighthouse,
 		amLighthouse: amLighthouse,
 		myIp:         myIp,
 		myIp:         myIp,
@@ -58,6 +62,14 @@ func NewLightHouse(amLighthouse bool, myIp uint32, ips []uint32, interval int, n
 		punchDelay:   punchDelay,
 		punchDelay:   punchDelay,
 	}
 	}
 
 
+	if metricsEnabled {
+		h.metrics = newLighthouseMetrics()
+
+		h.metricHolepunchTx = metrics.GetOrRegisterCounter("messages.tx.holepunch", nil)
+	} else {
+		h.metricHolepunchTx = metrics.NilCounter{}
+	}
+
 	for _, ip := range ips {
 	for _, ip := range ips {
 		h.lighthouses[ip] = struct{}{}
 		h.lighthouses[ip] = struct{}{}
 	}
 	}
@@ -111,6 +123,7 @@ func (lh *LightHouse) QueryServer(ip uint32, f EncWriter) {
 			return
 			return
 		}
 		}
 
 
+		lh.metricTx(NebulaMeta_HostQuery, int64(len(lh.lighthouses)))
 		nb := make([]byte, 12, 12)
 		nb := make([]byte, 12, 12)
 		out := make([]byte, mtu)
 		out := make([]byte, mtu)
 		for n := range lh.lighthouses {
 		for n := range lh.lighthouses {
@@ -249,6 +262,7 @@ func (lh *LightHouse) LhUpdateWorker(f EncWriter) {
 			},
 			},
 		}
 		}
 
 
+		lh.metricTx(NebulaMeta_HostUpdateNotification, int64(len(lh.lighthouses)))
 		nb := make([]byte, 12, 12)
 		nb := make([]byte, 12, 12)
 		out := make([]byte, mtu)
 		out := make([]byte, mtu)
 		for vpnIp := range lh.lighthouses {
 		for vpnIp := range lh.lighthouses {
@@ -281,6 +295,8 @@ func (lh *LightHouse) HandleRequest(rAddr *udpAddr, vpnIp uint32, p []byte, c *c
 		return
 		return
 	}
 	}
 
 
+	lh.metricRx(n.Type, 1)
+
 	switch n.Type {
 	switch n.Type {
 	case NebulaMeta_HostQuery:
 	case NebulaMeta_HostQuery:
 		// Exit if we don't answer queries
 		// Exit if we don't answer queries
@@ -308,6 +324,7 @@ func (lh *LightHouse) HandleRequest(rAddr *udpAddr, vpnIp uint32, p []byte, c *c
 				l.WithError(err).WithField("vpnIp", IntIp(vpnIp)).Error("Failed to marshal lighthouse host query reply")
 				l.WithError(err).WithField("vpnIp", IntIp(vpnIp)).Error("Failed to marshal lighthouse host query reply")
 				return
 				return
 			}
 			}
+			lh.metricTx(NebulaMeta_HostQueryReply, 1)
 			f.SendMessageToVpnIp(lightHouse, 0, vpnIp, reply, make([]byte, 12, 12), make([]byte, mtu))
 			f.SendMessageToVpnIp(lightHouse, 0, vpnIp, reply, make([]byte, 12, 12), make([]byte, mtu))
 
 
 			// This signals the other side to punch some zero byte udp packets
 			// This signals the other side to punch some zero byte udp packets
@@ -326,6 +343,7 @@ func (lh *LightHouse) HandleRequest(rAddr *udpAddr, vpnIp uint32, p []byte, c *c
 					},
 					},
 				}
 				}
 				reply, _ := proto.Marshal(answer)
 				reply, _ := proto.Marshal(answer)
+				lh.metricTx(NebulaMeta_HostPunchNotification, 1)
 				f.SendMessageToVpnIp(lightHouse, 0, n.Details.VpnIp, reply, make([]byte, 12, 12), make([]byte, mtu))
 				f.SendMessageToVpnIp(lightHouse, 0, n.Details.VpnIp, reply, make([]byte, 12, 12), make([]byte, mtu))
 			}
 			}
 			//fmt.Println(reply, remoteaddr)
 			//fmt.Println(reply, remoteaddr)
@@ -362,6 +380,7 @@ func (lh *LightHouse) HandleRequest(rAddr *udpAddr, vpnIp uint32, p []byte, c *c
 			vpnPeer := NewUDPAddr(a.Ip, uint16(a.Port))
 			vpnPeer := NewUDPAddr(a.Ip, uint16(a.Port))
 			go func() {
 			go func() {
 				time.Sleep(lh.punchDelay)
 				time.Sleep(lh.punchDelay)
+				lh.metricHolepunchTx.Inc(1)
 				lh.punchConn.WriteTo(empty, vpnPeer)
 				lh.punchConn.WriteTo(empty, vpnPeer)
 
 
 			}()
 			}()
@@ -380,6 +399,13 @@ func (lh *LightHouse) HandleRequest(rAddr *udpAddr, vpnIp uint32, p []byte, c *c
 	}
 	}
 }
 }
 
 
+func (lh *LightHouse) metricRx(t NebulaMeta_MessageType, i int64) {
+	lh.metrics.Rx(NebulaMessageType(t), 0, i)
+}
+func (lh *LightHouse) metricTx(t NebulaMeta_MessageType, i int64) {
+	lh.metrics.Tx(NebulaMessageType(t), 0, i)
+}
+
 /*
 /*
 func (f *Interface) sendPathCheck(ci *ConnectionState, endpoint *net.UDPAddr, counter int) {
 func (f *Interface) sendPathCheck(ci *ConnectionState, endpoint *net.UDPAddr, counter int) {
 	c := ci.messageCounter
 	c := ci.messageCounter

+ 2 - 2
lighthouse_test.go

@@ -52,7 +52,7 @@ func Test_lhStaticMapping(t *testing.T) {
 
 
 	udpServer, _ := NewListener("0.0.0.0", 0, true)
 	udpServer, _ := NewListener("0.0.0.0", 0, true)
 
 
-	meh := NewLightHouse(true, 1, []uint32{ip2int(lh1IP)}, 10, 10003, udpServer, false, 1)
+	meh := NewLightHouse(true, 1, []uint32{ip2int(lh1IP)}, 10, 10003, udpServer, false, 1, false)
 	meh.AddRemote(ip2int(lh1IP), NewUDPAddr(ip2int(lh1IP), uint16(4242)), true)
 	meh.AddRemote(ip2int(lh1IP), NewUDPAddr(ip2int(lh1IP), uint16(4242)), true)
 	err := meh.ValidateLHStaticEntries()
 	err := meh.ValidateLHStaticEntries()
 	assert.Nil(t, err)
 	assert.Nil(t, err)
@@ -60,7 +60,7 @@ func Test_lhStaticMapping(t *testing.T) {
 	lh2 := "10.128.0.3"
 	lh2 := "10.128.0.3"
 	lh2IP := net.ParseIP(lh2)
 	lh2IP := net.ParseIP(lh2)
 
 
-	meh = NewLightHouse(true, 1, []uint32{ip2int(lh1IP), ip2int(lh2IP)}, 10, 10003, udpServer, false, 1)
+	meh = NewLightHouse(true, 1, []uint32{ip2int(lh1IP), ip2int(lh2IP)}, 10, 10003, udpServer, false, 1, false)
 	meh.AddRemote(ip2int(lh1IP), NewUDPAddr(ip2int(lh1IP), uint16(4242)), true)
 	meh.AddRemote(ip2int(lh1IP), NewUDPAddr(ip2int(lh1IP), uint16(4242)), true)
 	err = meh.ValidateLHStaticEntries()
 	err = meh.ValidateLHStaticEntries()
 	assert.EqualError(t, err, "Lighthouse 10.128.0.3 does not have a static_host_map entry")
 	assert.EqualError(t, err, "Lighthouse 10.128.0.3 does not have a static_host_map entry")

+ 12 - 0
main.go

@@ -172,6 +172,7 @@ func Main(configPath string, configTest bool, buildVersion string) {
 	hostMap := NewHostMap("main", tunCidr, preferredRanges)
 	hostMap := NewHostMap("main", tunCidr, preferredRanges)
 	hostMap.SetDefaultRoute(ip2int(net.ParseIP(config.GetString("default_route", "0.0.0.0"))))
 	hostMap.SetDefaultRoute(ip2int(net.ParseIP(config.GetString("default_route", "0.0.0.0"))))
 	hostMap.addUnsafeRoutes(&unsafeRoutes)
 	hostMap.addUnsafeRoutes(&unsafeRoutes)
+	hostMap.metricsEnabled = config.GetBool("stats.message_metrics", false)
 
 
 	l.WithField("network", hostMap.vpnCIDR).WithField("preferredRanges", hostMap.preferredRanges).Info("Main HostMap created")
 	l.WithField("network", hostMap.vpnCIDR).WithField("preferredRanges", hostMap.preferredRanges).Info("Main HostMap created")
 
 
@@ -226,6 +227,7 @@ func Main(configPath string, configTest bool, buildVersion string) {
 		udpServer,
 		udpServer,
 		punchy.Respond,
 		punchy.Respond,
 		punchy.Delay,
 		punchy.Delay,
+		config.GetBool("stats.lighthouse_metrics", false),
 	)
 	)
 
 
 	remoteAllowList, err := config.GetAllowList("lighthouse.remote_allow_list", false)
 	remoteAllowList, err := config.GetAllowList("lighthouse.remote_allow_list", false)
@@ -280,10 +282,19 @@ func Main(configPath string, configTest bool, buildVersion string) {
 		l.WithError(err).Error("Lighthouse unreachable")
 		l.WithError(err).Error("Lighthouse unreachable")
 	}
 	}
 
 
+	var messageMetrics *MessageMetrics
+	if config.GetBool("stats.message_metrics", false) {
+		messageMetrics = newMessageMetrics()
+	} else {
+		messageMetrics = newMessageMetricsOnlyRecvError()
+	}
+
 	handshakeConfig := HandshakeConfig{
 	handshakeConfig := HandshakeConfig{
 		tryInterval:  config.GetDuration("handshakes.try_interval", DefaultHandshakeTryInterval),
 		tryInterval:  config.GetDuration("handshakes.try_interval", DefaultHandshakeTryInterval),
 		retries:      config.GetInt("handshakes.retries", DefaultHandshakeRetries),
 		retries:      config.GetInt("handshakes.retries", DefaultHandshakeRetries),
 		waitRotation: config.GetInt("handshakes.wait_rotation", DefaultHandshakeWaitRotation),
 		waitRotation: config.GetInt("handshakes.wait_rotation", DefaultHandshakeWaitRotation),
+
+		messageMetrics: messageMetrics,
 	}
 	}
 
 
 	handshakeManager := NewHandshakeManager(tunCidr, preferredRanges, hostMap, lightHouse, udpServer, handshakeConfig)
 	handshakeManager := NewHandshakeManager(tunCidr, preferredRanges, hostMap, lightHouse, udpServer, handshakeConfig)
@@ -310,6 +321,7 @@ func Main(configPath string, configTest bool, buildVersion string) {
 		DropLocalBroadcast:      config.GetBool("tun.drop_local_broadcast", false),
 		DropLocalBroadcast:      config.GetBool("tun.drop_local_broadcast", false),
 		DropMulticast:           config.GetBool("tun.drop_multicast", false),
 		DropMulticast:           config.GetBool("tun.drop_multicast", false),
 		UDPBatchSize:            config.GetInt("listen.batch", 64),
 		UDPBatchSize:            config.GetInt("listen.batch", 64),
+		MessageMetrics:          messageMetrics,
 	}
 	}
 
 
 	switch ifConfig.Cipher {
 	switch ifConfig.Cipher {

+ 97 - 0
message_metrics.go

@@ -0,0 +1,97 @@
+package nebula
+
+import (
+	"fmt"
+
+	"github.com/rcrowley/go-metrics"
+)
+
+type MessageMetrics struct {
+	rx [][]metrics.Counter
+	tx [][]metrics.Counter
+
+	rxUnknown metrics.Counter
+	txUnknown metrics.Counter
+}
+
+func (m *MessageMetrics) Rx(t NebulaMessageType, s NebulaMessageSubType, i int64) {
+	if m != nil {
+		if t >= 0 && int(t) < len(m.rx) && s >= 0 && int(s) < len(m.rx[t]) {
+			m.rx[t][s].Inc(i)
+		} else if m.rxUnknown != nil {
+			m.rxUnknown.Inc(i)
+		}
+	}
+}
+func (m *MessageMetrics) Tx(t NebulaMessageType, s NebulaMessageSubType, i int64) {
+	if m != nil {
+		if t >= 0 && int(t) < len(m.tx) && s >= 0 && int(s) < len(m.tx[t]) {
+			m.tx[t][s].Inc(i)
+		} else if m.txUnknown != nil {
+			m.txUnknown.Inc(i)
+		}
+	}
+}
+
+func newMessageMetrics() *MessageMetrics {
+	gen := func(t string) [][]metrics.Counter {
+		return [][]metrics.Counter{
+			{
+				metrics.GetOrRegisterCounter(fmt.Sprintf("messages.%s.handshake_ixpsk0", t), nil),
+			},
+			nil,
+			{metrics.GetOrRegisterCounter(fmt.Sprintf("messages.%s.recv_error", t), nil)},
+			{metrics.GetOrRegisterCounter(fmt.Sprintf("messages.%s.lighthouse", t), nil)},
+			{
+				metrics.GetOrRegisterCounter(fmt.Sprintf("messages.%s.test_request", t), nil),
+				metrics.GetOrRegisterCounter(fmt.Sprintf("messages.%s.test_response", t), nil),
+			},
+			{metrics.GetOrRegisterCounter(fmt.Sprintf("messages.%s.close_tunnel", t), nil)},
+		}
+	}
+	return &MessageMetrics{
+		rx: gen("rx"),
+		tx: gen("tx"),
+
+		rxUnknown: metrics.GetOrRegisterCounter("messages.rx.other", nil),
+		txUnknown: metrics.GetOrRegisterCounter("messages.tx.other", nil),
+	}
+}
+
+// Historically we only recorded recv_error, so this is backwards compat
+func newMessageMetricsOnlyRecvError() *MessageMetrics {
+	gen := func(t string) [][]metrics.Counter {
+		return [][]metrics.Counter{
+			nil,
+			nil,
+			{metrics.GetOrRegisterCounter(fmt.Sprintf("messages.%s.recv_error", t), nil)},
+		}
+	}
+	return &MessageMetrics{
+		rx: gen("rx"),
+		tx: gen("tx"),
+	}
+}
+
+func newLighthouseMetrics() *MessageMetrics {
+	gen := func(t string) [][]metrics.Counter {
+		h := make([][]metrics.Counter, len(NebulaMeta_MessageType_name))
+		used := []NebulaMeta_MessageType{
+			NebulaMeta_HostQuery,
+			NebulaMeta_HostQueryReply,
+			NebulaMeta_HostUpdateNotification,
+			NebulaMeta_HostPunchNotification,
+		}
+		for _, i := range used {
+			h[i] = []metrics.Counter{metrics.GetOrRegisterCounter(fmt.Sprintf("lighthouse.%s.%s", t, i.String()), nil)}
+		}
+		return h
+	}
+	return &MessageMetrics{
+		rx: gen("rx"),
+		tx: gen("tx"),
+
+		rxUnknown: metrics.GetOrRegisterCounter("lighthouse.rx.other", nil),
+		txUnknown: metrics.GetOrRegisterCounter("lighthouse.tx.other", nil),
+	}
+}

+ 7 - 3
outside.go

@@ -54,6 +54,7 @@ func (f *Interface) readOutsidePackets(addr *udpAddr, out []byte, packet []byte,
 		// Fallthrough to the bottom to record incoming traffic
 		// Fallthrough to the bottom to record incoming traffic
 
 
 	case lightHouse:
 	case lightHouse:
+		f.messageMetrics.Rx(header.Type, header.Subtype, 1)
 		if !f.handleEncrypted(ci, addr, header) {
 		if !f.handleEncrypted(ci, addr, header) {
 			return
 			return
 		}
 		}
@@ -74,6 +75,7 @@ func (f *Interface) readOutsidePackets(addr *udpAddr, out []byte, packet []byte,
 		// Fallthrough to the bottom to record incoming traffic
 		// Fallthrough to the bottom to record incoming traffic
 
 
 	case test:
 	case test:
+		f.messageMetrics.Rx(header.Type, header.Subtype, 1)
 		if !f.handleEncrypted(ci, addr, header) {
 		if !f.handleEncrypted(ci, addr, header) {
 			return
 			return
 		}
 		}
@@ -102,15 +104,18 @@ func (f *Interface) readOutsidePackets(addr *udpAddr, out []byte, packet []byte,
 		// are unauthenticated
 		// are unauthenticated
 
 
 	case handshake:
 	case handshake:
+		f.messageMetrics.Rx(header.Type, header.Subtype, 1)
 		HandleIncomingHandshake(f, addr, packet, header, hostinfo)
 		HandleIncomingHandshake(f, addr, packet, header, hostinfo)
 		return
 		return
 
 
 	case recvError:
 	case recvError:
+		f.messageMetrics.Rx(header.Type, header.Subtype, 1)
 		// TODO: Remove this with recv_error deprecation
 		// TODO: Remove this with recv_error deprecation
 		f.handleRecvError(addr, header)
 		f.handleRecvError(addr, header)
 		return
 		return
 
 
 	case closeTunnel:
 	case closeTunnel:
+		f.messageMetrics.Rx(header.Type, header.Subtype, 1)
 		if !f.handleEncrypted(ci, addr, header) {
 		if !f.handleEncrypted(ci, addr, header) {
 			return
 			return
 		}
 		}
@@ -122,6 +127,7 @@ func (f *Interface) readOutsidePackets(addr *udpAddr, out []byte, packet []byte,
 		return
 		return
 
 
 	default:
 	default:
+		f.messageMetrics.Rx(header.Type, header.Subtype, 1)
 		hostinfo.logger().Debugf("Unexpected packet received from %s", addr)
 		hostinfo.logger().Debugf("Unexpected packet received from %s", addr)
 		return
 		return
 	}
 	}
@@ -298,7 +304,7 @@ func (f *Interface) decryptToTun(hostinfo *HostInfo, messageCounter uint64, out
 }
 }
 
 
 func (f *Interface) sendRecvError(endpoint *udpAddr, index uint32) {
 func (f *Interface) sendRecvError(endpoint *udpAddr, index uint32) {
-	f.metricTxRecvError.Inc(1)
+	f.messageMetrics.Tx(recvError, 0, 1)
 
 
 	//TODO: this should be a signed message so we can trust that we should drop the index
 	//TODO: this should be a signed message so we can trust that we should drop the index
 	b := HeaderEncode(make([]byte, HeaderLen), Version, uint8(recvError), 0, index, 0)
 	b := HeaderEncode(make([]byte, HeaderLen), Version, uint8(recvError), 0, index, 0)
@@ -311,8 +317,6 @@ func (f *Interface) sendRecvError(endpoint *udpAddr, index uint32) {
 }
 }
 
 
 func (f *Interface) handleRecvError(addr *udpAddr, h *Header) {
 func (f *Interface) handleRecvError(addr *udpAddr, h *Header) {
-	f.metricRxRecvError.Inc(1)
-
 	// This flag is to stop caring about recv_error from old versions
 	// This flag is to stop caring about recv_error from old versions
 	// This should go away when the old version is gone from prod
 	// This should go away when the old version is gone from prod
 	if l.Level >= logrus.DebugLevel {
 	if l.Level >= logrus.DebugLevel {