|
|
@@ -96,11 +96,9 @@ type Interface struct {
|
|
|
|
|
|
l *logrus.Logger
|
|
|
|
|
|
- inPool sync.Pool
|
|
|
- inbound chan *packet.Packet
|
|
|
-
|
|
|
- outPool sync.Pool
|
|
|
- outbound chan *[]byte
|
|
|
+ pktPool *packet.Pool
|
|
|
+ inbound chan *packet.Packet
|
|
|
+ outbound chan *packet.Packet
|
|
|
}
|
|
|
|
|
|
type EncWriter interface {
|
|
|
@@ -203,20 +201,13 @@ func NewInterface(ctx context.Context, c *InterfaceConfig) (*Interface, error) {
|
|
|
},
|
|
|
|
|
|
//TODO: configurable size
|
|
|
- inbound: make(chan *packet.Packet, 1028),
|
|
|
- outbound: make(chan *[]byte, 1028),
|
|
|
+ inbound: make(chan *packet.Packet, 2048),
|
|
|
+ outbound: make(chan *packet.Packet, 2048),
|
|
|
|
|
|
l: c.l,
|
|
|
}
|
|
|
|
|
|
- ifce.inPool = sync.Pool{New: func() any {
|
|
|
- return packet.New()
|
|
|
- }}
|
|
|
-
|
|
|
- ifce.outPool = sync.Pool{New: func() any {
|
|
|
- t := make([]byte, mtu)
|
|
|
- return &t
|
|
|
- }}
|
|
|
+ ifce.pktPool = packet.NewPool()
|
|
|
|
|
|
ifce.tryPromoteEvery.Store(c.tryPromoteEvery)
|
|
|
ifce.reQueryEvery.Store(c.reQueryEvery)
|
|
|
@@ -267,19 +258,21 @@ func (f *Interface) activate() error {
|
|
|
|
|
|
func (f *Interface) run(c context.Context) (func(), error) {
|
|
|
for i := 0; i < f.routines; i++ {
|
|
|
- // Launch n queues to read packets from udp
|
|
|
+ // read packets from udp and queue to f.inbound
|
|
|
f.wg.Add(1)
|
|
|
go f.listenOut(i)
|
|
|
|
|
|
- // Launch n queues to read packets from tun dev
|
|
|
- f.wg.Add(1)
|
|
|
+ // Launch n queues to read packets from inside tun dev and queue to f.outbound
|
|
|
+ //todo this never stops f.wg.Add(1)
|
|
|
go f.listenIn(f.readers[i], i)
|
|
|
|
|
|
- // Launch n queues to read packets from tun dev
|
|
|
+ // Launch n workers to process traffic from f.inbound and smash it onto the inside of the tun
|
|
|
+ f.wg.Add(1)
|
|
|
+ go f.workerIn(i, c)
|
|
|
f.wg.Add(1)
|
|
|
go f.workerIn(i, c)
|
|
|
|
|
|
- // Launch n queues to read packets from tun dev
|
|
|
+ // read from f.outbound and write to UDP (outside the tun)
|
|
|
f.wg.Add(1)
|
|
|
go f.workerOut(i, c)
|
|
|
}
|
|
|
@@ -296,22 +289,7 @@ func (f *Interface) listenOut(i int) {
|
|
|
li = f.outside
|
|
|
}
|
|
|
|
|
|
- err := li.ListenOut(func(fromUdpAddr netip.AddrPort, payload []byte) {
|
|
|
- p := f.inPool.Get().(*packet.Packet)
|
|
|
- //TODO: have the listener store this in the msgs array after a read instead of doing a copy
|
|
|
-
|
|
|
- p.Payload = p.Payload[:mtu]
|
|
|
- copy(p.Payload, payload)
|
|
|
- p.Payload = p.Payload[:len(payload)]
|
|
|
- p.Addr = fromUdpAddr
|
|
|
- f.inbound <- p
|
|
|
- //select {
|
|
|
- //case f.inbound <- p:
|
|
|
- //default:
|
|
|
- // f.l.Error("Dropped packet from inbound channel")
|
|
|
- //}
|
|
|
- })
|
|
|
-
|
|
|
+ err := li.ListenOut(f.pktPool.Get, f.inbound)
|
|
|
if err != nil && !f.closed.Load() {
|
|
|
f.l.WithError(err).Error("Error while reading packet inbound packet, closing")
|
|
|
//TODO: Trigger Control to close
|
|
|
@@ -325,9 +303,8 @@ func (f *Interface) listenIn(reader io.ReadWriteCloser, i int) {
|
|
|
runtime.LockOSThread()
|
|
|
|
|
|
for {
|
|
|
- p := f.outPool.Get().(*[]byte)
|
|
|
- *p = (*p)[:mtu]
|
|
|
- n, err := reader.Read(*p)
|
|
|
+ p := f.pktPool.Get()
|
|
|
+ n, err := reader.Read(p.Payload)
|
|
|
if err != nil {
|
|
|
if !f.closed.Load() {
|
|
|
f.l.WithError(err).Error("Error while reading outbound packet, closing")
|
|
|
@@ -336,7 +313,7 @@ func (f *Interface) listenIn(reader io.ReadWriteCloser, i int) {
|
|
|
break
|
|
|
}
|
|
|
|
|
|
- *p = (*p)[:n]
|
|
|
+ p.Payload = (p.Payload)[:n]
|
|
|
//TODO: nonblocking channel write
|
|
|
f.outbound <- p
|
|
|
//select {
|
|
|
@@ -362,8 +339,7 @@ func (f *Interface) workerIn(i int, ctx context.Context) {
|
|
|
select {
|
|
|
case p := <-f.inbound:
|
|
|
f.readOutsidePackets(p.Addr, nil, result2[:0], p.Payload, h, fwPacket2, lhh, nb2, i, conntrackCache.Get(f.l))
|
|
|
- p.Payload = p.Payload[:mtu]
|
|
|
- f.inPool.Put(p)
|
|
|
+ f.pktPool.Put(p)
|
|
|
case <-ctx.Done():
|
|
|
f.wg.Done()
|
|
|
return
|
|
|
@@ -380,9 +356,8 @@ func (f *Interface) workerOut(i int, ctx context.Context) {
|
|
|
for {
|
|
|
select {
|
|
|
case data := <-f.outbound:
|
|
|
- f.consumeInsidePacket(*data, fwPacket1, nb1, result1, i, conntrackCache.Get(f.l))
|
|
|
- *data = (*data)[:mtu]
|
|
|
- f.outPool.Put(data)
|
|
|
+ f.consumeInsidePacket(data.Payload, fwPacket1, nb1, result1, i, conntrackCache.Get(f.l))
|
|
|
+ f.pktPool.Put(data)
|
|
|
case <-ctx.Done():
|
|
|
f.wg.Done()
|
|
|
return
|