123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224 |
- // Copyright © 2021-2022 Ettore Di Giacinto <[email protected]>
- //
- // This program is free software; you can redistribute it and/or modify
- // it under the terms of the GNU General Public License as published by
- // the Free Software Foundation; either version 2 of the License, or
- // (at your option) any later version.
- //
- // This program is distributed in the hope that it will be useful,
- // but WITHOUT ANY WARRANTY; without even the implied warranty of
- // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- // GNU General Public License for more details.
- //
- // You should have received a copy of the GNU General Public License along
- // with this program; if not, see <http://www.gnu.org/licenses/>.
- package services
- import (
- "bufio"
- "context"
- "io"
- "log"
- "math/rand"
- "net/http"
- "strings"
- "time"
- "github.com/libp2p/go-libp2p/core/network"
- "github.com/libp2p/go-libp2p/core/peer"
- "github.com/mudler/edgevpn/pkg/blockchain"
- "github.com/mudler/edgevpn/pkg/node"
- "github.com/mudler/edgevpn/pkg/protocol"
- "github.com/mudler/edgevpn/pkg/types"
- )
- func egressHandler(n *node.Node, b *blockchain.Ledger) func(stream network.Stream) {
- return func(stream network.Stream) {
- // Remember to close the stream when we are done.
- defer stream.Close()
- // Retrieve current ID for ip in the blockchain
- _, found := b.GetKey(protocol.UsersLedgerKey, stream.Conn().RemotePeer().String())
- // If mismatch, update the blockchain
- if !found {
- // ll.Debugf("Reset '%s': not found in the ledger", stream.Conn().RemotePeer().String())
- stream.Reset()
- return
- }
- // Create a new buffered reader, as ReadRequest needs one.
- // The buffered reader reads from our stream, on which we
- // have sent the HTTP request (see ServeHTTP())
- buf := bufio.NewReader(stream)
- // Read the HTTP request from the buffer
- req, err := http.ReadRequest(buf)
- if err != nil {
- stream.Reset()
- log.Println(err)
- return
- }
- defer req.Body.Close()
- // We need to reset these fields in the request
- // URL as they are not maintained.
- req.URL.Scheme = "http"
- hp := strings.Split(req.Host, ":")
- if len(hp) > 1 && hp[1] == "443" {
- req.URL.Scheme = "https"
- } else {
- req.URL.Scheme = "http"
- }
- req.URL.Host = req.Host
- outreq := new(http.Request)
- *outreq = *req
- // We now make the request
- //fmt.Printf("Making request to %s\n", req.URL)
- resp, err := http.DefaultTransport.RoundTrip(outreq)
- if err != nil {
- stream.Reset()
- log.Println(err)
- return
- }
- // resp.Write writes whatever response we obtained for our
- // request back to the stream.
- resp.Write(stream)
- }
- }
- // ProxyService starts a local http proxy server which redirects requests to egresses into the network
- // It takes a deadtime to consider hosts which are alive within a time window
- func ProxyService(announceTime time.Duration, listenAddr string, deadtime time.Duration) node.NetworkService {
- return func(ctx context.Context, c node.Config, n *node.Node, b *blockchain.Ledger) error {
- ps := &proxyService{
- host: n,
- listenAddr: listenAddr,
- deadTime: deadtime,
- }
- // Announce ourselves so nodes accepts our connection
- b.Announce(
- ctx,
- announceTime,
- func() {
- // Retrieve current ID for ip in the blockchain
- _, found := b.GetKey(protocol.UsersLedgerKey, n.Host().ID().String())
- // If mismatch, update the blockchain
- if !found {
- updatedMap := map[string]interface{}{}
- updatedMap[n.Host().ID().String()] = &types.User{
- PeerID: n.Host().ID().String(),
- Timestamp: time.Now().String(),
- }
- b.Add(protocol.UsersLedgerKey, updatedMap)
- }
- },
- )
- go ps.Serve()
- return nil
- }
- }
- type proxyService struct {
- host *node.Node
- listenAddr string
- deadTime time.Duration
- }
- func (p *proxyService) Serve() error {
- return http.ListenAndServe(p.listenAddr, p)
- }
- func (p *proxyService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
- l, err := p.host.Ledger()
- if err != nil {
- //fmt.Printf("no ledger")
- return
- }
- egress := l.CurrentData()[protocol.EgressService]
- nodes := AvailableNodes(l, p.deadTime)
- availableEgresses := []string{}
- for _, n := range nodes {
- for e := range egress {
- if e == n {
- availableEgresses = append(availableEgresses, e)
- }
- }
- }
- chosen := availableEgresses[rand.Intn(len(availableEgresses)-1)]
- //fmt.Printf("proxying request for %s to peer %s\n", r.URL, chosen)
- // We need to send the request to the remote libp2p peer, so
- // we open a stream to it
- stream, err := p.host.Host().NewStream(context.Background(), peer.ID(chosen), protocol.EgressProtocol.ID())
- // If an error happens, we write an error for response.
- if err != nil {
- log.Println(err)
- http.Error(w, err.Error(), http.StatusInternalServerError)
- return
- }
- defer stream.Close()
- // r.Write() writes the HTTP request to the stream.
- err = r.Write(stream)
- if err != nil {
- stream.Reset()
- log.Println(err)
- http.Error(w, err.Error(), http.StatusServiceUnavailable)
- return
- }
- // Now we read the response that was sent from the dest
- // peer
- buf := bufio.NewReader(stream)
- resp, err := http.ReadResponse(buf, r)
- if err != nil {
- stream.Reset()
- log.Println(err)
- http.Error(w, err.Error(), http.StatusServiceUnavailable)
- return
- }
- // Copy any headers
- for k, v := range resp.Header {
- for _, s := range v {
- w.Header().Add(k, s)
- }
- }
- // Write response status and headers
- w.WriteHeader(resp.StatusCode)
- // Finally copy the body
- io.Copy(w, resp.Body)
- resp.Body.Close()
- }
- func EgressService(announceTime time.Duration) node.NetworkService {
- return func(ctx context.Context, c node.Config, n *node.Node, b *blockchain.Ledger) error {
- b.AnnounceUpdate(ctx, announceTime, protocol.EgressService, n.Host().ID().String(), "ok")
- return nil
- }
- }
- func Egress(announceTime time.Duration) []node.Option {
- return []node.Option{
- node.WithNetworkService(EgressService(announceTime)),
- node.WithStreamHandler(protocol.EgressProtocol, egressHandler),
- }
- }
- func Proxy(announceTime, deadtime time.Duration, listenAddr string) []node.Option {
- return []node.Option{
- node.WithNetworkService(ProxyService(announceTime, listenAddr, deadtime)),
- }
- }
|