udp_tester.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. //go:build e2e_testing
  2. // +build e2e_testing
  3. package udp
  4. import (
  5. "fmt"
  6. "io"
  7. "net"
  8. "sync/atomic"
  9. "github.com/sirupsen/logrus"
  10. "github.com/slackhq/nebula/config"
  11. "github.com/slackhq/nebula/firewall"
  12. "github.com/slackhq/nebula/header"
  13. )
  14. type Packet struct {
  15. ToIp net.IP
  16. ToPort uint16
  17. FromIp net.IP
  18. FromPort uint16
  19. Data []byte
  20. }
  21. func (u *Packet) Copy() *Packet {
  22. n := &Packet{
  23. ToIp: make(net.IP, len(u.ToIp)),
  24. ToPort: u.ToPort,
  25. FromIp: make(net.IP, len(u.FromIp)),
  26. FromPort: u.FromPort,
  27. Data: make([]byte, len(u.Data)),
  28. }
  29. copy(n.ToIp, u.ToIp)
  30. copy(n.FromIp, u.FromIp)
  31. copy(n.Data, u.Data)
  32. return n
  33. }
  34. type TesterConn struct {
  35. Addr *Addr
  36. RxPackets chan *Packet // Packets to receive into nebula
  37. TxPackets chan *Packet // Packets transmitted outside by nebula
  38. closed atomic.Bool
  39. l *logrus.Logger
  40. }
  41. func NewListener(l *logrus.Logger, ip net.IP, port int, _ bool, _ int) (Conn, error) {
  42. return &TesterConn{
  43. Addr: &Addr{ip, uint16(port)},
  44. RxPackets: make(chan *Packet, 10),
  45. TxPackets: make(chan *Packet, 10),
  46. l: l,
  47. }, nil
  48. }
  49. // Send will place a UdpPacket onto the receive queue for nebula to consume
  50. // this is an encrypted packet or a handshake message in most cases
  51. // packets were transmitted from another nebula node, you can send them with Tun.Send
  52. func (u *TesterConn) Send(packet *Packet) {
  53. if u.closed.Load() {
  54. return
  55. }
  56. h := &header.H{}
  57. if err := h.Parse(packet.Data); err != nil {
  58. panic(err)
  59. }
  60. if u.l.Level >= logrus.DebugLevel {
  61. u.l.WithField("header", h).
  62. WithField("udpAddr", fmt.Sprintf("%v:%v", packet.FromIp, packet.FromPort)).
  63. WithField("dataLen", len(packet.Data)).
  64. Debug("UDP receiving injected packet")
  65. }
  66. u.RxPackets <- packet
  67. }
  68. // Get will pull a UdpPacket from the transmit queue
  69. // nebula meant to send this message on the network, it will be encrypted
  70. // packets were ingested from the tun side (in most cases), you can send them with Tun.Send
  71. func (u *TesterConn) Get(block bool) *Packet {
  72. if block {
  73. return <-u.TxPackets
  74. }
  75. select {
  76. case p := <-u.TxPackets:
  77. return p
  78. default:
  79. return nil
  80. }
  81. }
  82. //********************************************************************************************************************//
  83. // Below this is boilerplate implementation to make nebula actually work
  84. //********************************************************************************************************************//
  85. func (u *TesterConn) WriteTo(b []byte, addr *Addr) error {
  86. if u.closed.Load() {
  87. return io.ErrClosedPipe
  88. }
  89. p := &Packet{
  90. Data: make([]byte, len(b), len(b)),
  91. FromIp: make([]byte, 16),
  92. FromPort: u.Addr.Port,
  93. ToIp: make([]byte, 16),
  94. ToPort: addr.Port,
  95. }
  96. copy(p.Data, b)
  97. copy(p.ToIp, addr.IP.To16())
  98. copy(p.FromIp, u.Addr.IP.To16())
  99. u.TxPackets <- p
  100. return nil
  101. }
  102. func (u *TesterConn) ListenOut(r EncReader, lhf LightHouseHandlerFunc, cache *firewall.ConntrackCacheTicker, q int) {
  103. plaintext := make([]byte, MTU)
  104. h := &header.H{}
  105. fwPacket := &firewall.Packet{}
  106. ua := &Addr{IP: make([]byte, 16)}
  107. nb := make([]byte, 12, 12)
  108. for {
  109. p, ok := <-u.RxPackets
  110. if !ok {
  111. return
  112. }
  113. ua.Port = p.FromPort
  114. copy(ua.IP, p.FromIp.To16())
  115. r(ua, plaintext[:0], p.Data, h, fwPacket, lhf, nb, q, cache.Get(u.l))
  116. }
  117. }
  118. func (u *TesterConn) ReloadConfig(*config.C) {}
  119. func NewUDPStatsEmitter(_ []Conn) func() {
  120. // No UDP stats for non-linux
  121. return func() {}
  122. }
  123. func (u *TesterConn) LocalAddr() (*Addr, error) {
  124. return u.Addr, nil
  125. }
  126. func (u *TesterConn) Rebind() error {
  127. return nil
  128. }
  129. func (u *TesterConn) Close() error {
  130. if u.closed.CompareAndSwap(false, true) {
  131. close(u.RxPackets)
  132. close(u.TxPackets)
  133. }
  134. return nil
  135. }