
:gear: Fork libp2p-connmanager, adapt it for streams

We have a connmanager for connections, and the same code can be re-used
for streams too. This is a simple fork of go-libp2p-connmgr, adapted to
streams.

The stream manager is also consumed by the VPN when low-profile mode is
disabled.
Ettore Di Giacinto, 3 years ago · commit d0201074e4
9 changed files with 1128 additions and 29 deletions
  1. cmd/util.go (+10 -3)
  2. go.mod (+1 -0)
  3. pkg/config/config.go (+4 -3)
  4. pkg/services/services.go (+2 -2)
  5. pkg/stream/decay.go (+379 -0)
  6. pkg/stream/manager.go (+578 -0)
  7. pkg/stream/options.go (+77 -0)
  8. pkg/vpn/config.go (+14 -5)
  9. pkg/vpn/vpn.go (+63 -16)
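
For orientation, the heart of the change is a reuse-or-dial pattern: before opening a new libp2p stream to a peer, the VPN first asks the stream manager for a cached one. A minimal sketch of that flow, assuming a host and a *stream.Manager are already set up (the helper name writeFrame and its parameters are illustrative, not part of this commit):

package sketch

import (
	"context"

	"github.com/libp2p/go-libp2p-core/host"
	"github.com/libp2p/go-libp2p-core/peer"

	"github.com/mudler/edgevpn/pkg/protocol"
	"github.com/mudler/edgevpn/pkg/stream"
)

// writeFrame reuses a tracked stream to peer d when the manager has one,
// and only dials (and registers) a new stream otherwise.
func writeFrame(ctx context.Context, h host.Host, mgr *stream.Manager, d peer.ID, frame []byte) error {
	s, err := mgr.HasStream(h.Network(), d)
	if err != nil {
		// No cached stream: open a new one and let the manager track it.
		s, err = h.NewStream(ctx, d, protocol.EdgeVPN.ID())
		if err != nil {
			return err
		}
		mgr.Connected(h.Network(), s)
	}
	if _, err := s.Write(frame); err != nil {
		// Drop broken streams from the manager so they are not reused.
		mgr.Disconnected(h.Network(), s)
		return err
	}
	return nil
}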

+ 10 - 3
cmd/util.go

@@ -154,8 +154,14 @@ var CommonFlags []cli.Flag = []cli.Flag{
 	},
 	&cli.BoolFlag{
 		Name:   "low-profile-vpn",
-		Usage:  "Enable low profile on vpn. Doesn't keep open connections",
-		EnvVar: "EDGEVPNVPNLOWPROFILE",
+		Usage:  "Enable low profile on VPN",
+		EnvVar: "EDGEVPNLOWPROFILEVPN",
+	},
+	&cli.IntFlag{
+		Name:   "max-streams",
+		Usage:  "Number of concurrent streams",
+		Value:  100,
+		EnvVar: "EDGEVPNMAXSTREAMS",
 	},
 	&cli.StringFlag{
 		Name:   "log-level",
@@ -201,13 +207,13 @@ func cliToOpts(c *cli.Context) ([]node.Option, []vpn.Option, *logger.Logger) {
 		Libp2pLogLevel:    c.String("libp2p-log-level"),
 		LogLevel:          c.String("log-level"),
 		LowProfile:        c.Bool("low-profile"),
+		VPNLowProfile:     c.Bool("low-profile-vpn"),
 		Blacklist:         c.StringSlice("blacklist"),
 		Concurrency:       c.Int("concurrency"),
 		FrameTimeout:      c.String("timeout"),
 		ChannelBufferSize: c.Int("channel-buffer-size"),
 		InterfaceMTU:      c.Int("mtu"),
 		PacketMTU:         c.Int("packet-mtu"),
-		LowProfileVPN:     c.Bool("low-profile-vpn"),
 		Ledger: config.Ledger{
 			StateDir:         c.String("ledger-state"),
 			AnnounceInterval: time.Duration(c.Int("ledger-announce-interval")) * time.Second,
@@ -230,6 +236,7 @@ func cliToOpts(c *cli.Context) ([]node.Option, []vpn.Option, *logger.Logger) {
 		Connection: config.Connection{
 			AutoRelay:      c.Bool("autorelay"),
 			MaxConnections: c.Int("max-connections"),
+			MaxStreams:     c.Int("max-streams"),
 			HolePunch:      c.Bool("holepunch"),
 		},
 	}
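
The flags above end up in config.Config, which is extended in pkg/config below. A hedged sketch of how they turn into VPN options via ToOpts (the helper name buildVPNOpts and the values are illustrative; a real Config also needs the interface, ledger and discovery fields filled in):

package sketch

import (
	"github.com/mudler/edgevpn/pkg/config"
	"github.com/mudler/edgevpn/pkg/logger"
	"github.com/mudler/edgevpn/pkg/vpn"
)

// buildVPNOpts maps the Connection block to VPN options: MaxStreams becomes
// vpn.WithMaxStreams(...), and VPNLowProfile appends vpn.LowProfile.
func buildVPNOpts(l *logger.Logger) ([]vpn.Option, error) {
	c := config.Config{
		VPNLowProfile: false, // keep the stream manager enabled
		Connection: config.Connection{
			MaxConnections: 100,
			MaxStreams:     100,
		},
	}
	_, vpnOpts, err := c.ToOpts(l)
	return vpnOpts, err
}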

+ 1 - 0
go.mod

@@ -3,6 +3,7 @@ module github.com/mudler/edgevpn
 go 1.16
 
 require (
+	github.com/benbjohnson/clock v1.1.0
 	github.com/c-robinson/iplib v1.0.3
 	github.com/cespare/xxhash/v2 v2.1.2 // indirect
 	github.com/cpuguy83/go-md2man/v2 v2.0.1 // indirect

+ 4 - 3
pkg/config/config.go

@@ -42,12 +42,11 @@ type Config struct {
 	Router                                     string
 	Interface                                  string
 	Libp2pLogLevel, LogLevel                   string
-	LowProfile                                 bool
+	LowProfile, VPNLowProfile                  bool
 	Blacklist                                  []string
 	Concurrency                                int
 	FrameTimeout                               string
 	ChannelBufferSize, InterfaceMTU, PacketMTU int
-	LowProfileVPN                              bool
 	NAT                                        NAT
 	Connection                                 Connection
 	Discovery                                  Discovery
@@ -74,6 +73,7 @@ type Connection struct {
 	HolePunch      bool
 	AutoRelay      bool
 	MaxConnections int
+	MaxStreams     int
 }
 
 // NAT is the structure relative to NAT configuration settings
@@ -169,9 +169,10 @@ func (c Config) ToOpts(l *logger.Logger) ([]node.Option, []vpn.Option, error) {
 		vpn.WithPacketMTU(c.PacketMTU),
 		vpn.WithRouterAddress(router),
 		vpn.WithInterfaceName(iface),
+		vpn.WithMaxStreams(c.Connection.MaxStreams),
 	}
 
-	if c.LowProfileVPN {
+	if c.VPNLowProfile {
 		vpnOpts = append(vpnOpts, vpn.LowProfile)
 	}
 

+ 2 - 2
pkg/services/services.go

@@ -179,6 +179,6 @@ func ConnectToService(ctx context.Context, ledger *blockchain.Ledger, node *node
 }
 
 func copyStream(closer chan struct{}, dst io.Writer, src io.Reader) {
-	_, _ = io.Copy(dst, src)
-	closer <- struct{}{} // connection is closed, send signal to stop proxy
+	defer func() { closer <- struct{}{} }() // connection is closed, send signal to stop proxy
+	io.Copy(dst, src)
 }

+ 379 - 0
pkg/stream/decay.go

@@ -0,0 +1,379 @@
+// The MIT License (MIT)
+
+// Copyright (c) 2017 Whyrusleeping
+
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+// This package is a port of go-libp2p-connmgr, but adapted for streams
+
+package stream
+
+import (
+	"fmt"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"github.com/libp2p/go-libp2p-core/connmgr"
+	"github.com/libp2p/go-libp2p-core/peer"
+
+	"github.com/benbjohnson/clock"
+)
+
+// DefaultResolution is the default resolution of the decay tracker.
+var DefaultResolution = 1 * time.Minute
+
+// bumpCmd represents a bump command.
+type bumpCmd struct {
+	peer  peer.ID
+	tag   *decayingTag
+	delta int
+}
+
+// removeCmd represents a tag removal command.
+type removeCmd struct {
+	peer peer.ID
+	tag  *decayingTag
+}
+
+// decayer tracks and manages all decaying tags and their values.
+type decayer struct {
+	cfg   *DecayerCfg
+	mgr   *Manager
+	clock clock.Clock // for testing.
+
+	tagsMu    sync.Mutex
+	knownTags map[string]*decayingTag
+
+	// lastTick stores the last time the decayer ticked. Guarded by atomic.
+	lastTick atomic.Value
+
+	// bumpTagCh queues bump commands to be processed by the loop.
+	bumpTagCh   chan bumpCmd
+	removeTagCh chan removeCmd
+	closeTagCh  chan *decayingTag
+
+	// closure thingies.
+	closeCh chan struct{}
+	doneCh  chan struct{}
+	err     error
+}
+
+var _ connmgr.Decayer = (*decayer)(nil)
+
+// DecayerCfg is the configuration object for the Decayer.
+type DecayerCfg struct {
+	Resolution time.Duration
+	Clock      clock.Clock
+}
+
+// WithDefaults writes the default values on this DecayerConfig instance,
+// and returns itself for chainability.
+//
+//  cfg := (&DecayerCfg{}).WithDefaults()
+//  cfg.Resolution = 30 * time.Second
+//  t := NewDecayer(cfg, cm)
+func (cfg *DecayerCfg) WithDefaults() *DecayerCfg {
+	cfg.Resolution = DefaultResolution
+	return cfg
+}
+
+// NewDecayer creates a new decaying tag registry.
+func NewDecayer(cfg *DecayerCfg, mgr *Manager) (*decayer, error) {
+	// use real time if the Clock in the config is nil.
+	if cfg.Clock == nil {
+		cfg.Clock = clock.New()
+	}
+
+	d := &decayer{
+		cfg:         cfg,
+		mgr:         mgr,
+		clock:       cfg.Clock,
+		knownTags:   make(map[string]*decayingTag),
+		bumpTagCh:   make(chan bumpCmd, 128),
+		removeTagCh: make(chan removeCmd, 128),
+		closeTagCh:  make(chan *decayingTag, 128),
+		closeCh:     make(chan struct{}),
+		doneCh:      make(chan struct{}),
+	}
+
+	d.lastTick.Store(d.clock.Now())
+
+	// kick things off.
+	go d.process()
+
+	return d, nil
+}
+
+func (d *decayer) RegisterDecayingTag(name string, interval time.Duration, decayFn connmgr.DecayFn, bumpFn connmgr.BumpFn) (connmgr.DecayingTag, error) {
+	d.tagsMu.Lock()
+	defer d.tagsMu.Unlock()
+
+	if _, ok := d.knownTags[name]; ok {
+		return nil, fmt.Errorf("decaying tag with name %s already exists", name)
+	}
+
+	if interval < d.cfg.Resolution {
+		log.Warnf("decay interval for %s (%s) was lower than tracker's resolution (%s); overridden to resolution",
+			name, interval, d.cfg.Resolution)
+		interval = d.cfg.Resolution
+	}
+
+	if interval%d.cfg.Resolution != 0 {
+		log.Warnf("decay interval for tag %s (%s) is not a multiple of tracker's resolution (%s); "+
+			"some precision may be lost", name, interval, d.cfg.Resolution)
+	}
+
+	lastTick := d.lastTick.Load().(time.Time)
+	tag := &decayingTag{
+		trkr:     d,
+		name:     name,
+		interval: interval,
+		nextTick: lastTick.Add(interval),
+		decayFn:  decayFn,
+		bumpFn:   bumpFn,
+	}
+
+	d.knownTags[name] = tag
+	return tag, nil
+}
+
+// Close closes the Decayer. It is idempotent.
+func (d *decayer) Close() error {
+	select {
+	case <-d.doneCh:
+		return d.err
+	default:
+	}
+
+	close(d.closeCh)
+	<-d.doneCh
+	return d.err
+}
+
+// process is the heart of the tracker. It performs the following duties:
+//
+//  1. Manages decay.
+//  2. Applies score bumps.
+//  3. Yields when closed.
+func (d *decayer) process() {
+	defer close(d.doneCh)
+
+	ticker := d.clock.Ticker(d.cfg.Resolution)
+	defer ticker.Stop()
+
+	var (
+		bmp   bumpCmd
+		now   time.Time
+		visit = make(map[*decayingTag]struct{})
+	)
+
+	for {
+		select {
+		case now = <-ticker.C:
+			d.lastTick.Store(now)
+
+			d.tagsMu.Lock()
+			for _, tag := range d.knownTags {
+				if tag.nextTick.After(now) {
+					// skip the tag.
+					continue
+				}
+				// Mark the tag to be updated in this round.
+				visit[tag] = struct{}{}
+			}
+			d.tagsMu.Unlock()
+
+			// Visit each peer, and decay tags that need to be decayed.
+			for _, s := range d.mgr.segments {
+				s.Lock()
+
+				// Entered a segment that contains peers. Process each peer.
+				for _, p := range s.peers {
+					for tag, v := range p.decaying {
+						if _, ok := visit[tag]; !ok {
+							// skip this tag.
+							continue
+						}
+
+						// ~ this value needs to be visited. ~
+						var delta int
+						if after, rm := tag.decayFn(*v); rm {
+							// delete the value and move on to the next tag.
+							delta -= v.Value
+							delete(p.decaying, tag)
+						} else {
+							// accumulate the delta, and apply the changes.
+							delta += after - v.Value
+							v.Value, v.LastVisit = after, now
+						}
+						p.value += delta
+					}
+				}
+
+				s.Unlock()
+			}
+
+			// Reset each tag's next visit round, and clear the visited set.
+			for tag := range visit {
+				tag.nextTick = tag.nextTick.Add(tag.interval)
+				delete(visit, tag)
+			}
+
+		case bmp = <-d.bumpTagCh:
+			var (
+				now       = d.clock.Now()
+				peer, tag = bmp.peer, bmp.tag
+			)
+
+			s := d.mgr.segments.get(peer)
+			s.Lock()
+
+			p := s.tagInfoFor(peer)
+			v, ok := p.decaying[tag]
+			if !ok {
+				v = &connmgr.DecayingValue{
+					Tag:       tag,
+					Peer:      peer,
+					LastVisit: now,
+					Added:     now,
+					Value:     0,
+				}
+				p.decaying[tag] = v
+			}
+
+			prev := v.Value
+			v.Value, v.LastVisit = v.Tag.(*decayingTag).bumpFn(*v, bmp.delta), now
+			p.value += v.Value - prev
+
+			s.Unlock()
+
+		case rm := <-d.removeTagCh:
+			s := d.mgr.segments.get(rm.peer)
+			s.Lock()
+
+			p := s.tagInfoFor(rm.peer)
+			v, ok := p.decaying[rm.tag]
+			if !ok {
+				s.Unlock()
+				continue
+			}
+			p.value -= v.Value
+			delete(p.decaying, rm.tag)
+			s.Unlock()
+
+		case t := <-d.closeTagCh:
+			// Stop tracking the tag.
+			d.tagsMu.Lock()
+			delete(d.knownTags, t.name)
+			d.tagsMu.Unlock()
+
+			// Remove the tag from all peers that had it in the connmgr.
+			for _, s := range d.mgr.segments {
+				// visit all segments, and attempt to remove the tag from all the peers it stores.
+				s.Lock()
+				for _, p := range s.peers {
+					if dt, ok := p.decaying[t]; ok {
+						// decrease the value of the tagInfo, and delete the tag.
+						p.value -= dt.Value
+						delete(p.decaying, t)
+					}
+				}
+				s.Unlock()
+			}
+
+		case <-d.closeCh:
+			return
+		}
+	}
+}
+
+// decayingTag represents a decaying tag, with an associated decay interval, a
+// decay function, and a bump function.
+type decayingTag struct {
+	trkr     *decayer
+	name     string
+	interval time.Duration
+	nextTick time.Time
+	decayFn  connmgr.DecayFn
+	bumpFn   connmgr.BumpFn
+
+	// closed marks this tag as closed, so that if it's bumped after being
+	// closed, we can return an error. 0 = false; 1 = true; guarded by atomic.
+	closed int32
+}
+
+var _ connmgr.DecayingTag = (*decayingTag)(nil)
+
+func (t *decayingTag) Name() string {
+	return t.name
+}
+
+func (t *decayingTag) Interval() time.Duration {
+	return t.interval
+}
+
+// Bump bumps a tag for this peer.
+func (t *decayingTag) Bump(p peer.ID, delta int) error {
+	if atomic.LoadInt32(&t.closed) == 1 {
+		return fmt.Errorf("decaying tag %s had been closed; no further bumps are accepted", t.name)
+	}
+
+	bmp := bumpCmd{peer: p, tag: t, delta: delta}
+
+	select {
+	case t.trkr.bumpTagCh <- bmp:
+		return nil
+	default:
+		return fmt.Errorf(
+			"unable to bump decaying tag for peer %s, tag %s, delta %d; queue full (len=%d)",
+			p.Pretty(), t.name, delta, len(t.trkr.bumpTagCh))
+	}
+}
+
+func (t *decayingTag) Remove(p peer.ID) error {
+	if atomic.LoadInt32(&t.closed) == 1 {
+		return fmt.Errorf("decaying tag %s had been closed; no further removals are accepted", t.name)
+	}
+
+	rm := removeCmd{peer: p, tag: t}
+
+	select {
+	case t.trkr.removeTagCh <- rm:
+		return nil
+	default:
+		return fmt.Errorf(
+			"unable to remove decaying tag for peer %s, tag %s; queue full (len=%d)",
+			p.Pretty(), t.name, len(t.trkr.removeTagCh))
+	}
+}
+
+func (t *decayingTag) Close() error {
+	if !atomic.CompareAndSwapInt32(&t.closed, 0, 1) {
+		log.Warnf("duplicate decaying tag closure: %s; skipping", t.name)
+		return nil
+	}
+
+	select {
+	case t.trkr.closeTagCh <- t:
+		return nil
+	default:
+		return fmt.Errorf("unable to close decaying tag %s; queue full (len=%d)", t.name, len(t.trkr.closeTagCh))
+	}
+}
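
As a rough usage sketch for the decayer above (the tag name, interval and helper are illustrative), a decaying tag is registered once on the manager and then bumped per peer; with this pair of functions its value loses one point per interval and is dropped once it reaches zero:

package sketch

import (
	"time"

	"github.com/libp2p/go-libp2p-core/connmgr"
	"github.com/libp2p/go-libp2p-core/peer"

	"github.com/mudler/edgevpn/pkg/stream"
)

// trackActivity registers an "activity" tag and bumps it for one peer.
func trackActivity(mgr *stream.Manager, p peer.ID) error {
	decay := func(v connmgr.DecayingValue) (after int, rm bool) {
		after = v.Value - 1
		return after, after <= 0 // remove the value once it reaches zero
	}
	bump := func(v connmgr.DecayingValue, delta int) int {
		return v.Value + delta
	}

	tag, err := mgr.RegisterDecayingTag("activity", 5*time.Minute, decay, bump)
	if err != nil {
		return err
	}
	return tag.Bump(p, 10)
}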

+ 578 - 0
pkg/stream/manager.go

@@ -0,0 +1,578 @@
+// The MIT License (MIT)
+
+// Copyright (c) 2017 Whyrusleeping (MIT)
+// Copyright (c) 2022 Ettore Di Giacinto (GPL-3)
+
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+// This package is a port of go-libp2p-connmgr, but adapted for streams
+
+package stream
+
+import (
+	"context"
+	"errors"
+	"sort"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"github.com/libp2p/go-libp2p-core/connmgr"
+	"github.com/libp2p/go-libp2p-core/network"
+	"github.com/libp2p/go-libp2p-core/peer"
+
+	logging "github.com/ipfs/go-log/v2"
+)
+
+var log = logging.Logger("peermgr")
+
+// Manager is a ConnManager that trims connections whenever the count exceeds the
+// high watermark. New connections are given a grace period before they're subject
+// to trimming. Trims are automatically run on demand, only if the time from the
+// previous trim is higher than 10 seconds. Furthermore, trims can be explicitly
+// requested through the public interface of this struct (see TrimOpenConns).
+//
+// See configuration parameters in NewConnManager.
+type Manager struct {
+	*decayer
+
+	cfg      *config
+	segments segments
+
+	plk       sync.RWMutex
+	protected map[peer.ID]map[string]struct{}
+
+	// channel-based semaphore that enforces only a single trim is in progress
+	trimMutex sync.Mutex
+	connCount int32
+	// to be accessed atomically. This is mimicking the implementation of a sync.Once.
+	// Take care of correct alignment when modifying this struct.
+	trimCount uint64
+
+	lastTrimMu sync.RWMutex
+	lastTrim   time.Time
+
+	refCount                sync.WaitGroup
+	ctx                     context.Context
+	cancel                  func()
+	unregisterMemoryWatcher func()
+}
+
+type segment struct {
+	sync.Mutex
+	peers map[peer.ID]*peerInfo
+}
+
+type segments [256]*segment
+
+func (ss *segments) get(p peer.ID) *segment {
+	return ss[byte(p[len(p)-1])]
+}
+
+func (ss *segments) countPeers() (count int) {
+	for _, seg := range ss {
+		seg.Lock()
+		count += len(seg.peers)
+		seg.Unlock()
+	}
+	return count
+}
+
+func (s *segment) tagInfoFor(p peer.ID) *peerInfo {
+	pi, ok := s.peers[p]
+	if ok {
+		return pi
+	}
+	// create a temporary peer to buffer early tags before the Connected notification arrives.
+	pi = &peerInfo{
+		id:        p,
+		firstSeen: time.Now(), // this timestamp will be updated when the first Connected notification arrives.
+		temp:      true,
+		tags:      make(map[string]int),
+		decaying:  make(map[*decayingTag]*connmgr.DecayingValue),
+		conns:     make(map[network.Stream]time.Time),
+	}
+	s.peers[p] = pi
+	return pi
+}
+
+// NewConnManager creates a new Manager with the provided params:
+// lo and hi are watermarks governing the number of connections that'll be maintained.
+// When the peer count exceeds the 'high watermark', as many peers will be pruned (and
+// their connections terminated) until 'low watermark' peers remain.
+func NewConnManager(low, hi int, opts ...Option) (*Manager, error) {
+	cfg := &config{
+		highWater:     hi,
+		lowWater:      low,
+		gracePeriod:   10 * time.Second,
+		silencePeriod: 10 * time.Second,
+	}
+	for _, o := range opts {
+		if err := o(cfg); err != nil {
+			return nil, err
+		}
+	}
+
+	if cfg.decayer == nil {
+		// Set the default decayer config.
+		cfg.decayer = (&DecayerCfg{}).WithDefaults()
+	}
+
+	cm := &Manager{
+		cfg:       cfg,
+		protected: make(map[peer.ID]map[string]struct{}, 16),
+		segments: func() (ret segments) {
+			for i := range ret {
+				ret[i] = &segment{
+					peers: make(map[peer.ID]*peerInfo),
+				}
+			}
+			return ret
+		}(),
+	}
+	cm.ctx, cm.cancel = context.WithCancel(context.Background())
+
+	decay, _ := NewDecayer(cfg.decayer, cm)
+	cm.decayer = decay
+
+	cm.refCount.Add(1)
+	go cm.background()
+	return cm, nil
+}
+
+func (cm *Manager) Close() error {
+	cm.cancel()
+	if cm.unregisterMemoryWatcher != nil {
+		cm.unregisterMemoryWatcher()
+	}
+	if err := cm.decayer.Close(); err != nil {
+		return err
+	}
+	cm.refCount.Wait()
+	return nil
+}
+
+func (cm *Manager) Protect(id peer.ID, tag string) {
+	cm.plk.Lock()
+	defer cm.plk.Unlock()
+
+	tags, ok := cm.protected[id]
+	if !ok {
+		tags = make(map[string]struct{}, 2)
+		cm.protected[id] = tags
+	}
+	tags[tag] = struct{}{}
+}
+
+func (cm *Manager) Unprotect(id peer.ID, tag string) (protected bool) {
+	cm.plk.Lock()
+	defer cm.plk.Unlock()
+
+	tags, ok := cm.protected[id]
+	if !ok {
+		return false
+	}
+	if delete(tags, tag); len(tags) == 0 {
+		delete(cm.protected, id)
+		return false
+	}
+	return true
+}
+
+func (cm *Manager) IsProtected(id peer.ID, tag string) (protected bool) {
+	cm.plk.Lock()
+	defer cm.plk.Unlock()
+
+	tags, ok := cm.protected[id]
+	if !ok {
+		return false
+	}
+
+	if tag == "" {
+		return true
+	}
+
+	_, protected = tags[tag]
+	return protected
+}
+
+// peerInfo stores metadata for a given peer.
+type peerInfo struct {
+	id       peer.ID
+	tags     map[string]int                          // value for each tag
+	decaying map[*decayingTag]*connmgr.DecayingValue // decaying tags
+
+	value int  // cached sum of all tag values
+	temp  bool // this is a temporary entry holding early tags, and awaiting connections
+
+	conns map[network.Stream]time.Time // start time of each connection
+
+	firstSeen time.Time // timestamp when we began tracking this peer.
+}
+
+type peerInfos []peerInfo
+
+func (p peerInfos) SortByValue() {
+	sort.Slice(p, func(i, j int) bool {
+		left, right := p[i], p[j]
+		// temporary peers are preferred for pruning.
+		if left.temp != right.temp {
+			return left.temp
+		}
+		// otherwise, compare by value.
+		return left.value < right.value
+	})
+}
+
+// TrimOpenConns closes the connections of as many peers as needed to make the peer count
+// equal the low watermark. Peers are sorted in ascending order based on their total value,
+// pruning those peers with the lowest scores first, as long as they are not within their
+// grace period.
+//
+// This function blocks until a trim is completed. If a trim is underway, a new
+// one won't be started, and instead it'll wait until that one is completed before
+// returning.
+func (cm *Manager) TrimOpenConns(_ context.Context) {
+	// TODO: error return value so we can cleanly signal we are aborting because:
+	// (a) there's another trim in progress, or (b) the silence period is in effect.
+
+	cm.doTrim()
+}
+
+func (cm *Manager) background() {
+	defer cm.refCount.Done()
+
+	interval := cm.cfg.gracePeriod / 2
+	if cm.cfg.silencePeriod != 0 {
+		interval = cm.cfg.silencePeriod
+	}
+
+	ticker := time.NewTicker(interval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ticker.C:
+			if atomic.LoadInt32(&cm.connCount) < int32(cm.cfg.highWater) {
+				// Below high water, skip.
+				continue
+			}
+		case <-cm.ctx.Done():
+			return
+		}
+		cm.trim()
+	}
+}
+
+func (cm *Manager) doTrim() {
+	// This logic is mimicking the implementation of sync.Once in the standard library.
+	count := atomic.LoadUint64(&cm.trimCount)
+	cm.trimMutex.Lock()
+	defer cm.trimMutex.Unlock()
+	if count == atomic.LoadUint64(&cm.trimCount) {
+		cm.trim()
+		cm.lastTrimMu.Lock()
+		cm.lastTrim = time.Now()
+		cm.lastTrimMu.Unlock()
+		atomic.AddUint64(&cm.trimCount, 1)
+	}
+}
+
+// trim starts the trim, if the last trim happened before the configured silence period.
+func (cm *Manager) trim() {
+	// do the actual trim.
+	for _, c := range cm.getConnsToClose() {
+		c.Close()
+	}
+}
+
+// getConnsToClose runs the heuristics described in TrimOpenConns and returns the
+// connections to close.
+func (cm *Manager) getConnsToClose() []network.Stream {
+	if cm.cfg.lowWater == 0 || cm.cfg.highWater == 0 {
+		// disabled
+		return nil
+	}
+
+	if int(atomic.LoadInt32(&cm.connCount)) <= cm.cfg.lowWater {
+		log.Info("open connection count below limit")
+		return nil
+	}
+
+	candidates := make(peerInfos, 0, cm.segments.countPeers())
+	var ncandidates int
+	gracePeriodStart := time.Now().Add(-cm.cfg.gracePeriod)
+
+	cm.plk.RLock()
+	for _, s := range cm.segments {
+		s.Lock()
+		for id, inf := range s.peers {
+			if _, ok := cm.protected[id]; ok {
+				// skip over protected peer.
+				continue
+			}
+			if inf.firstSeen.After(gracePeriodStart) {
+				// skip peers in the grace period.
+				continue
+			}
+			// note that we're copying the entry here,
+			// but since inf.conns is a map, it will still point to the original object
+			candidates = append(candidates, *inf)
+			ncandidates += len(inf.conns)
+		}
+		s.Unlock()
+	}
+	cm.plk.RUnlock()
+
+	if ncandidates < cm.cfg.lowWater {
+		log.Info("open connection count above limit but too many are in the grace period")
+		// We have too many connections but fewer than lowWater
+		// connections out of the grace period.
+		//
+		// If we trimmed now, we'd kill potentially useful connections.
+		return nil
+	}
+
+	// Sort peers according to their value.
+	candidates.SortByValue()
+
+	target := ncandidates - cm.cfg.lowWater
+
+	// slightly overallocate because we may have more than one conns per peer
+	selected := make([]network.Stream, 0, target+10)
+
+	for _, inf := range candidates {
+		if target <= 0 {
+			break
+		}
+
+		// lock this to protect from concurrent modifications from connect/disconnect events
+		s := cm.segments.get(inf.id)
+		s.Lock()
+		if len(inf.conns) == 0 && inf.temp {
+			// handle temporary entries for early tags -- this entry has gone past the grace period
+			// and still holds no connections, so prune it.
+			delete(s.peers, inf.id)
+		} else {
+			for c := range inf.conns {
+				selected = append(selected, c)
+			}
+			target -= len(inf.conns)
+		}
+		s.Unlock()
+	}
+
+	return selected
+}
+
+// GetTagInfo is called to fetch the tag information associated with a given
+// peer, nil is returned if p refers to an unknown peer.
+func (cm *Manager) GetTagInfo(p peer.ID) *connmgr.TagInfo {
+	s := cm.segments.get(p)
+	s.Lock()
+	defer s.Unlock()
+
+	pi, ok := s.peers[p]
+	if !ok {
+		return nil
+	}
+
+	out := &connmgr.TagInfo{
+		FirstSeen: pi.firstSeen,
+		Value:     pi.value,
+		Tags:      make(map[string]int),
+		Conns:     make(map[string]time.Time),
+	}
+
+	for t, v := range pi.tags {
+		out.Tags[t] = v
+	}
+	for t, v := range pi.decaying {
+		out.Tags[t.name] = v.Value
+	}
+	for c, t := range pi.conns {
+		out.Conns[c.ID()] = t
+	}
+
+	return out
+}
+
+// TagPeer is called to associate a string and integer with a given peer.
+func (cm *Manager) TagPeer(p peer.ID, tag string, val int) {
+	s := cm.segments.get(p)
+	s.Lock()
+	defer s.Unlock()
+
+	pi := s.tagInfoFor(p)
+
+	// Update the total value of the peer.
+	pi.value += val - pi.tags[tag]
+	pi.tags[tag] = val
+}
+
+// UntagPeer is called to disassociate a string and integer from a given peer.
+func (cm *Manager) UntagPeer(p peer.ID, tag string) {
+	s := cm.segments.get(p)
+	s.Lock()
+	defer s.Unlock()
+
+	pi, ok := s.peers[p]
+	if !ok {
+		log.Info("tried to remove tag from untracked peer: ", p)
+		return
+	}
+
+	// Update the total value of the peer.
+	pi.value -= pi.tags[tag]
+	delete(pi.tags, tag)
+}
+
+// UpsertTag is called to insert/update a peer tag
+func (cm *Manager) UpsertTag(p peer.ID, tag string, upsert func(int) int) {
+	s := cm.segments.get(p)
+	s.Lock()
+	defer s.Unlock()
+
+	pi := s.tagInfoFor(p)
+
+	oldval := pi.tags[tag]
+	newval := upsert(oldval)
+	pi.value += newval - oldval
+	pi.tags[tag] = newval
+}
+
+// CMInfo holds the configuration for Manager, as well as status data.
+type CMInfo struct {
+	// The low watermark, as described in NewConnManager.
+	LowWater int
+
+	// The high watermark, as described in NewConnManager.
+	HighWater int
+
+	// The timestamp when the last trim was triggered.
+	LastTrim time.Time
+
+	// The configured grace period, as described in NewConnManager.
+	GracePeriod time.Duration
+
+	// The current connection count.
+	ConnCount int
+}
+
+// GetInfo returns the configuration and status data for this connection manager.
+func (cm *Manager) GetInfo() CMInfo {
+	cm.lastTrimMu.RLock()
+	lastTrim := cm.lastTrim
+	cm.lastTrimMu.RUnlock()
+
+	return CMInfo{
+		HighWater:   cm.cfg.highWater,
+		LowWater:    cm.cfg.lowWater,
+		LastTrim:    lastTrim,
+		GracePeriod: cm.cfg.gracePeriod,
+		ConnCount:   int(atomic.LoadInt32(&cm.connCount)),
+	}
+}
+
+// HasStream is called to retrieve an existing stream for the given peer ID, if any
+func (cm *Manager) HasStream(n network.Network, pid peer.ID) (network.Stream, error) {
+	s := cm.segments.get(pid)
+	s.Lock()
+	defer s.Unlock()
+
+	pinfo, ok := s.peers[pid]
+	if !ok {
+		return nil, errors.New("no stream available for pid")
+	}
+
+	for c, _ := range pinfo.conns {
+		pinfo.conns[c] = time.Now()
+		return c, nil
+	}
+
+	return nil, errors.New("no stream available")
+}
+
+// Connected is called by notifiers to inform that a new connection has been established.
+// The notifee updates the Manager to start tracking the connection. If the new connection
+// count exceeds the high watermark, a trim may be triggered.
+func (cm *Manager) Connected(n network.Network, c network.Stream) {
+
+	p := c.Conn().RemotePeer()
+	s := cm.segments.get(p)
+	s.Lock()
+	defer s.Unlock()
+
+	pinfo, ok := s.peers[p]
+	if !ok {
+		pinfo = &peerInfo{
+			id:        p,
+			firstSeen: time.Now(),
+			tags:      make(map[string]int),
+			decaying:  make(map[*decayingTag]*connmgr.DecayingValue),
+			conns:     make(map[network.Stream]time.Time),
+		}
+		s.peers[p] = pinfo
+	} else if pinfo.temp {
+		// we had created a temporary entry for this peer to buffer early tags before the
+		// Connected notification arrived: flip the temporary flag, and update the firstSeen
+		// timestamp to the real one.
+		pinfo.temp = false
+		pinfo.firstSeen = time.Now()
+	}
+
+	// cache one stream for peer
+	pinfo.conns = map[network.Stream]time.Time{c: time.Now()}
+	// _, ok = pinfo.conns[c]
+	// if ok {
+	// 	log.Error("received connected notification for conn we are already tracking: ", p)
+	// 	return
+	// }
+
+	// pinfo.conns[c] = time.Now()
+	atomic.AddInt32(&cm.connCount, 1)
+}
+
+// Disconnected is called by notifiers to inform that an existing connection has been closed or terminated.
+// The notifee updates the Manager accordingly to stop tracking the connection, and performs housekeeping.
+func (cm *Manager) Disconnected(n network.Network, c network.Stream) {
+
+	p := c.Conn().RemotePeer()
+	s := cm.segments.get(p)
+	s.Lock()
+	defer s.Unlock()
+
+	cinf, ok := s.peers[p]
+	if !ok {
+		log.Error("received disconnected notification for peer we are not tracking: ", p)
+		return
+	}
+
+	_, ok = cinf.conns[c]
+	if !ok {
+		log.Error("received disconnected notification for conn we are not tracking: ", p)
+		return
+	}
+
+	delete(cinf.conns, c)
+	if len(cinf.conns) == 0 {
+		delete(s.peers, p)
+	}
+	atomic.AddInt32(&cm.connCount, -1)
+}
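
A minimal lifecycle sketch for the manager above (names and watermarks are illustrative): streams are registered through Connected, peers that must never be trimmed are protected, and Close is tied to the surrounding context, which mirrors what the VPN service does further down:

package sketch

import (
	"context"

	"github.com/libp2p/go-libp2p-core/network"
	"github.com/libp2p/go-libp2p-core/peer"

	"github.com/mudler/edgevpn/pkg/stream"
)

// trackStream starts a manager, registers one stream and protects one peer.
func trackStream(ctx context.Context, n network.Network, s network.Stream, important peer.ID) (*stream.Manager, error) {
	mgr, err := stream.NewConnManager(10, 100) // low/high watermarks
	if err != nil {
		return nil, err
	}

	mgr.Connected(n, s)               // start tracking the stream
	mgr.Protect(important, "edgevpn") // exclude this peer from trimming

	go func() {
		<-ctx.Done()
		mgr.Close() // stops the decayer and the background trim loop
	}()
	return mgr, nil
}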

+ 77 - 0
pkg/stream/options.go

@@ -0,0 +1,77 @@
+// The MIT License (MIT)
+
+// Copyright (c) 2017 Whyrusleeping
+
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+// This package is a port of go-libp2p-connmgr, but adapted for streams
+
+package stream
+
+import (
+	"errors"
+	"time"
+)
+
+// config is the configuration struct for the basic connection manager.
+type config struct {
+	highWater     int
+	lowWater      int
+	gracePeriod   time.Duration
+	silencePeriod time.Duration
+	decayer       *DecayerCfg
+	emergencyTrim bool
+}
+
+// Option represents an option for the basic connection manager.
+type Option func(*config) error
+
+// DecayerConfig applies a configuration for the decayer.
+func DecayerConfig(opts *DecayerCfg) Option {
+	return func(cfg *config) error {
+		cfg.decayer = opts
+		return nil
+	}
+}
+
+// WithGracePeriod sets the grace period.
+// The grace period is the time a newly opened connection is given before it becomes
+// subject to pruning.
+func WithGracePeriod(p time.Duration) Option {
+	return func(cfg *config) error {
+		if p < 0 {
+			return errors.New("grace period must be non-negative")
+		}
+		cfg.gracePeriod = p
+		return nil
+	}
+}
+
+// WithSilencePeriod sets the silence period.
+// The connection manager will perform a cleanup once per silence period
+// if the number of connections surpasses the high watermark.
+func WithSilencePeriod(p time.Duration) Option {
+	return func(cfg *config) error {
+		if p <= 0 {
+			return errors.New("silence period must be non-zero")
+		}
+		cfg.silencePeriod = p
+		return nil
+	}
+}
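
For completeness, a hedged sketch of constructing a manager with these options (the periods and the decay resolution are arbitrary example values):

package sketch

import (
	"time"

	"github.com/mudler/edgevpn/pkg/stream"
)

// newManager builds a stream manager with explicit grace and silence periods
// and a slower decay resolution.
func newManager(maxStreams int) (*stream.Manager, error) {
	return stream.NewConnManager(
		10, maxStreams,
		stream.WithGracePeriod(30*time.Second),
		stream.WithSilencePeriod(time.Minute),
		stream.DecayerConfig(&stream.DecayerCfg{Resolution: 5 * time.Minute}),
	)
}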

+ 14 - 5
pkg/vpn/config.go

@@ -41,16 +41,12 @@ type Config struct {
 
 	Concurrency       int
 	ChannelBufferSize int
+	MaxStreams        int
 	lowProfile        bool
 }
 
 type Option func(cfg *Config) error
 
-var LowProfile Option = func(cfg *Config) error {
-	cfg.lowProfile = true
-	return nil
-}
-
 // Apply applies the given options to the config, returning the first error
 // encountered (if any).
 func (cfg *Config) Apply(opts ...Option) error {
@@ -65,6 +61,19 @@ func (cfg *Config) Apply(opts ...Option) error {
 	return nil
 }
 
+func WithMaxStreams(i int) func(cfg *Config) error {
+	return func(cfg *Config) error {
+		cfg.MaxStreams = i
+		return nil
+	}
+}
+
+var LowProfile Option = func(cfg *Config) error {
+	cfg.lowProfile = true
+
+	return nil
+}
+
 func WithInterface(i *water.Interface) func(cfg *Config) error {
 	return func(cfg *Config) error {
 		cfg.Interface = i
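
A small sketch of how the two VPN options interact (the helper name is illustrative): WithMaxStreams sizes the stream manager's high watermark, while LowProfile disables the manager entirely, as the vpn.go changes below show:

package sketch

import (
	"github.com/mudler/edgevpn/pkg/vpn"
)

// vpnOptions combines the new MaxStreams option with low-profile mode; when
// lowProfile is true the stream manager is never created and MaxStreams is
// effectively ignored.
func vpnOptions(lowProfile bool, maxStreams int) []vpn.Option {
	opts := []vpn.Option{
		vpn.WithMaxStreams(maxStreams),
	}
	if lowProfile {
		opts = append(opts, vpn.LowProfile)
	}
	return opts
}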

+ 63 - 16
pkg/vpn/vpn.go

@@ -34,13 +34,22 @@ import (
 	"github.com/mudler/edgevpn/pkg/logger"
 	"github.com/mudler/edgevpn/pkg/node"
 	"github.com/mudler/edgevpn/pkg/protocol"
+	"github.com/mudler/edgevpn/pkg/stream"
 	"github.com/mudler/edgevpn/pkg/types"
+
 	"github.com/pkg/errors"
 	"github.com/songgao/packets/ethernet"
 	"github.com/songgao/water"
 	"golang.org/x/net/ipv4"
 )
 
+type streamManager interface {
+	Connected(n network.Network, c network.Stream)
+	Disconnected(n network.Network, c network.Stream)
+	HasStream(n network.Network, pid peer.ID) (network.Stream, error)
+	Close() error
+}
+
 func VPNNetworkService(p ...Option) node.NetworkService {
 	return func(ctx context.Context, nc node.Config, n *node.Node, b *blockchain.Ledger) error {
 		c := &Config{
@@ -48,6 +57,7 @@ func VPNNetworkService(p ...Option) node.NetworkService {
 			LedgerAnnounceTime: 5 * time.Second,
 			Timeout:            15 * time.Second,
 			Logger:             logger.New(log.LevelDebug),
+			MaxStreams:         30,
 		}
 		c.Apply(p...)
 
@@ -57,8 +67,23 @@ func VPNNetworkService(p ...Option) node.NetworkService {
 		}
 		defer ifce.Close()
 
+		var mgr streamManager
+
+		if !c.lowProfile {
+			// Create stream manager for outgoing connections
+			mgr, err = stream.NewConnManager(10, c.MaxStreams)
+			if err != nil {
+				return err
+			}
+			// Attach it to the same context
+			go func() {
+				<-ctx.Done()
+				mgr.Close()
+			}()
+		}
+
 		// Set stream handler during runtime
-		n.Host().SetStreamHandler(protocol.EdgeVPN.ID(), streamHandler(b, ifce))
+		n.Host().SetStreamHandler(protocol.EdgeVPN.ID(), streamHandler(b, ifce, c))
 
 		// Announce our IP
 		ip, _, err := net.ParseCIDR(c.InterfaceAddress)
@@ -91,7 +116,7 @@ func VPNNetworkService(p ...Option) node.NetworkService {
 		}
 
 		// read packets from the interface
-		return readPackets(ctx, c, n, b, ifce)
+		return readPackets(ctx, mgr, c, n, b, ifce)
 	}
 }
 
@@ -101,7 +126,7 @@ func Register(p ...Option) ([]node.Option, error) {
 	return []node.Option{node.WithNetworkService(VPNNetworkService(p...))}, nil
 }
 
-func streamHandler(l *blockchain.Ledger, ifce *water.Interface) func(stream network.Stream) {
+func streamHandler(l *blockchain.Ledger, ifce *water.Interface, c *Config) func(stream network.Stream) {
 	return func(stream network.Stream) {
 		if !l.Exists(protocol.MachinesLedgerKey,
 			func(d blockchain.Data) bool {
@@ -112,8 +137,13 @@ func streamHandler(l *blockchain.Ledger, ifce *water.Interface, c *Config) func(stream netw
 			stream.Reset()
 			return
 		}
-		io.Copy(ifce.ReadWriteCloser, stream)
-		stream.Close()
+		_, err := io.Copy(ifce.ReadWriteCloser, stream)
+		if err != nil {
+			stream.Reset()
+		}
+		if c.lowProfile {
+			stream.Close()
+		}
 	}
 }
 
@@ -143,7 +173,7 @@ func getFrame(ifce *water.Interface, c *Config) (ethernet.Frame, error) {
 	return frame, nil
 }
 
-func handleFrame(frame ethernet.Frame, c *Config, n *node.Node, ip net.IP, ledger *blockchain.Ledger, ifce *water.Interface) error {
+func handleFrame(mgr streamManager, frame ethernet.Frame, c *Config, n *node.Node, ip net.IP, ledger *blockchain.Ledger, ifce *water.Interface) error {
 	ctx, cancel := context.WithTimeout(context.Background(), c.Timeout)
 	defer cancel()
 
@@ -171,21 +201,38 @@ func handleFrame(frame ethernet.Frame, c *Config, n *node.Node, ip net.IP, ledge
 		return errors.Wrap(err, "could not decode peer")
 	}
 
-	// Open a stream
-	stream, err := n.Host().NewStream(ctx, d, protocol.EdgeVPN.ID())
+	var stream network.Stream
+	if mgr != nil {
+		// Open a stream if necessary
+		stream, err = mgr.HasStream(n.Host().Network(), d)
+		if err == nil {
+			_, err = stream.Write(frame)
+			if err == nil {
+				return nil
+			}
+			mgr.Disconnected(n.Host().Network(), stream)
+		}
+	}
+
+	stream, err = n.Host().NewStream(ctx, d, protocol.EdgeVPN.ID())
 	if err != nil {
 		return errors.Wrap(err, "could not open stream")
 	}
 
-	stream.Write(frame)
-	//if c.lowProfile {
-	stream.Close()
-	//}
-	return nil
+	if mgr != nil {
+		mgr.Connected(n.Host().Network(), stream)
+	}
+
+	_, err = stream.Write(frame)
+	if c.lowProfile && err == nil {
+		return stream.Close()
+	}
+	return err
 }
 
 func connectionWorker(
 	p chan ethernet.Frame,
+	mgr streamManager,
 	c *Config,
 	n *node.Node,
 	ip net.IP,
@@ -194,14 +241,14 @@ func connectionWorker(
 	ifce *water.Interface) {
 	defer wg.Done()
 	for f := range p {
-		if err := handleFrame(f, c, n, ip, ledger, ifce); err != nil {
+		if err := handleFrame(mgr, f, c, n, ip, ledger, ifce); err != nil {
 			c.Logger.Debugf("could not handle frame: %s", err.Error())
 		}
 	}
}
 
 // redirects packets from the interface to the node using the routing table in the blockchain
-func readPackets(ctx context.Context, c *Config, n *node.Node, ledger *blockchain.Ledger, ifce *water.Interface) error {
+func readPackets(ctx context.Context, mgr streamManager, c *Config, n *node.Node, ledger *blockchain.Ledger, ifce *water.Interface) error {
 	ip, _, err := net.ParseCIDR(c.InterfaceAddress)
 	if err != nil {
 		return err
@@ -218,7 +265,7 @@ func readPackets(ctx context.Context, c *Config, n *node.Node, ledger *blockchai
 
 	for i := 0; i < c.Concurrency; i++ {
 		wg.Add(1)
-		go connectionWorker(packets, c, n, ip, wg, ledger, ifce)
+		go connectionWorker(packets, mgr, c, n, ip, wg, ledger, ifce)
 	}
 
 	for {