egress.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224
  1. // Copyright © 2021-2022 Ettore Di Giacinto <[email protected]>
  2. //
  3. // This program is free software; you can redistribute it and/or modify
  4. // it under the terms of the GNU General Public License as published by
  5. // the Free Software Foundation; either version 2 of the License, or
  6. // (at your option) any later version.
  7. //
  8. // This program is distributed in the hope that it will be useful,
  9. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  10. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  11. // GNU General Public License for more details.
  12. //
  13. // You should have received a copy of the GNU General Public License along
  14. // with this program; if not, see <http://www.gnu.org/licenses/>.
  15. package services
  16. import (
  17. "bufio"
  18. "context"
  19. "io"
  20. "log"
  21. "math/rand"
  22. "net/http"
  23. "strings"
  24. "time"
  25. "github.com/libp2p/go-libp2p/core/network"
  26. "github.com/libp2p/go-libp2p/core/peer"
  27. "github.com/mudler/edgevpn/pkg/blockchain"
  28. "github.com/mudler/edgevpn/pkg/node"
  29. "github.com/mudler/edgevpn/pkg/protocol"
  30. "github.com/mudler/edgevpn/pkg/types"
  31. )
  32. func egressHandler(n *node.Node, b *blockchain.Ledger) func(stream network.Stream) {
  33. return func(stream network.Stream) {
  34. // Remember to close the stream when we are done.
  35. defer stream.Close()
  36. // Retrieve current ID for ip in the blockchain
  37. _, found := b.GetKey(protocol.UsersLedgerKey, stream.Conn().RemotePeer().String())
  38. // If mismatch, update the blockchain
  39. if !found {
  40. // ll.Debugf("Reset '%s': not found in the ledger", stream.Conn().RemotePeer().String())
  41. stream.Reset()
  42. return
  43. }
  44. // Create a new buffered reader, as ReadRequest needs one.
  45. // The buffered reader reads from our stream, on which we
  46. // have sent the HTTP request (see ServeHTTP())
  47. buf := bufio.NewReader(stream)
  48. // Read the HTTP request from the buffer
  49. req, err := http.ReadRequest(buf)
  50. if err != nil {
  51. stream.Reset()
  52. log.Println(err)
  53. return
  54. }
  55. defer req.Body.Close()
  56. // We need to reset these fields in the request
  57. // URL as they are not maintained.
  58. req.URL.Scheme = "http"
  59. hp := strings.Split(req.Host, ":")
  60. if len(hp) > 1 && hp[1] == "443" {
  61. req.URL.Scheme = "https"
  62. } else {
  63. req.URL.Scheme = "http"
  64. }
  65. req.URL.Host = req.Host
  66. outreq := new(http.Request)
  67. *outreq = *req
  68. // We now make the request
  69. //fmt.Printf("Making request to %s\n", req.URL)
  70. resp, err := http.DefaultTransport.RoundTrip(outreq)
  71. if err != nil {
  72. stream.Reset()
  73. log.Println(err)
  74. return
  75. }
  76. // resp.Write writes whatever response we obtained for our
  77. // request back to the stream.
  78. resp.Write(stream)
  79. }
  80. }
  81. // ProxyService starts a local http proxy server which redirects requests to egresses into the network
  82. // It takes a deadtime to consider hosts which are alive within a time window
  83. func ProxyService(announceTime time.Duration, listenAddr string, deadtime time.Duration) node.NetworkService {
  84. return func(ctx context.Context, c node.Config, n *node.Node, b *blockchain.Ledger) error {
  85. ps := &proxyService{
  86. host: n,
  87. listenAddr: listenAddr,
  88. deadTime: deadtime,
  89. }
  90. // Announce ourselves so nodes accepts our connection
  91. b.Announce(
  92. ctx,
  93. announceTime,
  94. func() {
  95. // Retrieve current ID for ip in the blockchain
  96. _, found := b.GetKey(protocol.UsersLedgerKey, n.Host().ID().String())
  97. // If mismatch, update the blockchain
  98. if !found {
  99. updatedMap := map[string]interface{}{}
  100. updatedMap[n.Host().ID().String()] = &types.User{
  101. PeerID: n.Host().ID().String(),
  102. Timestamp: time.Now().String(),
  103. }
  104. b.Add(protocol.UsersLedgerKey, updatedMap)
  105. }
  106. },
  107. )
  108. go ps.Serve()
  109. return nil
  110. }
  111. }
  112. type proxyService struct {
  113. host *node.Node
  114. listenAddr string
  115. deadTime time.Duration
  116. }
  117. func (p *proxyService) Serve() error {
  118. return http.ListenAndServe(p.listenAddr, p)
  119. }
  120. func (p *proxyService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  121. l, err := p.host.Ledger()
  122. if err != nil {
  123. //fmt.Printf("no ledger")
  124. return
  125. }
  126. egress := l.CurrentData()[protocol.EgressService]
  127. nodes := AvailableNodes(l, p.deadTime)
  128. availableEgresses := []string{}
  129. for _, n := range nodes {
  130. for e := range egress {
  131. if e == n {
  132. availableEgresses = append(availableEgresses, e)
  133. }
  134. }
  135. }
  136. chosen := availableEgresses[rand.Intn(len(availableEgresses)-1)]
  137. //fmt.Printf("proxying request for %s to peer %s\n", r.URL, chosen)
  138. // We need to send the request to the remote libp2p peer, so
  139. // we open a stream to it
  140. stream, err := p.host.Host().NewStream(context.Background(), peer.ID(chosen), protocol.EgressProtocol.ID())
  141. // If an error happens, we write an error for response.
  142. if err != nil {
  143. log.Println(err)
  144. http.Error(w, err.Error(), http.StatusInternalServerError)
  145. return
  146. }
  147. defer stream.Close()
  148. // r.Write() writes the HTTP request to the stream.
  149. err = r.Write(stream)
  150. if err != nil {
  151. stream.Reset()
  152. log.Println(err)
  153. http.Error(w, err.Error(), http.StatusServiceUnavailable)
  154. return
  155. }
  156. // Now we read the response that was sent from the dest
  157. // peer
  158. buf := bufio.NewReader(stream)
  159. resp, err := http.ReadResponse(buf, r)
  160. if err != nil {
  161. stream.Reset()
  162. log.Println(err)
  163. http.Error(w, err.Error(), http.StatusServiceUnavailable)
  164. return
  165. }
  166. // Copy any headers
  167. for k, v := range resp.Header {
  168. for _, s := range v {
  169. w.Header().Add(k, s)
  170. }
  171. }
  172. // Write response status and headers
  173. w.WriteHeader(resp.StatusCode)
  174. // Finally copy the body
  175. io.Copy(w, resp.Body)
  176. resp.Body.Close()
  177. }
  178. func EgressService(announceTime time.Duration) node.NetworkService {
  179. return func(ctx context.Context, c node.Config, n *node.Node, b *blockchain.Ledger) error {
  180. b.AnnounceUpdate(ctx, announceTime, protocol.EgressService, n.Host().ID().String(), "ok")
  181. return nil
  182. }
  183. }
  184. func Egress(announceTime time.Duration) []node.Option {
  185. return []node.Option{
  186. node.WithNetworkService(EgressService(announceTime)),
  187. node.WithStreamHandler(protocol.EgressProtocol, egressHandler),
  188. }
  189. }
  190. func Proxy(announceTime, deadtime time.Duration, listenAddr string) []node.Option {
  191. return []node.Option{
  192. node.WithNetworkService(ProxyService(announceTime, listenAddr, deadtime)),
  193. }
  194. }