123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209 |
- /*
- Copyright © 2021-2022 Ettore Di Giacinto <[email protected]>
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package node
- import (
- "context"
- "fmt"
- "sync"
- "time"
- "github.com/ipfs/go-log"
- "github.com/libp2p/go-libp2p"
- "github.com/libp2p/go-libp2p/core/host"
- "github.com/libp2p/go-libp2p/core/network"
- "github.com/libp2p/go-libp2p/p2p/net/conngater"
- "github.com/mudler/edgevpn/pkg/crypto"
- protocol "github.com/mudler/edgevpn/pkg/protocol"
- "github.com/mudler/edgevpn/pkg/blockchain"
- hub "github.com/mudler/edgevpn/pkg/hub"
- "github.com/mudler/edgevpn/pkg/logger"
- )
- type Node struct {
- config Config
- MessageHub *hub.MessageHub
- //HubRoom *hub.Room
- inputCh chan *hub.Message
- genericHubCh chan *hub.Message
- seed int64
- host host.Host
- cg *conngater.BasicConnectionGater
- ledger *blockchain.Ledger
- sync.Mutex
- }
- const defaultChanSize = 3000
- var defaultLibp2pOptions = []libp2p.Option{
- libp2p.EnableNATService(),
- libp2p.NATPortMap(),
- }
- func New(p ...Option) (*Node, error) {
- c := &Config{
- DiscoveryInterval: 5 * time.Minute,
- StreamHandlers: make(map[protocol.Protocol]StreamHandler),
- LedgerAnnounceTime: 5 * time.Second,
- LedgerSyncronizationTime: 5 * time.Second,
- SealKeyLength: defaultKeyLength,
- Options: defaultLibp2pOptions,
- Logger: logger.New(log.LevelDebug),
- Sealer: &crypto.AESSealer{},
- Store: &blockchain.MemoryStore{},
- }
- if err := c.Apply(p...); err != nil {
- return nil, err
- }
- return &Node{
- config: *c,
- inputCh: make(chan *hub.Message, defaultChanSize),
- genericHubCh: make(chan *hub.Message, defaultChanSize),
- seed: 0,
- }, nil
- }
- // Ledger return the ledger which uses the node
- // connection to broadcast messages
- func (e *Node) Ledger() (*blockchain.Ledger, error) {
- e.Lock()
- defer e.Unlock()
- if e.ledger != nil {
- return e.ledger, nil
- }
- mw, err := e.messageWriter()
- if err != nil {
- return nil, err
- }
- e.ledger = blockchain.New(mw, e.config.Store)
- return e.ledger, nil
- }
- // PeerGater returns the node peergater
- func (e *Node) PeerGater() Gater {
- return e.config.PeerGater
- }
- // Start joins the node over the p2p network
- func (e *Node) Start(ctx context.Context) error {
- ledger, err := e.Ledger()
- if err != nil {
- return err
- }
- // Set the handler when we receive messages
- // The ledger needs to read them and update the internal blockchain
- e.config.Handlers = append(e.config.Handlers, ledger.Update)
- e.config.Logger.Info("Starting EdgeVPN network")
- // Startup libp2p network
- err = e.startNetwork(ctx)
- if err != nil {
- return err
- }
- // Send periodically messages to the channel with our blockchain content
- ledger.Syncronizer(ctx, e.config.LedgerSyncronizationTime)
- // Start eventual declared NetworkServices
- for _, s := range e.config.NetworkServices {
- err := s(ctx, e.config, e, ledger)
- if err != nil {
- return fmt.Errorf("error while starting network service: '%w'", err)
- }
- }
- return nil
- }
- // messageWriter returns a new MessageWriter bound to the edgevpn instance
- // with the given options
- func (e *Node) messageWriter(opts ...hub.MessageOption) (*messageWriter, error) {
- mess := &hub.Message{}
- mess.Apply(opts...)
- return &messageWriter{
- c: e.config,
- input: e.inputCh,
- mess: mess,
- }, nil
- }
- func (e *Node) startNetwork(ctx context.Context) error {
- e.config.Logger.Debug("Generating host data")
- host, err := e.genHost(ctx)
- if err != nil {
- e.config.Logger.Error(err.Error())
- return err
- }
- e.host = host
- ledger, err := e.Ledger()
- if err != nil {
- return err
- }
- for pid, strh := range e.config.StreamHandlers {
- host.SetStreamHandler(pid.ID(), network.StreamHandler(strh(e, ledger)))
- }
- e.config.Logger.Info("Node ID:", host.ID())
- e.config.Logger.Info("Node Addresses:", host.Addrs())
- // Hub rotates within sealkey interval.
- // this time length should be enough to make room for few block exchanges. This is ideally on minutes (10, 20, etc. )
- // it makes sure that if a bruteforce is attempted over the encrypted messages, the real key is not exposed.
- e.MessageHub = hub.NewHub(e.config.RoomName, e.config.MaxMessageSize, e.config.SealKeyLength, e.config.SealKeyInterval, e.config.GenericHub)
- for _, sd := range e.config.ServiceDiscovery {
- if err := sd.Run(e.config.Logger, ctx, host); err != nil {
- e.config.Logger.Fatal(fmt.Errorf("while starting service discovery %+v: '%w", sd, err))
- }
- }
- go e.handleEvents(ctx, e.inputCh, e.MessageHub.Messages, e.MessageHub.PublishMessage, e.config.Handlers, true)
- go e.MessageHub.Start(ctx, host)
- // If generic hub is enabled one is created separately with a set of generic channel handlers associated with.
- // note peergating is disabled in order to freely exchange messages that can be used for authentication or for other public means.
- if e.config.GenericHub {
- go e.handleEvents(ctx, e.genericHubCh, e.MessageHub.PublicMessages, e.MessageHub.PublishPublicMessage, e.config.GenericChannelHandler, false)
- }
- e.config.Logger.Debug("Network started")
- return nil
- }
- // PublishMessage publishes a message to the generic channel (if enabled)
- // See GenericChannelHandlers(..) to attach handlers to receive messages from this channel.
- func (e *Node) PublishMessage(m *hub.Message) error {
- if !e.config.GenericHub {
- return fmt.Errorf("generic hub disabled")
- }
- e.genericHubCh <- m
- return nil
- }
|