Parcourir la source

use serverID instead of interface string for generating queuedID

flashmob il y a 5 ans
Parent
commit
8f860d1630
7 fichiers modifiés avec 47 ajouts et 42 suppressions
  1. 3 3
      client.go
  2. 2 2
      guerrilla.go
  3. 18 15
      mail/envelope.go
  4. 3 3
      mail/envelope_test.go
  5. 2 2
      pool.go
  6. 11 9
      server.go
  7. 8 8
      server_test.go

+ 3 - 3
client.go

@@ -56,12 +56,12 @@ type client struct {
 }
 
 // NewClient allocates a new client.
-func NewClient(conn net.Conn, clientID uint64, logger log.Logger, envelope *mail.Pool, iface string) *client {
+func NewClient(conn net.Conn, clientID uint64, logger log.Logger, envelope *mail.Pool, serverID int) *client {
 	c := &client{
 		conn: conn,
 		// Envelope will be borrowed from the envelope pool
 		// the envelope could be 'detached' from the client later when processing
-		Envelope:    envelope.Borrow(getRemoteAddr(conn), clientID, iface),
+		Envelope:    envelope.Borrow(getRemoteAddr(conn), clientID, serverID),
 		ConnectedAt: time.Now(),
 		bufin:       newSMTPBufferedReader(conn),
 		bufout:      bufio.NewWriter(conn),
@@ -172,7 +172,7 @@ func (c *client) init(conn net.Conn, clientID uint64, ep *mail.Pool) {
 	c.ID = clientID
 	c.errors = 0
 	// borrow an envelope from the envelope pool
-	c.Envelope = ep.Borrow(getRemoteAddr(conn), clientID, c.ServerIface)
+	c.Envelope = ep.Borrow(getRemoteAddr(conn), clientID, c.ServerID)
 }
 
 // getID returns the client's unique ID

+ 2 - 2
guerrilla.go

@@ -162,7 +162,7 @@ func New(ac *AppConfig, l log.Logger, b ...backends.Backend) (Guerrilla, error)
 func (g *guerrilla) makeServers() error {
 	g.mainlog().Debug("making servers")
 	var errs Errors
-	for _, sc := range g.Config.Servers {
+	for serverID, sc := range g.Config.Servers {
 		if _, ok := g.servers[sc.ListenInterface]; ok {
 			// server already instantiated
 			continue
@@ -173,7 +173,7 @@ func (g *guerrilla) makeServers() error {
 			continue
 		} else {
 			sc := sc // pin!
-			server, err := newServer(&sc, g.backend(sc.Gateway), g.mainlog())
+			server, err := newServer(&sc, g.backend(sc.Gateway), g.mainlog(), serverID)
 			if err != nil {
 				g.mainlog().WithError(err).Errorf("Failed to create server [%s]", sc.ListenInterface)
 				errs = append(errs, err)

+ 18 - 15
mail/envelope.go

@@ -2,6 +2,7 @@ package mail
 
 import (
 	"bytes"
+	"encoding/binary"
 	"errors"
 	"fmt"
 	"hash/fnv"
@@ -154,25 +155,27 @@ type Envelope struct {
 	TransportType smtp.TransportType
 	// ESMTP: true if EHLO was used
 	ESMTP bool
-	// ServerIface records the server's interface in the config
-	ServerIface string
+	// ServerID records the server's index in the configuration
+	ServerID int
 
 	// When locked, it means that the envelope is being processed by the backend
 	sync.WaitGroup
 }
 
-func NewEnvelope(remoteAddr string, clientID uint64, iface string) *Envelope {
+func NewEnvelope(remoteAddr string, clientID uint64, serverID int) *Envelope {
 	return &Envelope{
-		RemoteIP:    remoteAddr,
-		Values:      make(map[string]interface{}),
-		ServerIface: iface,
-		QueuedId:    queuedID(clientID, iface),
+		RemoteIP: remoteAddr,
+		Values:   make(map[string]interface{}),
+		ServerID: serverID,
+		QueuedId: queuedID(clientID, serverID),
 	}
 }
 
-func queuedID(clientID uint64, iface string) string {
+func queuedID(clientID uint64, serverID int) string {
 	h := fnv.New128a()
-	_, _ = h.Write([]byte(fmt.Sprintf("%v%v%v", time.Now(), clientID, iface)))
+	tmp := make([]byte, 8)
+	binary.LittleEndian.PutUint64(tmp, uint64(time.Now().UnixNano())+clientID+uint64(serverID))
+	h.Write(tmp)
 	return fmt.Sprintf("%x", h.Sum([]byte{}))
 }
 
@@ -236,10 +239,10 @@ func (e *Envelope) ResetTransaction() {
 }
 
 // Reseed is called when used with a new connection, once it's accepted
-func (e *Envelope) Reseed(remoteIP string, clientID uint64, iface string) {
+func (e *Envelope) Reseed(remoteIP string, clientID uint64, serverID int) {
 	e.RemoteIP = remoteIP
-	e.ServerIface = iface
-	e.QueuedId = queuedID(clientID, iface)
+	e.ServerID = serverID
+	e.QueuedId = queuedID(clientID, serverID)
 	e.Helo = ""
 	e.TLS = false
 	e.ESMTP = false
@@ -432,14 +435,14 @@ func NewPool(poolSize int) *Pool {
 	}
 }
 
-func (p *Pool) Borrow(remoteAddr string, clientID uint64, iface string) *Envelope {
+func (p *Pool) Borrow(remoteAddr string, clientID uint64, serverID int) *Envelope {
 	var e *Envelope
 	p.sem <- true // block the envelope until more room
 	select {
 	case e = <-p.pool:
-		e.Reseed(remoteAddr, clientID, iface)
+		e.Reseed(remoteAddr, clientID, serverID)
 	default:
-		e = NewEnvelope(remoteAddr, clientID, iface)
+		e = NewEnvelope(remoteAddr, clientID, serverID)
 	}
 	return e
 }

+ 3 - 3
mail/envelope_test.go

@@ -97,7 +97,7 @@ func TestAddressWithIP(t *testing.T) {
 }
 
 func TestEnvelope(t *testing.T) {
-	e := NewEnvelope("127.0.0.1", 22, "127.0.0.1:25")
+	e := NewEnvelope("127.0.0.1", 22, 0)
 
 	e.QueuedId = "abc123"
 	e.Helo = "helo.example.com"
@@ -162,11 +162,11 @@ func TestEncodedWordAhead(t *testing.T) {
 }
 
 func TestQueuedID(t *testing.T) {
-	str := queuedID(555, "127.0.7.4")
+	str := queuedID(555, 1)
 	if len(str) != 32 {
 		t.Error("queuedID needs to be 32 bytes in length")
 	}
-	str2 := queuedID(555, "127.0.7.4")
+	str2 := queuedID(555, 1)
 	if str == str2 {
 		t.Error("hashes should not be equal")
 	}

+ 2 - 2
pool.go

@@ -127,7 +127,7 @@ func (p *Pool) GetActiveClientsCount() int {
 }
 
 // Borrow a Client from the pool. Will block if len(activeClients) > maxClients
-func (p *Pool) Borrow(conn net.Conn, clientID uint64, logger log.Logger, ep *mail.Pool, iface string) (Poolable, error) {
+func (p *Pool) Borrow(conn net.Conn, clientID uint64, logger log.Logger, ep *mail.Pool, serverID int) (Poolable, error) {
 	p.poolGuard.Lock()
 	defer p.poolGuard.Unlock()
 
@@ -142,7 +142,7 @@ func (p *Pool) Borrow(conn net.Conn, clientID uint64, logger log.Logger, ep *mai
 		case c = <-p.pool:
 			c.init(conn, clientID, ep)
 		default:
-			c = NewClient(conn, clientID, logger, ep, iface)
+			c = NewClient(conn, clientID, logger, ep, serverID)
 		}
 		p.activeClientsAdd(c)
 

+ 11 - 9
server.go

@@ -46,6 +46,7 @@ type server struct {
 	tlsConfigStore  atomic.Value
 	timeout         atomic.Value // stores time.Duration
 	listenInterface string
+	serverID        int
 	clientPool      *Pool
 	wg              sync.WaitGroup // for waiting to shutdown
 	listener        net.Listener
@@ -87,11 +88,12 @@ func (c command) match(in []byte) bool {
 }
 
 // Creates and returns a new ready-to-run Server from a ServerConfig configuration
-func newServer(sc *ServerConfig, b backends.Backend, mainlog log.Logger) (*server, error) {
+func newServer(sc *ServerConfig, b backends.Backend, mainlog log.Logger, serverID int) (*server, error) {
 	server := &server{
 		clientPool:      NewPool(sc.MaxClients),
 		closedListener:  make(chan bool, 1),
 		listenInterface: sc.ListenInterface,
+		serverID:        serverID,
 		state:           ServerStateNew,
 		envelopePool:    mail.NewPool(sc.MaxClients * 2),
 	}
@@ -242,26 +244,26 @@ func (s *server) Start(startWG *sync.WaitGroup) error {
 		return fmt.Errorf("[%s] cannot listen on port: %s ", s.listenInterface, err.Error())
 	}
 
-	s.log().Fields("iface", s.listenInterface).Info("listening on TCP")
+	s.log().Fields("iface", s.listenInterface, "serverID", s.serverID).Info("listening on TCP")
 	s.state = ServerStateRunning
 	startWG.Done() // start successful, don't wait for me
 
 	for {
-		s.log().Fields("iface", s.listenInterface, "nextSeq", clientID+1).Debug("waiting for a new client")
+		s.log().Fields("serverID", s.serverID, "nextSeq", clientID+1).Debug("waiting for a new client")
 		conn, err := listener.Accept()
 		clientID++
 		if err != nil {
 			if e, ok := err.(net.Error); ok && !e.Temporary() {
-				s.log().Fields("iface", s.listenInterface).Info("server has stopped accepting new clients")
+				s.log().Fields("iface", s.listenInterface, "serverID", s.serverID).Info("server has stopped accepting new clients")
 				// the listener has been closed, wait for clients to exit
-				s.log().Fields("iface", s.listenInterface).Info("shutting down pool")
+				s.log().Fields("iface", s.listenInterface, "serverID", s.serverID).Info("shutting down pool")
 				s.clientPool.ShutdownState()
 				s.clientPool.ShutdownWait()
 				s.state = ServerStateStopped
 				s.closedListener <- true
 				return nil
 			}
-			s.mainlog().Fields("error", err, "iface", s.listenInterface).Info("temporary error accepting client")
+			s.mainlog().Fields("error", err, "serverID", s.serverID).Error("temporary error accepting client")
 			continue
 		}
 		go func(p Poolable, borrowErr error) {
@@ -271,14 +273,14 @@ func (s *server) Start(startWG *sync.WaitGroup) error {
 				s.envelopePool.Return(c.Envelope)
 				s.clientPool.Return(c)
 			} else {
-				s.log().WithError(borrowErr).Info("couldn't borrow a new client")
+				s.log().Fields("error", borrowErr, "serverID", s.serverID).Error("couldn't borrow a new client")
 				// we could not get a client, so close the connection.
 				_ = conn.Close()
 
 			}
 			// intentionally placed Borrow in args so that it's called in the
 			// same main goroutine.
-		}(s.clientPool.Borrow(conn, clientID, s.log(), s.envelopePool, s.listenInterface))
+		}(s.clientPool.Borrow(conn, clientID, s.log(), s.envelopePool, s.serverID))
 
 	}
 }
@@ -599,7 +601,7 @@ func (s *server) handleClient(client *client) {
 					e := s.envelopePool.Borrow(
 						client.Envelope.RemoteIP,
 						client.ID,
-						client.Envelope.ServerIface,
+						client.Envelope.ServerID,
 					)
 					s.copyEnvelope(client.Envelope, e)
 					// process in the background then return the envelope

+ 8 - 8
server_test.go

@@ -67,7 +67,7 @@ func getMockServerConn(sc *ServerConfig, t *testing.T) (*mocks.Conn, *server) {
 	if err != nil {
 		t.Error("new dummy backend failed because:", err)
 	}
-	server, err := newServer(sc, backend, mainlog)
+	server, err := newServer(sc, backend, mainlog, 0)
 	if err != nil {
 		//t.Error("new server failed because:", err)
 	} else {
@@ -284,7 +284,7 @@ func TestHandleClient(t *testing.T) {
 	}
 	conn, server := getMockServerConn(sc, t)
 	// call the serve.handleClient() func in a goroutine.
-	client := NewClient(conn.Server, 1, mainlog, mail.NewPool(5), sc.ListenInterface)
+	client := NewClient(conn.Server, 1, mainlog, mail.NewPool(5), 0)
 	var wg sync.WaitGroup
 	wg.Add(1)
 	go func() {
@@ -328,7 +328,7 @@ func TestGithubIssue197(t *testing.T) {
 	// [2001:DB8::FF00:42:8329] is an address literal
 	server.setAllowedHosts([]string{"1.1.1.1", "[2001:DB8::FF00:42:8329]"})
 
-	client := NewClient(conn.Server, 1, mainlog, mail.NewPool(5), sc.ListenInterface)
+	client := NewClient(conn.Server, 1, mainlog, mail.NewPool(5), 0)
 	var wg sync.WaitGroup
 	wg.Add(1)
 	go func() {
@@ -444,7 +444,7 @@ func TestGithubIssue198(t *testing.T) {
 
 	server.setAllowedHosts([]string{"1.1.1.1", "[2001:DB8::FF00:42:8329]"})
 
-	client := NewClient(conn.Server, 1, mainlog, mail.NewPool(5), sc.ListenInterface)
+	client := NewClient(conn.Server, 1, mainlog, mail.NewPool(5), 0)
 	client.RemoteIP = "127.0.0.1"
 
 	var wg sync.WaitGroup
@@ -568,7 +568,7 @@ func TestGithubIssue199(t *testing.T) {
 
 	server.setAllowedHosts([]string{"grr.la", "fake.com", "[1.1.1.1]", "[2001:db8::8a2e:370:7334]", "saggydimes.test.com"})
 
-	client := NewClient(conn.Server, 1, mainlog, mail.NewPool(5), sc.ListenInterface)
+	client := NewClient(conn.Server, 1, mainlog, mail.NewPool(5), 0)
 	var wg sync.WaitGroup
 	wg.Add(1)
 	go func() {
@@ -746,7 +746,7 @@ func TestGithubIssue200(t *testing.T) {
 	server.backend().Start()
 	server.setAllowedHosts([]string{"1.1.1.1", "[2001:DB8::FF00:42:8329]"})
 
-	client := NewClient(conn.Server, 1, mainlog, mail.NewPool(5), sc.ListenInterface)
+	client := NewClient(conn.Server, 1, mainlog, mail.NewPool(5), 0)
 	var wg sync.WaitGroup
 	wg.Add(1)
 	go func() {
@@ -800,7 +800,7 @@ func TestGithubIssue201(t *testing.T) {
 	// it will be used for rcpt to:<postmaster> which does not specify a host
 	server.setAllowedHosts([]string{"a.com", "saggydimes.test.com"})
 
-	client := NewClient(conn.Server, 1, mainlog, mail.NewPool(5), sc.ListenInterface)
+	client := NewClient(conn.Server, 1, mainlog, mail.NewPool(5), 0)
 	var wg sync.WaitGroup
 	wg.Add(1)
 	go func() {
@@ -884,7 +884,7 @@ func TestXClient(t *testing.T) {
 	}
 	conn, server := getMockServerConn(sc, t)
 	// call the serve.handleClient() func in a goroutine.
-	client := NewClient(conn.Server, 1, mainlog, mail.NewPool(5), sc.ListenInterface)
+	client := NewClient(conn.Server, 1, mainlog, mail.NewPool(5), 0)
 	var wg sync.WaitGroup
 	wg.Add(1)
 	go func() {