3
0

lighthouse.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608
  1. package nebula
  2. import (
  3. "errors"
  4. "fmt"
  5. "net"
  6. "sync"
  7. "time"
  8. "github.com/golang/protobuf/proto"
  9. "github.com/rcrowley/go-metrics"
  10. "github.com/sirupsen/logrus"
  11. "github.com/slackhq/nebula/cert"
  12. )
  var (
  	// ErrHostNotKnown is returned by LightHouse.Query when no address is cached
  	// for the requested VPN IP.
  	ErrHostNotKnown = errors.New("host not known")
  )
  14. type LightHouse struct {
  15. sync.RWMutex //Because we concurrently read and write to our maps
  16. amLighthouse bool
  17. myIp uint32
  18. punchConn *udpConn
  19. // Local cache of answers from light houses
  20. addrMap map[uint32][]*udpAddr
  21. // filters remote addresses allowed for each host
  22. // - When we are a lighthouse, this filters what addresses we store and
  23. // respond with.
  24. // - When we are not a lighthouse, this filters which addresses we accept
  25. // from lighthouses.
  26. remoteAllowList *AllowList
  27. // filters local addresses that we advertise to lighthouses
  28. localAllowList *AllowList
  29. // used to trigger the HandshakeManager when we receive HostQueryReply
  30. handshakeTrigger chan<- uint32
  31. // staticList exists to avoid having a bool in each addrMap entry
  32. // since static should be rare
  33. staticList map[uint32]struct{}
  34. lighthouses map[uint32]struct{}
  35. interval int
  36. nebulaPort uint32 // 32 bits because protobuf does not have a uint16
  37. punchBack bool
  38. punchDelay time.Duration
  39. metrics *MessageMetrics
  40. metricHolepunchTx metrics.Counter
  41. l *logrus.Logger
  42. }
  43. type EncWriter interface {
  44. SendMessageToVpnIp(t NebulaMessageType, st NebulaMessageSubType, vpnIp uint32, p, nb, out []byte)
  45. SendMessageToAll(t NebulaMessageType, st NebulaMessageSubType, vpnIp uint32, p, nb, out []byte)
  46. }
  47. func NewLightHouse(l *logrus.Logger, amLighthouse bool, myIp uint32, ips []uint32, interval int, nebulaPort uint32, pc *udpConn, punchBack bool, punchDelay time.Duration, metricsEnabled bool) *LightHouse {
  48. h := LightHouse{
  49. amLighthouse: amLighthouse,
  50. myIp: myIp,
  51. addrMap: make(map[uint32][]*udpAddr),
  52. nebulaPort: nebulaPort,
  53. lighthouses: make(map[uint32]struct{}),
  54. staticList: make(map[uint32]struct{}),
  55. interval: interval,
  56. punchConn: pc,
  57. punchBack: punchBack,
  58. punchDelay: punchDelay,
  59. l: l,
  60. }
  61. if metricsEnabled {
  62. h.metrics = newLighthouseMetrics()
  63. h.metricHolepunchTx = metrics.GetOrRegisterCounter("messages.tx.holepunch", nil)
  64. } else {
  65. h.metricHolepunchTx = metrics.NilCounter{}
  66. }
  67. for _, ip := range ips {
  68. h.lighthouses[ip] = struct{}{}
  69. }
  70. return &h
  71. }
  72. func (lh *LightHouse) SetRemoteAllowList(allowList *AllowList) {
  73. lh.Lock()
  74. defer lh.Unlock()
  75. lh.remoteAllowList = allowList
  76. }
  77. func (lh *LightHouse) SetLocalAllowList(allowList *AllowList) {
  78. lh.Lock()
  79. defer lh.Unlock()
  80. lh.localAllowList = allowList
  81. }
  82. func (lh *LightHouse) ValidateLHStaticEntries() error {
  83. for lhIP, _ := range lh.lighthouses {
  84. if _, ok := lh.staticList[lhIP]; !ok {
  85. return fmt.Errorf("Lighthouse %s does not have a static_host_map entry", IntIp(lhIP))
  86. }
  87. }
  88. return nil
  89. }
  90. func (lh *LightHouse) Query(ip uint32, f EncWriter) ([]*udpAddr, error) {
  91. if !lh.IsLighthouseIP(ip) {
  92. lh.QueryServer(ip, f)
  93. }
  94. lh.RLock()
  95. if v, ok := lh.addrMap[ip]; ok {
  96. lh.RUnlock()
  97. return v, nil
  98. }
  99. lh.RUnlock()
  100. return nil, ErrHostNotKnown
  101. }
  102. // This is asynchronous so no reply should be expected
  103. func (lh *LightHouse) QueryServer(ip uint32, f EncWriter) {
  104. if !lh.amLighthouse {
  105. // Send a query to the lighthouses and hope for the best next time
  106. query, err := proto.Marshal(NewLhQueryByInt(ip))
  107. if err != nil {
  108. lh.l.WithError(err).WithField("vpnIp", IntIp(ip)).Error("Failed to marshal lighthouse query payload")
  109. return
  110. }
  111. lh.metricTx(NebulaMeta_HostQuery, int64(len(lh.lighthouses)))
  112. nb := make([]byte, 12, 12)
  113. out := make([]byte, mtu)
  114. for n := range lh.lighthouses {
  115. f.SendMessageToVpnIp(lightHouse, 0, n, query, nb, out)
  116. }
  117. }
  118. }
  119. // Query our local lighthouse cached results
  120. func (lh *LightHouse) QueryCache(ip uint32) []*udpAddr {
  121. lh.RLock()
  122. if v, ok := lh.addrMap[ip]; ok {
  123. lh.RUnlock()
  124. return v
  125. }
  126. lh.RUnlock()
  127. return nil
  128. }
  129. func (lh *LightHouse) DeleteVpnIP(vpnIP uint32) {
  130. // First we check the static mapping
  131. // and do nothing if it is there
  132. if _, ok := lh.staticList[vpnIP]; ok {
  133. return
  134. }
  135. lh.Lock()
  136. //l.Debugln(lh.addrMap)
  137. delete(lh.addrMap, vpnIP)
  138. lh.l.Debugf("deleting %s from lighthouse.", IntIp(vpnIP))
  139. lh.Unlock()
  140. }
  141. func (lh *LightHouse) AddRemote(vpnIP uint32, toIp *udpAddr, static bool) {
  142. // First we check if the sender thinks this is a static entry
  143. // and do nothing if it is not, but should be considered static
  144. if static == false {
  145. if _, ok := lh.staticList[vpnIP]; ok {
  146. return
  147. }
  148. }
  149. lh.Lock()
  150. defer lh.Unlock()
  151. for _, v := range lh.addrMap[vpnIP] {
  152. if v.Equals(toIp) {
  153. return
  154. }
  155. }
  156. allow := lh.remoteAllowList.Allow(toIp.IP)
  157. lh.l.WithField("remoteIp", toIp).WithField("allow", allow).Debug("remoteAllowList.Allow")
  158. if !allow {
  159. return
  160. }
  161. //l.Debugf("Adding reply of %s as %s\n", IntIp(vpnIP), toIp)
  162. if static {
  163. lh.staticList[vpnIP] = struct{}{}
  164. }
  165. lh.addrMap[vpnIP] = append(lh.addrMap[vpnIP], toIp.Copy())
  166. }
  167. func (lh *LightHouse) AddRemoteAndReset(vpnIP uint32, toIp *udpAddr) {
  168. if lh.amLighthouse {
  169. lh.DeleteVpnIP(vpnIP)
  170. lh.AddRemote(vpnIP, toIp, false)
  171. }
  172. }
  173. func (lh *LightHouse) IsLighthouseIP(vpnIP uint32) bool {
  174. if _, ok := lh.lighthouses[vpnIP]; ok {
  175. return true
  176. }
  177. return false
  178. }
  179. func NewLhQueryByInt(VpnIp uint32) *NebulaMeta {
  180. return &NebulaMeta{
  181. Type: NebulaMeta_HostQuery,
  182. Details: &NebulaMetaDetails{
  183. VpnIp: VpnIp,
  184. },
  185. }
  186. }
  187. type ip4Or6 struct {
  188. v4 IpAndPort
  189. v6 Ip6AndPort
  190. }
  191. func NewIpAndPort(ip net.IP, port uint32) ip4Or6 {
  192. ipp := ip4Or6{}
  193. if ipv4 := ip.To4(); ipv4 != nil {
  194. ipp.v4 = IpAndPort{Port: port}
  195. ipp.v4.Ip = ip2int(ip)
  196. } else {
  197. ipp.v6 = Ip6AndPort{Port: port}
  198. ipp.v6.Ip = make([]byte, len(ip))
  199. copy(ipp.v6.Ip, ip)
  200. }
  201. return ipp
  202. }
  203. func NewIpAndPortFromUDPAddr(addr *udpAddr) ip4Or6 {
  204. return NewIpAndPort(addr.IP, uint32(addr.Port))
  205. }
  206. func NewUDPAddrFromLH4(ipp *IpAndPort) *udpAddr {
  207. ip := ipp.Ip
  208. return NewUDPAddr(
  209. net.IPv4(byte(ip&0xff000000>>24), byte(ip&0x00ff0000>>16), byte(ip&0x0000ff00>>8), byte(ip&0x000000ff)),
  210. uint16(ipp.Port),
  211. )
  212. }
  213. func NewUDPAddrFromLH6(ipp *Ip6AndPort) *udpAddr {
  214. return NewUDPAddr(ipp.Ip, uint16(ipp.Port))
  215. }
  216. func (lh *LightHouse) LhUpdateWorker(f EncWriter) {
  217. if lh.amLighthouse || lh.interval == 0 {
  218. return
  219. }
  220. for {
  221. lh.SendUpdate(f)
  222. time.Sleep(time.Second * time.Duration(lh.interval))
  223. }
  224. }
  225. func (lh *LightHouse) SendUpdate(f EncWriter) {
  226. var v4 []*IpAndPort
  227. var v6 []*Ip6AndPort
  228. for _, e := range *localIps(lh.l, lh.localAllowList) {
  229. // Only add IPs that aren't my VPN/tun IP
  230. if ip2int(e) != lh.myIp {
  231. ipp := NewIpAndPort(e, lh.nebulaPort)
  232. if len(ipp.v6.Ip) > 0 {
  233. v6 = append(v6, &ipp.v6)
  234. } else {
  235. v4 = append(v4, &ipp.v4)
  236. }
  237. }
  238. }
  239. m := &NebulaMeta{
  240. Type: NebulaMeta_HostUpdateNotification,
  241. Details: &NebulaMetaDetails{
  242. VpnIp: lh.myIp,
  243. IpAndPorts: v4,
  244. Ip6AndPorts: v6,
  245. },
  246. }
  247. lh.metricTx(NebulaMeta_HostUpdateNotification, int64(len(lh.lighthouses)))
  248. nb := make([]byte, 12, 12)
  249. out := make([]byte, mtu)
  250. for vpnIp := range lh.lighthouses {
  251. mm, err := proto.Marshal(m)
  252. if err != nil {
  253. lh.l.Debugf("Invalid marshal to update")
  254. }
  255. //l.Error("LIGHTHOUSE PACKET SEND", mm)
  256. f.SendMessageToVpnIp(lightHouse, 0, vpnIp, mm, nb, out)
  257. }
  258. }
  259. type LightHouseHandler struct {
  260. lh *LightHouse
  261. nb []byte
  262. out []byte
  263. meta *NebulaMeta
  264. iap []ip4Or6
  265. iapp []*ip4Or6
  266. }
  267. func (lh *LightHouse) NewRequestHandler() *LightHouseHandler {
  268. lhh := &LightHouseHandler{
  269. lh: lh,
  270. nb: make([]byte, 12, 12),
  271. out: make([]byte, mtu),
  272. meta: &NebulaMeta{
  273. Details: &NebulaMetaDetails{},
  274. },
  275. }
  276. lhh.resizeIpAndPorts(10)
  277. return lhh
  278. }
  279. // This method is similar to Reset(), but it re-uses the pointer structs
  280. // so that we don't have to re-allocate them
  281. func (lhh *LightHouseHandler) resetMeta() *NebulaMeta {
  282. details := lhh.meta.Details
  283. details.Reset()
  284. lhh.meta.Reset()
  285. lhh.meta.Details = details
  286. return lhh.meta
  287. }
  288. func (lhh *LightHouseHandler) resizeIpAndPorts(n int) {
  289. if cap(lhh.iap) < n {
  290. lhh.iap = make([]ip4Or6, n)
  291. lhh.iapp = make([]*ip4Or6, n)
  292. for i := range lhh.iap {
  293. lhh.iapp[i] = &lhh.iap[i]
  294. }
  295. }
  296. lhh.iap = lhh.iap[:n]
  297. lhh.iapp = lhh.iapp[:n]
  298. }
  299. func (lhh *LightHouseHandler) setIpAndPortsFromNetIps(ips []*udpAddr) []*ip4Or6 {
  300. lhh.resizeIpAndPorts(len(ips))
  301. for i, e := range ips {
  302. lhh.iap[i] = NewIpAndPortFromUDPAddr(e)
  303. }
  304. return lhh.iapp
  305. }
  306. func (lhh *LightHouseHandler) HandleRequest(rAddr *udpAddr, vpnIp uint32, p []byte, c *cert.NebulaCertificate, f EncWriter) {
  307. lh := lhh.lh
  308. n := lhh.resetMeta()
  309. err := proto.UnmarshalMerge(p, n)
  310. if err != nil {
  311. lh.l.WithError(err).WithField("vpnIp", IntIp(vpnIp)).WithField("udpAddr", rAddr).
  312. Error("Failed to unmarshal lighthouse packet")
  313. //TODO: send recv_error?
  314. return
  315. }
  316. if n.Details == nil {
  317. lh.l.WithField("vpnIp", IntIp(vpnIp)).WithField("udpAddr", rAddr).
  318. Error("Invalid lighthouse update")
  319. //TODO: send recv_error?
  320. return
  321. }
  322. lh.metricRx(n.Type, 1)
  323. switch n.Type {
  324. case NebulaMeta_HostQuery:
  325. // Exit if we don't answer queries
  326. if !lh.amLighthouse {
  327. lh.l.Debugln("I don't answer queries, but received from: ", rAddr)
  328. return
  329. }
  330. //l.Debugln("Got Query")
  331. ips, err := lh.Query(n.Details.VpnIp, f)
  332. if err != nil {
  333. //l.Debugf("Can't answer query %s from %s because error: %s", IntIp(n.Details.VpnIp), rAddr, err)
  334. return
  335. } else {
  336. reqVpnIP := n.Details.VpnIp
  337. n = lhh.resetMeta()
  338. n.Type = NebulaMeta_HostQueryReply
  339. n.Details.VpnIp = reqVpnIP
  340. v4s := make([]*IpAndPort, 0)
  341. v6s := make([]*Ip6AndPort, 0)
  342. for _, v := range lhh.setIpAndPortsFromNetIps(ips) {
  343. if len(v.v6.Ip) > 0 {
  344. v6s = append(v6s, &v.v6)
  345. } else {
  346. v4s = append(v4s, &v.v4)
  347. }
  348. }
  349. if len(v4s) > 0 {
  350. n.Details.IpAndPorts = v4s
  351. }
  352. if len(v6s) > 0 {
  353. n.Details.Ip6AndPorts = v6s
  354. }
  355. reply, err := proto.Marshal(n)
  356. if err != nil {
  357. lh.l.WithError(err).WithField("vpnIp", IntIp(vpnIp)).Error("Failed to marshal lighthouse host query reply")
  358. return
  359. }
  360. lh.metricTx(NebulaMeta_HostQueryReply, 1)
  361. f.SendMessageToVpnIp(lightHouse, 0, vpnIp, reply, lhh.nb, lhh.out[:0])
  362. // This signals the other side to punch some zero byte udp packets
  363. ips, err = lh.Query(vpnIp, f)
  364. if err != nil {
  365. lh.l.WithField("vpnIp", IntIp(vpnIp)).Debugln("Can't notify host to punch")
  366. return
  367. } else {
  368. //l.Debugln("Notify host to punch", iap)
  369. n = lhh.resetMeta()
  370. n.Type = NebulaMeta_HostPunchNotification
  371. n.Details.VpnIp = vpnIp
  372. v4s := make([]*IpAndPort, 0)
  373. v6s := make([]*Ip6AndPort, 0)
  374. for _, v := range lhh.setIpAndPortsFromNetIps(ips) {
  375. if len(v.v6.Ip) > 0 {
  376. v6s = append(v6s, &v.v6)
  377. } else {
  378. v4s = append(v4s, &v.v4)
  379. }
  380. }
  381. if len(v4s) > 0 {
  382. n.Details.IpAndPorts = v4s
  383. }
  384. if len(v6s) > 0 {
  385. n.Details.Ip6AndPorts = v6s
  386. }
  387. reply, _ := proto.Marshal(n)
  388. lh.metricTx(NebulaMeta_HostPunchNotification, 1)
  389. f.SendMessageToVpnIp(lightHouse, 0, reqVpnIP, reply, lhh.nb, lhh.out[:0])
  390. }
  391. //fmt.Println(reply, remoteaddr)
  392. }
  393. case NebulaMeta_HostQueryReply:
  394. if !lh.IsLighthouseIP(vpnIp) {
  395. return
  396. }
  397. for _, a := range n.Details.IpAndPorts {
  398. ans := NewUDPAddrFromLH4(a)
  399. if ans != nil {
  400. lh.AddRemote(n.Details.VpnIp, ans, false)
  401. }
  402. }
  403. for _, a := range n.Details.Ip6AndPorts {
  404. ans := NewUDPAddrFromLH6(a)
  405. if ans != nil {
  406. lh.AddRemote(n.Details.VpnIp, ans, false)
  407. }
  408. }
  409. // Non-blocking attempt to trigger, skip if it would block
  410. select {
  411. case lh.handshakeTrigger <- n.Details.VpnIp:
  412. default:
  413. }
  414. case NebulaMeta_HostUpdateNotification:
  415. //Simple check that the host sent this not someone else
  416. if n.Details.VpnIp != vpnIp {
  417. lh.l.WithField("vpnIp", IntIp(vpnIp)).WithField("answer", IntIp(n.Details.VpnIp)).Debugln("Host sent invalid update")
  418. return
  419. }
  420. for _, a := range n.Details.IpAndPorts {
  421. ans := NewUDPAddrFromLH4(a)
  422. if ans != nil {
  423. lh.AddRemote(n.Details.VpnIp, ans, false)
  424. }
  425. }
  426. for _, a := range n.Details.Ip6AndPorts {
  427. ans := NewUDPAddrFromLH6(a)
  428. if ans != nil {
  429. lh.AddRemote(n.Details.VpnIp, ans, false)
  430. }
  431. }
  432. case NebulaMeta_HostMovedNotification:
  433. case NebulaMeta_HostPunchNotification:
  434. if !lh.IsLighthouseIP(vpnIp) {
  435. return
  436. }
  437. empty := []byte{0}
  438. for _, a := range n.Details.IpAndPorts {
  439. vpnPeer := NewUDPAddrFromLH4(a)
  440. if vpnPeer == nil {
  441. continue
  442. }
  443. go func() {
  444. time.Sleep(lh.punchDelay)
  445. lh.metricHolepunchTx.Inc(1)
  446. lh.punchConn.WriteTo(empty, vpnPeer)
  447. }()
  448. if lh.l.Level >= logrus.DebugLevel {
  449. //TODO: lacking the ip we are actually punching on, old: l.Debugf("Punching %s on %d for %s", IntIp(a.Ip), a.Port, IntIp(n.Details.VpnIp))
  450. lh.l.Debugf("Punching on %d for %s", a.Port, IntIp(n.Details.VpnIp))
  451. }
  452. }
  453. for _, a := range n.Details.Ip6AndPorts {
  454. vpnPeer := NewUDPAddrFromLH6(a)
  455. if vpnPeer == nil {
  456. continue
  457. }
  458. go func() {
  459. time.Sleep(lh.punchDelay)
  460. lh.metricHolepunchTx.Inc(1)
  461. lh.punchConn.WriteTo(empty, vpnPeer)
  462. }()
  463. if lh.l.Level >= logrus.DebugLevel {
  464. //TODO: lacking the ip we are actually punching on, old: l.Debugf("Punching %s on %d for %s", IntIp(a.Ip), a.Port, IntIp(n.Details.VpnIp))
  465. lh.l.Debugf("Punching on %d for %s", a.Port, IntIp(n.Details.VpnIp))
  466. }
  467. }
  468. // This sends a nebula test packet to the host trying to contact us. In the case
  469. // of a double nat or other difficult scenario, this may help establish
  470. // a tunnel.
  471. if lh.punchBack {
  472. go func() {
  473. time.Sleep(time.Second * 5)
  474. lh.l.Debugf("Sending a nebula test packet to vpn ip %s", IntIp(n.Details.VpnIp))
  475. // TODO we have to allocate a new output buffer here since we are spawning a new goroutine
  476. // for each punchBack packet. We should move this into a timerwheel or a single goroutine
  477. // managed by a channel.
  478. f.SendMessageToVpnIp(test, testRequest, n.Details.VpnIp, []byte(""), make([]byte, 12, 12), make([]byte, mtu))
  479. }()
  480. }
  481. }
  482. }
  483. func (lh *LightHouse) metricRx(t NebulaMeta_MessageType, i int64) {
  484. lh.metrics.Rx(NebulaMessageType(t), 0, i)
  485. }
  486. func (lh *LightHouse) metricTx(t NebulaMeta_MessageType, i int64) {
  487. lh.metrics.Tx(NebulaMessageType(t), 0, i)
  488. }
  489. /*
  490. func (f *Interface) sendPathCheck(ci *ConnectionState, endpoint *net.UDPAddr, counter int) {
  491. c := ci.messageCounter
  492. b := HeaderEncode(nil, Version, uint8(path_check), 0, ci.remoteIndex, c)
  493. ci.messageCounter++
  494. if ci.eKey != nil {
  495. msg := ci.eKey.EncryptDanger(b, nil, []byte(strconv.Itoa(counter)), c)
  496. //msg := ci.eKey.EncryptDanger(b, nil, []byte(fmt.Sprintf("%d", counter)), c)
  497. f.outside.WriteTo(msg, endpoint)
  498. l.Debugf("path_check sent, remote index: %d, pathCounter %d", ci.remoteIndex, counter)
  499. }
  500. }
  501. func (f *Interface) sendPathCheckReply(ci *ConnectionState, endpoint *net.UDPAddr, counter []byte) {
  502. c := ci.messageCounter
  503. b := HeaderEncode(nil, Version, uint8(path_check_reply), 0, ci.remoteIndex, c)
  504. ci.messageCounter++
  505. if ci.eKey != nil {
  506. msg := ci.eKey.EncryptDanger(b, nil, counter, c)
  507. f.outside.WriteTo(msg, endpoint)
  508. l.Debugln("path_check sent, remote index: ", ci.remoteIndex)
  509. }
  510. }
  511. */