瀏覽代碼

Add relay e2e tests and output some mermaid sequence diagrams (#691)

Nate Brown 3 年之前
父節點
當前提交
0d1ee4214a
共有 10 個文件被更改,包括 416 次插入32 次删除
  1. 12 0
      .github/workflows/test.yml
  2. 1 0
      .gitignore
  3. 22 0
      control_tester.go
  4. 49 17
      e2e/handshakes_test.go
  5. 11 1
      e2e/helpers_test.go
  6. 307 8
      e2e/router/router.go
  7. 5 1
      header/header.go
  8. 5 1
      header/header_test.go
  9. 2 2
      overlay/tun_tester.go
  10. 2 2
      udp/udp_tester.go

+ 12 - 0
.github/workflows/test.yml

@@ -43,6 +43,12 @@ jobs:
     - name: End 2 end
       run: make e2evv
 
+    - uses: actions/upload-artifact@v3
+      with:
+        name: e2e packet flow
+        path: e2e/mermaid/
+        if-no-files-found: warn
+
   test:
     name: Build and test on ${{ matrix.os }}
     runs-on: ${{ matrix.os }}
@@ -78,3 +84,9 @@ jobs:
 
     - name: End 2 end
       run: make e2evv
+
+    - uses: actions/upload-artifact@v3
+      with:
+        name: e2e packet flow
+        path: e2e/mermaid/
+        if-no-files-found: warn

+ 1 - 0
.gitignore

@@ -10,3 +10,4 @@
 /cpu.pprof
 /build
 /*.tar.gz
+/e2e/mermaid/

+ 22 - 0
control_tester.go

@@ -63,6 +63,24 @@ func (c *Control) InjectLightHouseAddr(vpnIp net.IP, toAddr *net.UDPAddr) {
 	}
 }
 
+// InjectRelays will push relayVpnIps into the local lighthouse cache for the vpnIp
+// This is necessary to inform an initiator of possible relays for communicating with a responder
+func (c *Control) InjectRelays(vpnIp net.IP, relayVpnIps []net.IP) {
+	c.f.lightHouse.Lock()
+	remoteList := c.f.lightHouse.unlockedGetRemoteList(iputil.Ip2VpnIp(vpnIp))
+	remoteList.Lock()
+	defer remoteList.Unlock()
+	c.f.lightHouse.Unlock()
+
+	iVpnIp := iputil.Ip2VpnIp(vpnIp)
+	uVpnIp := []uint32{}
+	for _, rVPnIp := range relayVpnIps {
+		uVpnIp = append(uVpnIp, uint32(iputil.Ip2VpnIp(rVPnIp)))
+	}
+
+	remoteList.unlockedSetRelay(iVpnIp, iVpnIp, uVpnIp)
+}
+
 // GetFromTun will pull a packet off the tun side of nebula
 func (c *Control) GetFromTun(block bool) []byte {
 	return c.f.inside.(*overlay.TestTun).Get(block)
@@ -118,6 +136,10 @@ func (c *Control) InjectTunUDPPacket(toIp net.IP, toPort uint16, fromPort uint16
 	c.f.inside.(*overlay.TestTun).Send(buffer.Bytes())
 }
 
+func (c *Control) GetVpnIp() iputil.VpnIp {
+	return c.f.myVpnIp
+}
+
 func (c *Control) GetUDPAddr() string {
 	return c.f.outside.Addr.String()
 }

+ 49 - 17
e2e/handshakes_test.go

@@ -18,8 +18,8 @@ import (
 
 func TestGoodHandshake(t *testing.T) {
 	ca, _, caKey, _ := newTestCaCert(time.Now(), time.Now().Add(10*time.Minute), []*net.IPNet{}, []*net.IPNet{}, []string{})
-	myControl, myVpnIp, myUdpAddr := newSimpleServer(ca, caKey, "me", net.IP{10, 0, 0, 1})
-	theirControl, theirVpnIp, theirUdpAddr := newSimpleServer(ca, caKey, "them", net.IP{10, 0, 0, 2})
+	myControl, myVpnIp, myUdpAddr := newSimpleServer(ca, caKey, "me", net.IP{10, 0, 0, 1}, nil)
+	theirControl, theirVpnIp, theirUdpAddr := newSimpleServer(ca, caKey, "them", net.IP{10, 0, 0, 2}, nil)
 
 	// Put their info in our lighthouse
 	myControl.InjectLightHouseAddr(theirVpnIp, theirUdpAddr)
@@ -57,7 +57,9 @@ func TestGoodHandshake(t *testing.T) {
 	assertUdpPacket(t, []byte("Hi from me"), myCachedPacket, myVpnIp, theirVpnIp, 80, 80)
 
 	t.Log("Do a bidirectional tunnel test")
-	assertTunnel(t, myVpnIp, theirVpnIp, myControl, theirControl, router.NewR(myControl, theirControl))
+	r := router.NewR(t, myControl, theirControl)
+	defer r.RenderFlow()
+	assertTunnel(t, myVpnIp, theirVpnIp, myControl, theirControl, r)
 
 	myControl.Stop()
 	theirControl.Stop()
@@ -70,9 +72,9 @@ func TestWrongResponderHandshake(t *testing.T) {
 	// The IPs here are chosen on purpose:
 	// The current remote handling will sort by preference, public, and then lexically.
 	// So we need them to have a higher address than evil (we could apply a preference though)
-	myControl, myVpnIp, myUdpAddr := newSimpleServer(ca, caKey, "me", net.IP{10, 0, 0, 100})
-	theirControl, theirVpnIp, theirUdpAddr := newSimpleServer(ca, caKey, "them", net.IP{10, 0, 0, 99})
-	evilControl, evilVpnIp, evilUdpAddr := newSimpleServer(ca, caKey, "evil", net.IP{10, 0, 0, 2})
+	myControl, myVpnIp, myUdpAddr := newSimpleServer(ca, caKey, "me", net.IP{10, 0, 0, 100}, nil)
+	theirControl, theirVpnIp, theirUdpAddr := newSimpleServer(ca, caKey, "them", net.IP{10, 0, 0, 99}, nil)
+	evilControl, evilVpnIp, evilUdpAddr := newSimpleServer(ca, caKey, "evil", net.IP{10, 0, 0, 2}, nil)
 
 	// Add their real udp addr, which should be tried after evil.
 	myControl.InjectLightHouseAddr(theirVpnIp, theirUdpAddr)
@@ -81,7 +83,8 @@ func TestWrongResponderHandshake(t *testing.T) {
 	myControl.InjectLightHouseAddr(theirVpnIp, evilUdpAddr)
 
 	// Build a router so we don't have to reason who gets which packet
-	r := router.NewR(myControl, theirControl, evilControl)
+	r := router.NewR(t, myControl, theirControl, evilControl)
+	defer r.RenderFlow()
 
 	// Start the servers
 	myControl.Start()
@@ -130,15 +133,16 @@ func TestWrongResponderHandshake(t *testing.T) {
 
 func Test_Case1_Stage1Race(t *testing.T) {
 	ca, _, caKey, _ := newTestCaCert(time.Now(), time.Now().Add(10*time.Minute), []*net.IPNet{}, []*net.IPNet{}, []string{})
-	myControl, myVpnIp, myUdpAddr := newSimpleServer(ca, caKey, "me  ", net.IP{10, 0, 0, 1})
-	theirControl, theirVpnIp, theirUdpAddr := newSimpleServer(ca, caKey, "them", net.IP{10, 0, 0, 2})
+	myControl, myVpnIp, myUdpAddr := newSimpleServer(ca, caKey, "me  ", net.IP{10, 0, 0, 1}, nil)
+	theirControl, theirVpnIp, theirUdpAddr := newSimpleServer(ca, caKey, "them", net.IP{10, 0, 0, 2}, nil)
 
 	// Put their info in our lighthouse and vice versa
 	myControl.InjectLightHouseAddr(theirVpnIp, theirUdpAddr)
 	theirControl.InjectLightHouseAddr(myVpnIp, myUdpAddr)
 
 	// Build a router so we don't have to reason who gets which packet
-	r := router.NewR(myControl, theirControl)
+	r := router.NewR(t, myControl, theirControl)
+	defer r.RenderFlow()
 
 	// Start the servers
 	myControl.Start()
@@ -152,16 +156,16 @@ func Test_Case1_Stage1Race(t *testing.T) {
 	myHsForThem := myControl.GetFromUDP(true)
 	theirHsForMe := theirControl.GetFromUDP(true)
 
-	t.Log("Now inject both stage 1 handshake packets")
-	myControl.InjectUDPPacket(theirHsForMe)
-	theirControl.InjectUDPPacket(myHsForThem)
+	r.Log("Now inject both stage 1 handshake packets")
+	r.InjectUDPPacket(theirControl, myControl, theirHsForMe)
+	r.InjectUDPPacket(myControl, theirControl, myHsForThem)
 	//TODO: they should win, grab their index for me and make sure I use it in the end.
 
-	t.Log("They should not have a stage 2 (won the race) but I should send one")
-	theirControl.InjectUDPPacket(myControl.GetFromUDP(true))
+	r.Log("They should not have a stage 2 (won the race) but I should send one")
+	r.InjectUDPPacket(myControl, theirControl, myControl.GetFromUDP(true))
 
-	t.Log("Route for me until I send a message packet to them")
-	myControl.WaitForType(1, 0, theirControl)
+	r.Log("Route for me until I send a message packet to them")
+	r.RouteForAllUntilAfterMsgTypeTo(theirControl, header.Message, header.MessageNone)
 
 	t.Log("My cached packet should be received by them")
 	myCachedPacket := theirControl.GetFromTun(true)
@@ -182,4 +186,32 @@ func Test_Case1_Stage1Race(t *testing.T) {
 	//TODO: assert hostmaps
 }
 
+func TestRelays(t *testing.T) {
+	ca, _, caKey, _ := newTestCaCert(time.Now(), time.Now().Add(10*time.Minute), []*net.IPNet{}, []*net.IPNet{}, []string{})
+	myControl, myVpnIp, _ := newSimpleServer(ca, caKey, "me     ", net.IP{10, 0, 0, 1}, m{"relay": m{"use_relays": true}})
+	relayControl, relayVpnIp, relayUdpAddr := newSimpleServer(ca, caKey, "relay  ", net.IP{10, 0, 0, 128}, m{"relay": m{"am_relay": true}})
+	theirControl, theirVpnIp, theirUdpAddr := newSimpleServer(ca, caKey, "them   ", net.IP{10, 0, 0, 2}, m{"relay": m{"use_relays": true}})
+
+	// Teach my how to get to the relay and that their can be reached via the relay
+	myControl.InjectLightHouseAddr(relayVpnIp, relayUdpAddr)
+	myControl.InjectRelays(theirVpnIp, []net.IP{relayVpnIp})
+	relayControl.InjectLightHouseAddr(theirVpnIp, theirUdpAddr)
+
+	// Build a router so we don't have to reason who gets which packet
+	r := router.NewR(t, myControl, relayControl, theirControl)
+	defer r.RenderFlow()
+
+	// Start the servers
+	myControl.Start()
+	relayControl.Start()
+	theirControl.Start()
+
+	t.Log("Trigger a handshake from me to them via the relay")
+	myControl.InjectTunUDPPacket(theirVpnIp, 80, 80, []byte("Hi from me"))
+
+	p := r.RouteForAllUntilTxTun(theirControl)
+	assertUdpPacket(t, []byte("Hi from me"), p, myVpnIp, theirVpnIp, 80, 80)
+	//TODO: assert we actually used the relay even though it should be impossible for a tunnel to have occurred without it
+}
+
 //TODO: add a test with many lies

+ 11 - 1
e2e/helpers_test.go

@@ -15,6 +15,7 @@ import (
 
 	"github.com/google/gopacket"
 	"github.com/google/gopacket/layers"
+	"github.com/imdario/mergo"
 	"github.com/sirupsen/logrus"
 	"github.com/slackhq/nebula"
 	"github.com/slackhq/nebula/cert"
@@ -30,7 +31,7 @@ import (
 type m map[string]interface{}
 
 // newSimpleServer creates a nebula instance with many assumptions
-func newSimpleServer(caCrt *cert.NebulaCertificate, caKey []byte, name string, udpIp net.IP) (*nebula.Control, net.IP, *net.UDPAddr) {
+func newSimpleServer(caCrt *cert.NebulaCertificate, caKey []byte, name string, udpIp net.IP, overrides m) (*nebula.Control, net.IP, *net.UDPAddr) {
 	l := NewTestLogger()
 
 	vpnIpNet := &net.IPNet{IP: make([]byte, len(udpIp)), Mask: net.IPMask{255, 255, 255, 0}}
@@ -78,6 +79,15 @@ func newSimpleServer(caCrt *cert.NebulaCertificate, caKey []byte, name string, u
 			"level":            l.Level.String(),
 		},
 	}
+
+	if overrides != nil {
+		err = mergo.Merge(&overrides, mc, mergo.WithAppendSlice)
+		if err != nil {
+			panic(err)
+		}
+		mc = overrides
+	}
+
 	cb, err := yaml.Marshal(mc)
 	if err != nil {
 		panic(err)

+ 307 - 8
e2e/router/router.go

@@ -4,14 +4,23 @@
 package router
 
 import (
+	"context"
 	"fmt"
 	"net"
+	"os"
+	"path/filepath"
 	"reflect"
 	"strconv"
+	"strings"
 	"sync"
+	"testing"
+	"time"
 
+	"github.com/google/gopacket"
+	"github.com/google/gopacket/layers"
 	"github.com/slackhq/nebula"
 	"github.com/slackhq/nebula/header"
+	"github.com/slackhq/nebula/iputil"
 	"github.com/slackhq/nebula/udp"
 )
 
@@ -28,38 +37,93 @@ type R struct {
 	// map[from address + ":" + to address] => ip:port to rewrite in the udp packet to receiver
 	outNat map[string]net.UDPAddr
 
+	// A map of vpn ip to the nebula control it belongs to
+	vpnControls map[iputil.VpnIp]*nebula.Control
+
+	flow []flowEntry
+
 	// All interactions are locked to help serialize behavior
 	sync.Mutex
+
+	fn           string
+	cancelRender context.CancelFunc
+	t            *testing.T
+}
+
+type flowEntry struct {
+	note   string
+	packet *packet
+}
+
+type packet struct {
+	from   *nebula.Control
+	to     *nebula.Control
+	packet *udp.Packet
+	tun    bool // a packet pulled off a tun device
+	rx     bool // the packet was received by a udp device
 }
 
 type ExitType int
 
 const (
-	// Keeps routing, the function will get called again on the next packet
+	// KeepRouting the function will get called again on the next packet
 	KeepRouting ExitType = 0
-	// Does not route this packet and exits immediately
+	// ExitNow does not route this packet and exits immediately
 	ExitNow ExitType = 1
-	// Routes this packet and exits immediately afterwards
+	// RouteAndExit routes this packet and exits immediately afterwards
 	RouteAndExit ExitType = 2
 )
 
 type ExitFunc func(packet *udp.Packet, receiver *nebula.Control) ExitType
 
-func NewR(controls ...*nebula.Control) *R {
+// NewR creates a new router to pass packets in a controlled fashion between the provided controllers.
+// The packet flow will be recorded in a file within the mermaid directory under the same name as the test.
+// Renders will occur automatically, roughly every 100ms, until a call to RenderFlow() is made
+func NewR(t *testing.T, controls ...*nebula.Control) *R {
+	ctx, cancel := context.WithCancel(context.Background())
+
+	if err := os.MkdirAll("mermaid", 0755); err != nil {
+		panic(err)
+	}
+
 	r := &R{
-		controls: make(map[string]*nebula.Control),
-		inNat:    make(map[string]*nebula.Control),
-		outNat:   make(map[string]net.UDPAddr),
+		controls:     make(map[string]*nebula.Control),
+		vpnControls:  make(map[iputil.VpnIp]*nebula.Control),
+		inNat:        make(map[string]*nebula.Control),
+		outNat:       make(map[string]net.UDPAddr),
+		fn:           filepath.Join("mermaid", fmt.Sprintf("%s.md", t.Name())),
+		t:            t,
+		cancelRender: cancel,
 	}
 
+	// Try to remove our render file
+	os.Remove(r.fn)
+
 	for _, c := range controls {
 		addr := c.GetUDPAddr()
 		if _, ok := r.controls[addr]; ok {
 			panic("Duplicate listen address: " + addr)
 		}
+
+		r.vpnControls[c.GetVpnIp()] = c
 		r.controls[addr] = c
 	}
 
+	// Spin the renderer in case we go nuts and the test never completes
+	go func() {
+		clockSource := time.NewTicker(time.Millisecond * 100)
+		defer clockSource.Stop()
+
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case <-clockSource.C:
+				r.renderFlow()
+			}
+		}
+	}()
+
 	return r
 }
 
@@ -78,6 +142,112 @@ func (r *R) AddRoute(ip net.IP, port uint16, c *nebula.Control) {
 	r.inNat[inAddr] = c
 }
 
+// RenderFlow renders the packet flow seen up until now and stops further automatic renders from happening.
+func (r *R) RenderFlow() {
+	r.cancelRender()
+	r.renderFlow()
+}
+
+func (r *R) renderFlow() {
+	f, err := os.OpenFile(r.fn, os.O_CREATE|os.O_TRUNC|os.O_RDWR, 0644)
+	if err != nil {
+		panic(err)
+	}
+
+	var participants = map[string]struct{}{}
+	var participansVals []string
+
+	fmt.Fprintln(f, "```mermaid")
+	fmt.Fprintln(f, "sequenceDiagram")
+
+	// Assemble participants
+	for _, e := range r.flow {
+		if e.packet == nil {
+			continue
+		}
+
+		addr := e.packet.from.GetUDPAddr()
+		if _, ok := participants[addr]; ok {
+			continue
+		}
+		participants[addr] = struct{}{}
+		sanAddr := strings.Replace(addr, ":", "#58;", 1)
+		participansVals = append(participansVals, sanAddr)
+		fmt.Fprintf(
+			f, "    participant %s as Nebula: %s<br/>UDP: %s\n",
+			sanAddr, e.packet.from.GetVpnIp(), sanAddr,
+		)
+	}
+
+	// Print packets
+	h := &header.H{}
+	for _, e := range r.flow {
+		if e.packet == nil {
+			fmt.Fprintf(f, "    note over %s: %s\n", strings.Join(participansVals, ", "), e.note)
+			continue
+		}
+
+		p := e.packet
+		if p.tun {
+			fmt.Fprintln(f, r.formatUdpPacket(p))
+
+		} else {
+			if err := h.Parse(p.packet.Data); err != nil {
+				panic(err)
+			}
+
+			line := "--x"
+			if p.rx {
+				line = "->>"
+			}
+
+			fmt.Fprintf(f,
+				"    %s%s%s: %s(%s), counter: %v\n",
+				strings.Replace(p.from.GetUDPAddr(), ":", "#58;", 1),
+				line,
+				strings.Replace(p.to.GetUDPAddr(), ":", "#58;", 1),
+				h.TypeName(), h.SubTypeName(), h.MessageCounter,
+			)
+		}
+	}
+	fmt.Fprintln(f, "```")
+}
+
+// InjectFlow can be used to record packet flow if the test is handling the routing on its own.
+// The packet is assumed to have been received
+func (r *R) InjectFlow(from, to *nebula.Control, p *udp.Packet) {
+	r.Lock()
+	defer r.Unlock()
+	r.unlockedInjectFlow(from, to, p, false)
+}
+
+func (r *R) Log(arg ...any) {
+	r.Lock()
+	r.flow = append(r.flow, flowEntry{note: fmt.Sprint(arg...)})
+	r.t.Log(arg...)
+	r.Unlock()
+}
+
+func (r *R) Logf(format string, arg ...any) {
+	r.Lock()
+	r.flow = append(r.flow, flowEntry{note: fmt.Sprintf(format, arg...)})
+	r.t.Logf(format, arg...)
+	r.Unlock()
+}
+
+// unlockedInjectFlow is used by the router to record a packet has been transmitted, the packet is returned and
+// should be marked as received AFTER it has been placed on the receivers channel
+func (r *R) unlockedInjectFlow(from, to *nebula.Control, p *udp.Packet, tun bool) *packet {
+	fp := &packet{
+		from:   from,
+		to:     to,
+		packet: p.Copy(),
+		tun:    tun,
+	}
+	r.flow = append(r.flow, flowEntry{packet: fp})
+	return fp
+}
+
 // OnceFrom will route a single packet from sender then return
 // If the router doesn't have the nebula controller for that address, we panic
 func (r *R) OnceFrom(sender *nebula.Control) {
@@ -96,6 +266,11 @@ func (r *R) RouteUntilTxTun(sender *nebula.Control, receiver *nebula.Control) []
 		select {
 		// Maybe we already have something on the tun for us
 		case b := <-tunTx:
+			r.Lock()
+			np := udp.Packet{Data: make([]byte, len(b))}
+			copy(np.Data, b)
+			r.unlockedInjectFlow(receiver, receiver, &np, true)
+			r.Unlock()
 			return b
 
 		// Nope, lets push the sender along
@@ -108,13 +283,73 @@ func (r *R) RouteUntilTxTun(sender *nebula.Control, receiver *nebula.Control) []
 				r.Unlock()
 				panic("No control for udp tx")
 			}
-
+			fp := r.unlockedInjectFlow(sender, c, p, false)
 			c.InjectUDPPacket(p)
+			fp.rx = true
 			r.Unlock()
 		}
 	}
 }
 
+// RouteForAllUntilTxTun will route for everyone and return when a packet is seen on receivers tun
+// If the router doesn't have the nebula controller for that address, we panic
+func (r *R) RouteForAllUntilTxTun(receiver *nebula.Control) []byte {
+	sc := make([]reflect.SelectCase, len(r.controls)+1)
+	cm := make([]*nebula.Control, len(r.controls)+1)
+
+	i := 0
+	sc[i] = reflect.SelectCase{
+		Dir:  reflect.SelectRecv,
+		Chan: reflect.ValueOf(receiver.GetTunTxChan()),
+		Send: reflect.Value{},
+	}
+	cm[i] = receiver
+
+	i++
+	for _, c := range r.controls {
+		sc[i] = reflect.SelectCase{
+			Dir:  reflect.SelectRecv,
+			Chan: reflect.ValueOf(c.GetUDPTxChan()),
+			Send: reflect.Value{},
+		}
+
+		cm[i] = c
+		i++
+	}
+
+	for {
+		x, rx, _ := reflect.Select(sc)
+		r.Lock()
+
+		if x == 0 {
+			// we are the tun tx, we can exit
+			p := rx.Interface().([]byte)
+			np := udp.Packet{Data: make([]byte, len(p))}
+			copy(np.Data, p)
+
+			r.unlockedInjectFlow(cm[x], cm[x], &np, true)
+			r.Unlock()
+			return p
+
+		} else {
+			// we are a udp tx, route and continue
+			p := rx.Interface().(*udp.Packet)
+			outAddr := cm[x].GetUDPAddr()
+
+			inAddr := net.JoinHostPort(p.ToIp.String(), fmt.Sprintf("%v", p.ToPort))
+			c := r.getControl(outAddr, inAddr, p)
+			if c == nil {
+				r.Unlock()
+				panic("No control for udp tx")
+			}
+			fp := r.unlockedInjectFlow(cm[x], c, p, false)
+			c.InjectUDPPacket(p)
+			fp.rx = true
+		}
+		r.Unlock()
+	}
+}
+
 // RouteExitFunc will call the whatDo func with each udp packet from sender.
 // whatDo can return:
 //   - exitNow: the packet will not be routed and this call will return immediately
@@ -144,12 +379,16 @@ func (r *R) RouteExitFunc(sender *nebula.Control, whatDo ExitFunc) {
 			return
 
 		case RouteAndExit:
+			fp := r.unlockedInjectFlow(sender, receiver, p, false)
 			receiver.InjectUDPPacket(p)
+			fp.rx = true
 			r.Unlock()
 			return
 
 		case KeepRouting:
+			fp := r.unlockedInjectFlow(sender, receiver, p, false)
 			receiver.InjectUDPPacket(p)
+			fp.rx = true
 
 		default:
 			panic(fmt.Sprintf("Unknown exitFunc return: %v", e))
@@ -175,6 +414,34 @@ func (r *R) RouteUntilAfterMsgType(sender *nebula.Control, msgType header.Messag
 	})
 }
 
+func (r *R) RouteForAllUntilAfterMsgTypeTo(receiver *nebula.Control, msgType header.MessageType, subType header.MessageSubType) {
+	h := &header.H{}
+	r.RouteForAllExitFunc(func(p *udp.Packet, r *nebula.Control) ExitType {
+		if r != receiver {
+			return KeepRouting
+		}
+
+		if err := h.Parse(p.Data); err != nil {
+			panic(err)
+		}
+
+		if h.Type == msgType && h.Subtype == subType {
+			return RouteAndExit
+		}
+
+		return KeepRouting
+	})
+}
+
+func (r *R) InjectUDPPacket(sender, receiver *nebula.Control, packet *udp.Packet) {
+	r.Lock()
+	defer r.Unlock()
+
+	fp := r.unlockedInjectFlow(sender, receiver, packet, false)
+	receiver.InjectUDPPacket(packet)
+	fp.rx = true
+}
+
 // RouteForUntilAfterToAddr will route for sender and return only after it sees and sends a packet destined for toAddr
 // finish can be any of the exitType values except `keepRouting`, the default value is `routeAndExit`
 // If the router doesn't have the nebula controller for that address, we panic
@@ -234,12 +501,16 @@ func (r *R) RouteForAllExitFunc(whatDo ExitFunc) {
 			return
 
 		case RouteAndExit:
+			fp := r.unlockedInjectFlow(cm[x], receiver, p, false)
 			receiver.InjectUDPPacket(p)
+			fp.rx = true
 			r.Unlock()
 			return
 
 		case KeepRouting:
+			fp := r.unlockedInjectFlow(cm[x], receiver, p, false)
 			receiver.InjectUDPPacket(p)
+			fp.rx = true
 
 		default:
 			panic(fmt.Sprintf("Unknown exitFunc return: %v", e))
@@ -321,3 +592,31 @@ func (r *R) getControl(fromAddr, toAddr string, p *udp.Packet) *nebula.Control {
 
 	return r.controls[toAddr]
 }
+
+func (r *R) formatUdpPacket(p *packet) string {
+	packet := gopacket.NewPacket(p.packet.Data, layers.LayerTypeIPv4, gopacket.Lazy)
+	v4 := packet.Layer(layers.LayerTypeIPv4).(*layers.IPv4)
+	if v4 == nil {
+		panic("not an ipv4 packet")
+	}
+
+	from := "unknown"
+	if c, ok := r.vpnControls[iputil.Ip2VpnIp(v4.SrcIP)]; ok {
+		from = c.GetUDPAddr()
+	}
+
+	udp := packet.Layer(layers.LayerTypeUDP).(*layers.UDP)
+	if udp == nil {
+		panic("not a udp packet")
+	}
+
+	data := packet.ApplicationLayer()
+	return fmt.Sprintf(
+		"    %s-->>%s: src port: %v<br/>dest port: %v<br/>data: \"%v\"\n",
+		strings.Replace(from, ":", "#58;", 1),
+		strings.Replace(p.to.GetUDPAddr(), ":", "#58;", 1),
+		udp.SrcPort,
+		udp.DstPort,
+		string(data.Payload()),
+	)
+}

+ 5 - 1
header/header.go

@@ -46,6 +46,7 @@ var typeMap = map[MessageType]string{
 	LightHouse:  "lightHouse",
 	Test:        "test",
 	CloseTunnel: "closeTunnel",
+	Control:     "control",
 }
 
 const (
@@ -73,7 +74,10 @@ var subTypeTestMap = map[MessageSubType]string{
 var subTypeNoneMap = map[MessageSubType]string{0: "none"}
 
 var subTypeMap = map[MessageType]*map[MessageSubType]string{
-	Message:     &subTypeNoneMap,
+	Message: {
+		MessageNone:  "none",
+		MessageRelay: "relay",
+	},
 	RecvError:   &subTypeNoneMap,
 	LightHouse:  &subTypeNoneMap,
 	Test:        &subTypeTestMap,

+ 5 - 1
header/header_test.go

@@ -82,10 +82,14 @@ func TestTypeMap(t *testing.T) {
 		LightHouse:  "lightHouse",
 		Test:        "test",
 		CloseTunnel: "closeTunnel",
+		Control:     "control",
 	}, typeMap)
 
 	assert.Equal(t, map[MessageType]*map[MessageSubType]string{
-		Message:     &subTypeNoneMap,
+		Message: {
+			MessageNone:  "none",
+			MessageRelay: "relay",
+		},
 		RecvError:   &subTypeNoneMap,
 		LightHouse:  &subTypeNoneMap,
 		Test:        &subTypeTestMap,

+ 2 - 2
overlay/tun_tester.go

@@ -36,8 +36,8 @@ func newTun(l *logrus.Logger, deviceName string, cidr *net.IPNet, _ int, routes
 		Routes:    routes,
 		routeTree: routeTree,
 		l:         l,
-		rxPackets: make(chan []byte, 1),
-		TxPackets: make(chan []byte, 1),
+		rxPackets: make(chan []byte, 10),
+		TxPackets: make(chan []byte, 10),
 	}, nil
 }
 

+ 2 - 2
udp/udp_tester.go

@@ -48,8 +48,8 @@ type Conn struct {
 func NewListener(l *logrus.Logger, ip string, port int, _ bool, _ int) (*Conn, error) {
 	return &Conn{
 		Addr:      &Addr{net.ParseIP(ip), uint16(port)},
-		RxPackets: make(chan *Packet, 1),
-		TxPackets: make(chan *Packet, 1),
+		RxPackets: make(chan *Packet, 10),
+		TxPackets: make(chan *Packet, 10),
 		l:         l,
 	}, nil
 }