services.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188
  1. // Copyright © 2021-2022 Ettore Di Giacinto <[email protected]>
  2. //
  3. // This program is free software; you can redistribute it and/or modify
  4. // it under the terms of the GNU General Public License as published by
  5. // the Free Software Foundation; either version 2 of the License, or
  6. // (at your option) any later version.
  7. //
  8. // This program is distributed in the hope that it will be useful,
  9. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  10. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  11. // GNU General Public License for more details.
  12. //
  13. // You should have received a copy of the GNU General Public License along
  14. // with this program; if not, see <http://www.gnu.org/licenses/>.
  15. package services
  16. import (
  17. "context"
  18. "io"
  19. "net"
  20. "time"
  21. "github.com/ipfs/go-log"
  22. "github.com/libp2p/go-libp2p/core/network"
  23. "github.com/libp2p/go-libp2p/core/peer"
  24. "github.com/mudler/edgevpn/pkg/blockchain"
  25. "github.com/mudler/edgevpn/pkg/node"
  26. protocol "github.com/mudler/edgevpn/pkg/protocol"
  27. "github.com/pkg/errors"
  28. "github.com/mudler/edgevpn/pkg/types"
  29. )
  30. func ExposeNetworkService(announcetime time.Duration, serviceID string) node.NetworkService {
  31. return func(ctx context.Context, c node.Config, n *node.Node, b *blockchain.Ledger) error {
  32. b.Announce(
  33. ctx,
  34. announcetime,
  35. func() {
  36. // Retrieve current ID for ip in the blockchain
  37. existingValue, found := b.GetKey(protocol.ServicesLedgerKey, serviceID)
  38. service := &types.Service{}
  39. existingValue.Unmarshal(service)
  40. // If mismatch, update the blockchain
  41. if !found || service.PeerID != n.Host().ID().String() {
  42. updatedMap := map[string]interface{}{}
  43. updatedMap[serviceID] = types.Service{PeerID: n.Host().ID().String(), Name: serviceID}
  44. b.Add(protocol.ServicesLedgerKey, updatedMap)
  45. }
  46. },
  47. )
  48. return nil
  49. }
  50. }
  51. // ExposeService exposes a service to the p2p network.
  52. // meant to be called before a node is started with Start()
  53. func RegisterService(ll log.StandardLogger, announcetime time.Duration, serviceID, dstaddress string) []node.Option {
  54. ll.Infof("Exposing service '%s' (%s)", serviceID, dstaddress)
  55. return []node.Option{
  56. node.WithStreamHandler(protocol.ServiceProtocol, func(n *node.Node, l *blockchain.Ledger) func(stream network.Stream) {
  57. return func(stream network.Stream) {
  58. go func() {
  59. ll.Infof("(service %s) Received connection from %s", serviceID, stream.Conn().RemotePeer().String())
  60. // Retrieve current ID for ip in the blockchain
  61. _, found := l.GetKey(protocol.UsersLedgerKey, stream.Conn().RemotePeer().String())
  62. // If mismatch, update the blockchain
  63. if !found {
  64. ll.Debugf("Reset '%s': not found in the ledger", stream.Conn().RemotePeer().String())
  65. stream.Reset()
  66. return
  67. }
  68. ll.Infof("Connecting to '%s'", dstaddress)
  69. c, err := net.Dial("tcp", dstaddress)
  70. if err != nil {
  71. ll.Debugf("Reset %s: %s", stream.Conn().RemotePeer().String(), err.Error())
  72. stream.Reset()
  73. return
  74. }
  75. closer := make(chan struct{}, 2)
  76. go copyStream(closer, stream, c)
  77. go copyStream(closer, c, stream)
  78. <-closer
  79. stream.Close()
  80. c.Close()
  81. ll.Infof("(service %s) Handled correctly '%s'", serviceID, stream.Conn().RemotePeer().String())
  82. }()
  83. }
  84. }),
  85. node.WithNetworkService(ExposeNetworkService(announcetime, serviceID))}
  86. }
  87. // ConnectNetworkService returns a network service that binds to a service
  88. func ConnectNetworkService(announcetime time.Duration, serviceID string, srcaddr string) node.NetworkService {
  89. return func(ctx context.Context, c node.Config, node *node.Node, ledger *blockchain.Ledger) error {
  90. // Open local port for listening
  91. l, err := net.Listen("tcp", srcaddr)
  92. if err != nil {
  93. return err
  94. }
  95. // ll.Info("Binding local port on", srcaddr)
  96. // Announce ourselves so nodes accepts our connection
  97. ledger.Announce(
  98. ctx,
  99. announcetime,
  100. func() {
  101. // Retrieve current ID for ip in the blockchain
  102. _, found := ledger.GetKey(protocol.UsersLedgerKey, node.Host().ID().String())
  103. // If mismatch, update the blockchain
  104. if !found {
  105. updatedMap := map[string]interface{}{}
  106. updatedMap[node.Host().ID().String()] = &types.User{
  107. PeerID: node.Host().ID().String(),
  108. Timestamp: time.Now().String(),
  109. }
  110. ledger.Add(protocol.UsersLedgerKey, updatedMap)
  111. }
  112. },
  113. )
  114. defer l.Close()
  115. for {
  116. select {
  117. case <-ctx.Done():
  118. return errors.New("context canceled")
  119. default:
  120. // Listen for an incoming connection.
  121. conn, err := l.Accept()
  122. if err != nil {
  123. // ll.Error("Error accepting: ", err.Error())
  124. continue
  125. }
  126. // ll.Info("New connection from", l.Addr().String())
  127. // Handle connections in a new goroutine, forwarding to the p2p service
  128. go func() {
  129. // Retrieve current ID for ip in the blockchain
  130. existingValue, found := ledger.GetKey(protocol.ServicesLedgerKey, serviceID)
  131. service := &types.Service{}
  132. existingValue.Unmarshal(service)
  133. // If mismatch, update the blockchain
  134. if !found {
  135. conn.Close()
  136. // ll.Debugf("service '%s' not found on blockchain", serviceID)
  137. return
  138. }
  139. // Decode the Peer
  140. d, err := peer.Decode(service.PeerID)
  141. if err != nil {
  142. conn.Close()
  143. // ll.Debugf("could not decode peer '%s'", service.PeerID)
  144. return
  145. }
  146. // Open a stream
  147. stream, err := node.Host().NewStream(ctx, d, protocol.ServiceProtocol.ID())
  148. if err != nil {
  149. conn.Close()
  150. // ll.Debugf("could not open stream '%s'", err.Error())
  151. return
  152. }
  153. // ll.Debugf("(service %s) Redirecting", serviceID, l.Addr().String())
  154. closer := make(chan struct{}, 2)
  155. go copyStream(closer, stream, conn)
  156. go copyStream(closer, conn, stream)
  157. <-closer
  158. stream.Close()
  159. conn.Close()
  160. // ll.Infof("(service %s) Done handling %s", serviceID, l.Addr().String())
  161. }()
  162. }
  163. }
  164. }
  165. }
  166. func copyStream(closer chan struct{}, dst io.Writer, src io.Reader) {
  167. defer func() { closer <- struct{}{} }() // connection is closed, send signal to stop proxy
  168. io.Copy(dst, src)
  169. }