tun_tester.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. //go:build e2e_testing
  2. // +build e2e_testing
  3. package overlay
  4. import (
  5. "fmt"
  6. "io"
  7. "net/netip"
  8. "os"
  9. "sync/atomic"
  10. "github.com/gaissmai/bart"
  11. "github.com/sirupsen/logrus"
  12. "github.com/slackhq/nebula/config"
  13. "github.com/slackhq/nebula/packet"
  14. "github.com/slackhq/nebula/routing"
  15. )
  16. type TestTun struct {
  17. Device string
  18. vpnNetworks []netip.Prefix
  19. Routes []Route
  20. routeTree *bart.Table[routing.Gateways]
  21. l *logrus.Logger
  22. closed atomic.Bool
  23. rxPackets chan []byte // Packets to receive into nebula
  24. TxPackets chan []byte // Packets transmitted outside by nebula
  25. buffers [][]byte
  26. }
  27. func newTun(c *config.C, l *logrus.Logger, vpnNetworks []netip.Prefix, _ bool) (*TestTun, error) {
  28. _, routes, err := getAllRoutesFromConfig(c, vpnNetworks, true)
  29. if err != nil {
  30. return nil, err
  31. }
  32. routeTree, err := makeRouteTree(l, routes, false)
  33. if err != nil {
  34. return nil, err
  35. }
  36. return &TestTun{
  37. Device: c.GetString("tun.dev", ""),
  38. vpnNetworks: vpnNetworks,
  39. Routes: routes,
  40. routeTree: routeTree,
  41. l: l,
  42. rxPackets: make(chan []byte, 10),
  43. TxPackets: make(chan []byte, 10),
  44. }, nil
  45. }
  46. func newTunFromFd(_ *config.C, _ *logrus.Logger, _ int, _ []netip.Prefix) (*TestTun, error) {
  47. return nil, fmt.Errorf("newTunFromFd not supported")
  48. }
  49. // Send will place a byte array onto the receive queue for nebula to consume
  50. // These are unencrypted ip layer frames destined for another nebula node.
  51. // packets should exit the udp side, capture them with udpConn.Get
  52. func (t *TestTun) Send(packet []byte) {
  53. if t.closed.Load() {
  54. return
  55. }
  56. if t.l.Level >= logrus.DebugLevel {
  57. t.l.WithField("dataLen", len(packet)).Debug("Tun receiving injected packet")
  58. }
  59. t.rxPackets <- packet
  60. }
  61. // Get will pull an unencrypted ip layer frame from the transmit queue
  62. // nebula meant to send this message to some application on the local system
  63. // packets were ingested from the udp side, you can send them with udpConn.Send
  64. func (t *TestTun) Get(block bool) []byte {
  65. if block {
  66. return <-t.TxPackets
  67. }
  68. select {
  69. case p := <-t.TxPackets:
  70. return p
  71. default:
  72. return nil
  73. }
  74. }
  75. //********************************************************************************************************************//
  76. // Below this is boilerplate implementation to make nebula actually work
  77. //********************************************************************************************************************//
  78. func (t *TestTun) RoutesFor(ip netip.Addr) routing.Gateways {
  79. r, _ := t.routeTree.Lookup(ip)
  80. return r
  81. }
  82. func (t *TestTun) Activate() error {
  83. return nil
  84. }
  85. func (t *TestTun) Networks() []netip.Prefix {
  86. return t.vpnNetworks
  87. }
  88. func (t *TestTun) Name() string {
  89. return t.Device
  90. }
  91. func (t *TestTun) ReadMany(x []TunPacket, q int) (int, error) {
  92. p, ok := <-t.rxPackets
  93. if !ok {
  94. return 0, os.ErrClosed
  95. }
  96. x[0].Payload = p
  97. return 1, nil
  98. }
  99. func (t *TestTun) AllocSeg(pkt *packet.OutPacket, q int) (int, error) {
  100. buf := make([]byte, 9000)
  101. t.buffers = append(t.buffers, buf)
  102. idx := len(t.buffers) - 1
  103. isV6 := false //todo?
  104. x := pkt.UseSegment(uint16(idx), buf, isV6)
  105. return x, nil
  106. }
  107. func (t *TestTun) Write(b []byte) (int, error) {
  108. //todo garbagey
  109. out := packet.NewOut()
  110. x, err := t.AllocSeg(out, 0)
  111. if err != nil {
  112. return 0, err
  113. }
  114. copy(out.SegmentPayloads[x], b)
  115. return t.WriteOne(out, true, 0)
  116. }
  117. func (t *TestTun) WriteOne(x *packet.OutPacket, kick bool, q int) (int, error) {
  118. if t.closed.Load() {
  119. return 0, io.ErrClosedPipe
  120. }
  121. if len(x.SegmentIDs) == 0 {
  122. return 0, nil
  123. }
  124. for i, _ := range x.SegmentIDs {
  125. t.TxPackets <- x.SegmentPayloads[i]
  126. }
  127. //todo if kick, delete alloced seg
  128. return 1, nil
  129. }
  130. func (t *TestTun) WriteMany(x []*packet.OutPacket, q int) (int, error) {
  131. if len(x) == 0 {
  132. return 0, nil
  133. }
  134. for _, pkt := range x {
  135. _, err := t.WriteOne(pkt, true, q)
  136. if err != nil {
  137. return 0, err
  138. }
  139. }
  140. return len(x), nil
  141. }
  142. func (t *TestTun) RecycleRxSeg(pkt *TunPacket, kick bool, q int) error {
  143. //todo this ought to maybe track something
  144. return nil
  145. }
  146. func (t *TestTun) Close() error {
  147. if t.closed.CompareAndSwap(false, true) {
  148. close(t.rxPackets)
  149. close(t.TxPackets)
  150. }
  151. return nil
  152. }
  153. func (t *TestTun) SupportsMultiqueue() bool {
  154. return false
  155. }
  156. func (t *TestTun) NewMultiQueueReader() (TunDev, error) {
  157. return nil, fmt.Errorf("TODO: multiqueue not implemented")
  158. }