vpn.go 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287
  1. // Copyright © 2022 Ettore Di Giacinto <[email protected]>
  2. //
  3. // This program is free software; you can redistribute it and/or modify
  4. // it under the terms of the GNU General Public License as published by
  5. // the Free Software Foundation; either version 2 of the License, or
  6. // (at your option) any later version.
  7. //
  8. // This program is distributed in the hope that it will be useful,
  9. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  10. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  11. // GNU General Public License for more details.
  12. //
  13. // You should have received a copy of the GNU General Public License along
  14. // with this program; if not, see <http://www.gnu.org/licenses/>.
  15. package vpn
  16. import (
  17. "context"
  18. "fmt"
  19. "io"
  20. "net"
  21. "os"
  22. "runtime"
  23. "sync"
  24. "time"
  25. "github.com/ipfs/go-log"
  26. "github.com/libp2p/go-libp2p-core/network"
  27. "github.com/libp2p/go-libp2p-core/peer"
  28. "github.com/mudler/edgevpn/internal"
  29. "github.com/mudler/edgevpn/pkg/blockchain"
  30. "github.com/mudler/edgevpn/pkg/logger"
  31. "github.com/mudler/edgevpn/pkg/node"
  32. "github.com/mudler/edgevpn/pkg/protocol"
  33. "github.com/mudler/edgevpn/pkg/stream"
  34. "github.com/mudler/edgevpn/pkg/types"
  35. "github.com/pkg/errors"
  36. "github.com/songgao/packets/ethernet"
  37. "github.com/songgao/water"
  38. "golang.org/x/net/ipv4"
  39. )
  40. type streamManager interface {
  41. Connected(n network.Network, c network.Stream)
  42. Disconnected(n network.Network, c network.Stream)
  43. HasStream(n network.Network, pid peer.ID) (network.Stream, error)
  44. Close() error
  45. }
  46. func VPNNetworkService(p ...Option) node.NetworkService {
  47. return func(ctx context.Context, nc node.Config, n *node.Node, b *blockchain.Ledger) error {
  48. c := &Config{
  49. Concurrency: 1,
  50. LedgerAnnounceTime: 5 * time.Second,
  51. Timeout: 15 * time.Second,
  52. Logger: logger.New(log.LevelDebug),
  53. MaxStreams: 30,
  54. }
  55. if err := c.Apply(p...); err != nil {
  56. return err
  57. }
  58. ifce, err := createInterface(c)
  59. if err != nil {
  60. return err
  61. }
  62. defer ifce.Close()
  63. var mgr streamManager
  64. if !c.lowProfile {
  65. // Create stream manager for outgoing connections
  66. mgr, err = stream.NewConnManager(10, c.MaxStreams)
  67. if err != nil {
  68. return err
  69. }
  70. // Attach it to the same context
  71. go func() {
  72. <-ctx.Done()
  73. mgr.Close()
  74. }()
  75. }
  76. // Set stream handler during runtime
  77. n.Host().SetStreamHandler(protocol.EdgeVPN.ID(), streamHandler(b, ifce, c))
  78. // Announce our IP
  79. ip, _, err := net.ParseCIDR(c.InterfaceAddress)
  80. if err != nil {
  81. return err
  82. }
  83. b.Announce(
  84. ctx,
  85. c.LedgerAnnounceTime,
  86. func() {
  87. machine := &types.Machine{}
  88. // Retrieve current ID for ip in the blockchain
  89. existingValue, found := b.GetKey(protocol.MachinesLedgerKey, ip.String())
  90. existingValue.Unmarshal(machine)
  91. // If mismatch, update the blockchain
  92. if !found || machine.PeerID != n.Host().ID().String() {
  93. updatedMap := map[string]interface{}{}
  94. updatedMap[ip.String()] = newBlockChainData(n, ip.String())
  95. b.Add(protocol.MachinesLedgerKey, updatedMap)
  96. }
  97. },
  98. )
  99. if c.NetLinkBootstrap {
  100. if err := prepareInterface(c); err != nil {
  101. return err
  102. }
  103. }
  104. // read packets from the interface
  105. return readPackets(ctx, mgr, c, n, b, ifce)
  106. }
  107. }
  108. // Start the node and the vpn. Returns an error in case of failure
  109. // When starting the vpn, there is no need to start the node
  110. func Register(p ...Option) ([]node.Option, error) {
  111. return []node.Option{node.WithNetworkService(VPNNetworkService(p...))}, nil
  112. }
  113. func streamHandler(l *blockchain.Ledger, ifce *water.Interface, c *Config) func(stream network.Stream) {
  114. return func(stream network.Stream) {
  115. if !l.Exists(protocol.MachinesLedgerKey,
  116. func(d blockchain.Data) bool {
  117. machine := &types.Machine{}
  118. d.Unmarshal(machine)
  119. return machine.PeerID == stream.Conn().RemotePeer().String()
  120. }) {
  121. stream.Reset()
  122. return
  123. }
  124. _, err := io.Copy(ifce.ReadWriteCloser, stream)
  125. if err != nil {
  126. stream.Reset()
  127. }
  128. if c.lowProfile {
  129. stream.Close()
  130. }
  131. }
  132. }
  133. func newBlockChainData(n *node.Node, address string) types.Machine {
  134. hostname, _ := os.Hostname()
  135. return types.Machine{
  136. PeerID: n.Host().ID().String(),
  137. Hostname: hostname,
  138. OS: runtime.GOOS,
  139. Arch: runtime.GOARCH,
  140. Version: internal.Version,
  141. Address: address,
  142. }
  143. }
  144. func getFrame(ifce *water.Interface, c *Config) (ethernet.Frame, error) {
  145. var frame ethernet.Frame
  146. frame.Resize(c.MTU)
  147. n, err := ifce.Read([]byte(frame))
  148. if err != nil {
  149. return frame, errors.Wrap(err, "could not read from interface")
  150. }
  151. frame = frame[:n]
  152. return frame, nil
  153. }
  154. func handleFrame(mgr streamManager, frame ethernet.Frame, c *Config, n *node.Node, ip net.IP, ledger *blockchain.Ledger, ifce *water.Interface) error {
  155. ctx, cancel := context.WithTimeout(context.Background(), c.Timeout)
  156. defer cancel()
  157. header, err := ipv4.ParseHeader(frame)
  158. if err != nil {
  159. return errors.Wrap(err, "could not parse ipv4 header from frame")
  160. }
  161. dst := header.Dst.String()
  162. if c.RouterAddress != "" && header.Src.Equal(ip) {
  163. dst = c.RouterAddress
  164. }
  165. // Query the routing table
  166. value, found := ledger.GetKey(protocol.MachinesLedgerKey, dst)
  167. if !found {
  168. return fmt.Errorf("'%s' not found in the routing table", dst)
  169. }
  170. machine := &types.Machine{}
  171. value.Unmarshal(machine)
  172. // Decode the Peer
  173. d, err := peer.Decode(machine.PeerID)
  174. if err != nil {
  175. return errors.Wrap(err, "could not decode peer")
  176. }
  177. var stream network.Stream
  178. if mgr != nil {
  179. // Open a stream if necessary
  180. stream, err = mgr.HasStream(n.Host().Network(), d)
  181. if err == nil {
  182. _, err = stream.Write(frame)
  183. if err == nil {
  184. return nil
  185. }
  186. mgr.Disconnected(n.Host().Network(), stream)
  187. }
  188. }
  189. stream, err = n.Host().NewStream(ctx, d, protocol.EdgeVPN.ID())
  190. if err != nil {
  191. return fmt.Errorf("could not open stream to %s: %w", d.String(), err)
  192. }
  193. if mgr != nil {
  194. mgr.Connected(n.Host().Network(), stream)
  195. }
  196. _, err = stream.Write(frame)
  197. if c.lowProfile {
  198. return stream.Close()
  199. }
  200. return err
  201. }
  202. func connectionWorker(
  203. p chan ethernet.Frame,
  204. mgr streamManager,
  205. c *Config,
  206. n *node.Node,
  207. ip net.IP,
  208. wg *sync.WaitGroup,
  209. ledger *blockchain.Ledger,
  210. ifce *water.Interface) {
  211. defer wg.Done()
  212. for f := range p {
  213. if err := handleFrame(mgr, f, c, n, ip, ledger, ifce); err != nil {
  214. c.Logger.Debugf("could not handle frame: %s", err.Error())
  215. }
  216. }
  217. }
  218. // redirects packets from the interface to the node using the routing table in the blockchain
  219. func readPackets(ctx context.Context, mgr streamManager, c *Config, n *node.Node, ledger *blockchain.Ledger, ifce *water.Interface) error {
  220. ip, _, err := net.ParseCIDR(c.InterfaceAddress)
  221. if err != nil {
  222. return err
  223. }
  224. wg := new(sync.WaitGroup)
  225. packets := make(chan ethernet.Frame, c.ChannelBufferSize)
  226. defer func() {
  227. close(packets)
  228. wg.Wait()
  229. }()
  230. for i := 0; i < c.Concurrency; i++ {
  231. wg.Add(1)
  232. go connectionWorker(packets, mgr, c, n, ip, wg, ledger, ifce)
  233. }
  234. for {
  235. select {
  236. case <-ctx.Done():
  237. return nil
  238. default:
  239. frame, err := getFrame(ifce, c)
  240. if err != nil {
  241. c.Logger.Errorf("could not get frame '%s'", err.Error())
  242. continue
  243. }
  244. packets <- frame
  245. }
  246. }
  247. }