
- Envelopes now have their own pool. If processing takes longer than the backend timeout, the client can still be reused for the next connection while the envelope stays detached. Previously the server could panic when a client was reused while the backend still held a pointer to that client's envelope; 'detaching' the envelope from the client avoids this problem.
- The gateway has new config options to control timeouts: gw_save_timeout and gw_val_rcpt_timeout (see the config sketch after this list).
- Spent some time improving the reliability of the guerrilla_db_redis processor example.
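A minimal sketch of how the two new gateway options might be set (values assumed to be in seconds, matching the defaults in backends/gateway.go); the AppConfig/BackendConfig wiring follows the api_test.go example further down, and the exact values and process_stack are illustrative only:

	cfg := &AppConfig{
		LogFile:      "tests/testlog",
		AllowedHosts: []string{"grr.la"},
		BackendConfig: backends.BackendConfig{
			"process_stack":       "HeadersParser|Debugger",
			"gw_save_timeout":     30, // seconds before a save-mail task times out
			"gw_val_rcpt_timeout": 5,  // seconds before recipient validation times out
		},
	}
	d := Daemon{Config: cfg}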

flashmob 8 years ago
commit 3cb670d137

+ 13 - 7
api_test.go

@@ -342,7 +342,7 @@ var funkyLogger = func() backends.Decorator {
 			}),
 	)
 
-	return func(c backends.Processor) backends.Processor {
+	return func(p backends.Processor) backends.Processor {
 		return backends.ProcessWith(
 			func(e *mail.Envelope, task backends.SelectTask) (backends.Result, error) {
 				if task == backends.TaskValidateRcpt {
@@ -351,13 +351,13 @@ var funkyLogger = func() backends.Decorator {
 						"another funky recipient [%s]",
 						e.RcptTo[len(e.RcptTo)-1])
 					// if valid then forward call to the next processor in the chain
-					return c.Process(e, task)
+					return p.Process(e, task)
 					// if invalid, return a backend result
 					//return backends.NewResult(response.Canned.FailRcptCmd), nil
 				} else if task == backends.TaskSaveMail {
 					backends.Log().Info("Another funky email!")
 				}
-				return c.Process(e, task)
+				return p.Process(e, task)
 			})
 	}
 }
@@ -411,19 +411,25 @@ func talkToServer(address string) {
 	}
 	in := bufio.NewReader(conn)
 	str, err := in.ReadString('\n')
+	//	fmt.Println(str)
 	fmt.Fprint(conn, "HELO maildiranasaurustester\r\n")
 	str, err = in.ReadString('\n')
+	//	fmt.Println(str)
 	fmt.Fprint(conn, "MAIL FROM:<[email protected]>r\r\n")
 	str, err = in.ReadString('\n')
+	//	fmt.Println(str)
 	fmt.Fprint(conn, "RCPT TO:[email protected]\r\n")
 	str, err = in.ReadString('\n')
+	//	fmt.Println(str)
 	fmt.Fprint(conn, "DATA\r\n")
 	str, err = in.ReadString('\n')
+	//	fmt.Println(str)
 	fmt.Fprint(conn, "Subject: Test subject\r\n")
 	fmt.Fprint(conn, "\r\n")
 	fmt.Fprint(conn, "A an email body\r\n")
 	fmt.Fprint(conn, ".\r\n")
 	str, err = in.ReadString('\n')
+	//	fmt.Println(str)
 	_ = str
 }
 
@@ -455,7 +461,7 @@ func TestPubSubAPI(t *testing.T) {
 
 	// new config
 	cfg := &AppConfig{
-		PidFile:       "tests/pidfile`.pid",
+		PidFile:       "tests/pidfilex.pid",
 		LogFile:       "tests/testlog",
 		AllowedHosts:  []string{"grr.la"},
 		BackendConfig: backends.BackendConfig{"process_stack": "HeadersParser|Debugger|FunkyLogger"},
@@ -494,14 +500,14 @@ func TestAPILog(t *testing.T) {
 	os.Truncate("tests/testlog", 0)
 	d := Daemon{}
 	l := d.Log()
-	l.Info("hai") // to stderr
+	l.Info("logtest1") // to stderr
 	if l.GetLevel() != "info" {
 		t.Error("Log level does not eq info, it is ", l.GetLevel())
 	}
 	d.Logger = nil
 	d.Config = &AppConfig{LogFile: "tests/testlog"}
 	l = d.Log()
-	l.Info("hai") // to tests/testlog
+	l.Info("logtest1") // to tests/testlog
 
 	//
 	l = d.Log()
@@ -515,7 +521,7 @@ func TestAPILog(t *testing.T) {
 		return
 	}
 	// lets interrogate the log
-	if strings.Index(string(b), "hai") < 0 {
+	if strings.Index(string(b), "logtest1") < 0 {
 		t.Error("hai was not found in the log, it should have been in tests/testlog")
 	}
 }

+ 34 - 8
backends/gateway.go

@@ -37,8 +37,14 @@ type BackendGateway struct {
 }
 
 type GatewayConfig struct {
-	WorkersSize    int    `json:"save_workers_size,omitempty"`
+	// WorkersSize controls how many concurrent workers to start. Defaults to 1
+	WorkersSize int `json:"save_workers_size,omitempty"`
+	// ProcessorStack controls which processors to chain in a stack.
 	ProcessorStack string `json:"process_stack,omitempty"`
+	// TimeoutSave is the number of seconds before timeout when saving an email
+	TimeoutSave int `json:"gw_save_timeout,omitempty"`
+	// TimeoutValidateRcpt is how many seconds before timeout when validating a recipient
+	TimeoutValidateRcpt int `json:"gw_val_rcpt_timeout,omitempty"`
 }
 
 // workerMsg is what get placed on the BackendGateway.saveMailChan channel
@@ -61,8 +67,11 @@ const (
 	BackendStateError
 	BackendStateInitialized
 
-	processTimeout   = time.Second * 30
-	defaultProcessor = "Debugger"
+	// default timeout for saving email, if 'gw_save_timeout' not present in config
+	saveTimeout = time.Second * 30
+	// default timeout for validating rcpt to, if 'gw_val_rcpt_timeout' not present in config
+	validateRcptTimeout = time.Second * 5
+	defaultProcessor    = "Debugger"
 )
 
 func (s backendState) String() string {
@@ -114,11 +123,10 @@ func (gw *BackendGateway) Process(e *mail.Envelope) Result {
 		}
 		return NewResult(response.Canned.SuccessMessageQueued + status.queuedID)
 
-	case <-time.After(processTimeout):
-		Log().Infof("Backend has timed out")
+	case <-time.After(gw.saveTimeout()):
+		Log().Error("Backend has timed out while saving email")
 		return NewResult(response.Canned.FailBackendTimeout)
 	}
-
 }
 
 // ValidateRcpt asks one of the workers to validate the recipient
@@ -139,8 +147,8 @@ func (gw *BackendGateway) ValidateRcpt(e *mail.Envelope) RcptError {
 		}
 		return nil
 
-	case <-time.After(time.Second):
-		Log().Infof("Backend has timed out")
+	case <-time.After(gw.validateRcptTimeout()):
+		Log().Error("Backend has timed out while validating rcpt")
 		return StorageTimeout
 	}
 }
@@ -295,6 +303,22 @@ func (gw *BackendGateway) workersSize() int {
 	return gw.gwConfig.WorkersSize
 }
 
+// saveTimeout returns the maximum duration to wait before timing out a save-mail task (TimeoutSave is in seconds)
+func (gw *BackendGateway) saveTimeout() time.Duration {
+	if gw.gwConfig.TimeoutSave == 0 {
+		return saveTimeout
+	}
+	return time.Duration(gw.gwConfig.TimeoutSave) * time.Second
+}
+
+// validateRcptTimeout returns the maximum duration to wait before timing out a recipient validation task (TimeoutValidateRcpt is in seconds)
+func (gw *BackendGateway) validateRcptTimeout() time.Duration {
+	if gw.gwConfig.TimeoutValidateRcpt == 0 {
+		return validateRcptTimeout
+	}
+	return time.Duration(gw.gwConfig.TimeoutValidateRcpt) * time.Second
+}
+
 func (gw *BackendGateway) workDispatcher(workIn chan *workerMsg, p Processor, workerId int, stop chan bool) {
 
 	defer func() {
@@ -317,6 +341,7 @@ func (gw *BackendGateway) workDispatcher(workIn chan *workerMsg, p Processor, wo
 				Log().Debugf("worker stopped (#%d)", workerId)
 				return
 			}
+			msg.e.Lock()
 			if msg.task == TaskSaveMail {
 				// process the email here
 				// TODO we should check the err
@@ -338,6 +363,7 @@ func (gw *BackendGateway) workDispatcher(workIn chan *workerMsg, p Processor, wo
 					msg.notifyMe <- &notifyMsg{err: nil}
 				}
 			}
+			msg.e.Unlock()
 		}
 	}
 }
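The two helpers above treat the configured value as a number of seconds, so the integer needs an explicit conversion to time.Duration; a standalone sketch of that conversion (hypothetical example, not part of this commit):

	package main

	import (
		"fmt"
		"time"
	)

	func main() {
		seconds := 29 // e.g. a gw_save_timeout value read from config
		d := time.Duration(seconds) * time.Second
		fmt.Println(d) // prints "29s"; time.Duration(29) alone would be 29 nanoseconds
	}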

+ 3 - 3
backends/p_compressor.go

@@ -90,7 +90,7 @@ func (c *compressor) clear() {
 }
 
 func Compressor() Decorator {
-	return func(c Processor) Processor {
+	return func(p Processor) Processor {
 		return ProcessWith(func(e *mail.Envelope, task SelectTask) (Result, error) {
 			if task == TaskSaveMail {
 				compressor := newCompressor()
@@ -98,9 +98,9 @@ func Compressor() Decorator {
 				// put the pointer in there for other processors to use later in the line
 				e.Values["zlib-compressor"] = compressor
 				// continue to the next Processor in the decorator stack
-				return c.Process(e, task)
+				return p.Process(e, task)
 			} else {
-				return c.Process(e, task)
+				return p.Process(e, task)
 			}
 		})
 	}

+ 3 - 3
backends/p_debugger.go

@@ -38,7 +38,7 @@ func Debugger() Decorator {
 		return nil
 	})
 	Svc.AddInitializer(initFunc)
-	return func(c Processor) Processor {
+	return func(p Processor) Processor {
 		return ProcessWith(func(e *mail.Envelope, task SelectTask) (Result, error) {
 			if task == TaskSaveMail {
 				if config.LogReceivedMails {
@@ -46,9 +46,9 @@ func Debugger() Decorator {
 					Log().Info("Headers are:", e.Header)
 				}
 				// continue to the next Processor in the decorator stack
-				return c.Process(e, task)
+				return p.Process(e, task)
 			} else {
-				return c.Process(e, task)
+				return p.Process(e, task)
 			}
 		})
 	}

+ 92 - 36
backends/p_guerrilla_db_redis.go

@@ -9,6 +9,7 @@ import (
 	"github.com/garyburd/redigo/redis"
 	"github.com/go-sql-driver/mysql"
 	"io"
+	"math/rand"
 	"runtime/debug"
 	"strings"
 	"sync"
@@ -36,7 +37,7 @@ func init() {
 var queryBatcherId = 0
 
 // how many rows to batch at a time
-const GuerrillaDBAndRedisBatchMax = 2
+const GuerrillaDBAndRedisBatchMax = 50
 
 // tick on every...
 const GuerrillaDBAndRedisBatchTimeout = time.Second * 3
@@ -46,6 +47,8 @@ type GuerrillaDBAndRedisBackend struct {
 	batcherWg sync.WaitGroup
 	// cache prepared queries
 	cache stmtCache
+
+	batcherStoppers []chan bool
 }
 
 // statement cache. It's an array, not slice
@@ -61,6 +64,7 @@ type guerrillaDBAndRedisConfig struct {
 	RedisExpireSeconds int    `json:"redis_expire_seconds"`
 	RedisInterface     string `json:"redis_interface"`
 	PrimaryHost        string `json:"primary_mail_host"`
+	BatchTimeout       int    `json:"redis_mysql_batch_timeout,omitempty"`
 }
 
 // Load the backend config for the backend. It has already been unmarshalled
@@ -170,7 +174,7 @@ func (g *GuerrillaDBAndRedisBackend) prepareInsertQuery(rows int, db *sql.DB) *s
 	return stmt
 }
 
-func (g *GuerrillaDBAndRedisBackend) doQuery(c int, db *sql.DB, insertStmt *sql.Stmt, vals *[]interface{}) {
+func (g *GuerrillaDBAndRedisBackend) doQuery(c int, db *sql.DB, insertStmt *sql.Stmt, vals *[]interface{}) error {
 	var execErr error
 	defer func() {
 		if r := recover(); r != nil {
@@ -189,9 +193,13 @@ func (g *GuerrillaDBAndRedisBackend) doQuery(c int, db *sql.DB, insertStmt *sql.
 	// prepare the query used to insert when rows reaches batchMax
 	insertStmt = g.prepareInsertQuery(c, db)
 	_, execErr = insertStmt.Exec(*vals...)
+	//if rand.Intn(2) == 1 {
+	//	return errors.New("uggabooka")
+	//}
 	if execErr != nil {
 		Log().WithError(execErr).Error("There was a problem the insert")
 	}
+	return execErr
 }
 
 // Batches the rows from the feeder chan in to a single INSERT statement.
@@ -201,7 +209,12 @@ func (g *GuerrillaDBAndRedisBackend) doQuery(c int, db *sql.DB, insertStmt *sql.
 // The goroutine can either exit if there's a panic or feeder channel closes
 // it returns feederOk which signals if the feeder chanel was ok (still open) while returning
 // if it feederOk is false, then it means the feeder chanel is closed
-func (g *GuerrillaDBAndRedisBackend) insertQueryBatcher(feeder chan []interface{}, db *sql.DB) (feederOk bool) {
+func (g *GuerrillaDBAndRedisBackend) insertQueryBatcher(
+	feeder feedChan,
+	db *sql.DB,
+	batcherId int,
+	stop chan bool) (feederOk bool) {
+
 	// controls shutdown
 	defer g.batcherWg.Done()
 	g.batcherWg.Add(1)
@@ -209,22 +222,40 @@ func (g *GuerrillaDBAndRedisBackend) insertQueryBatcher(feeder chan []interface{
 	var vals []interface{}
 	// how many rows were batched
 	count := 0
-	// The timer will tick every second.
+	// The timer will tick every timeo interval.
 	// Interrupting the select clause when there's no data on the feeder channel
-	t := time.NewTimer(GuerrillaDBAndRedisBatchTimeout)
+	timeo := GuerrillaDBAndRedisBatchTimeout
+	if g.config.BatchTimeout > 0 {
+		timeo = time.Duration(g.config.BatchTimeout) * time.Second
+	}
+	t := time.NewTimer(timeo)
 	// prepare the query used to insert when rows reaches batchMax
 	insertStmt := g.prepareInsertQuery(GuerrillaDBAndRedisBatchMax, db)
 	// inserts executes a batched insert query, clears the vals and resets the count
-	insert := func(c int) {
+	inserter := func(c int) {
 		if c > 0 {
-			g.doQuery(c, db, insertStmt, &vals)
+			err := g.doQuery(c, db, insertStmt, &vals)
+			if err != nil {
+				// maybe a connection problem?
+				// retry the sql query a few times
+				attempts := 3
+				for i := 0; i < attempts; i++ {
+					Log().Infof("retrying query, rows[%d]", c)
+					time.Sleep(time.Second)
+					err = g.doQuery(c, db, insertStmt, &vals)
+					if err == nil {
+						break
+					}
+				}
+			}
 		}
 		vals = nil
 		count = 0
 	}
+	rand.Seed(time.Now().UnixNano())
 	defer func() {
 		if r := recover(); r != nil {
-			Log().Error("insertQueryBatcher caught a panic", r)
+			Log().Error("insertQueryBatcher caught a panic", r, string(debug.Stack()))
 		}
 	}()
 	// Keep getting values from feeder and add to batch.
@@ -234,30 +265,33 @@ func (g *GuerrillaDBAndRedisBackend) insertQueryBatcher(feeder chan []interface{
 	for {
 		select {
 		// it may panic when reading on a closed feeder channel. feederOK detects if it was closed
-		case row, feederOk := <-feeder:
-			if row == nil {
-				Log().Infof("MySQL query batcher exiting (#%d)", queryBatcherId)
-				// Insert any remaining rows
-				insert(count)
-				return feederOk
-			}
+		case <-stop:
+			Log().Infof("MySQL query batcher stopped (#%d)", batcherId)
+			// Insert any remaining rows
+			inserter(count)
+			feederOk = false
+			close(feeder)
+			return
+		case row := <-feeder:
+
 			vals = append(vals, row...)
 			count++
-			Log().Debug("new feeder row:", row, " cols:", len(row), " count:", count, " worker", queryBatcherId)
+			Log().Debug("new feeder row:", row, " cols:", len(row), " count:", count, " worker", batcherId)
 			if count >= GuerrillaDBAndRedisBatchMax {
-				insert(GuerrillaDBAndRedisBatchMax)
+				inserter(GuerrillaDBAndRedisBatchMax)
 			}
 			// stop timer from firing (reset the interrupt)
 			if !t.Stop() {
+				// drain the timer
 				<-t.C
 			}
-			t.Reset(GuerrillaDBAndRedisBatchTimeout)
+			t.Reset(timeo)
 		case <-t.C:
 			// anything to insert?
 			if n := len(vals); n > 0 {
-				insert(count)
+				inserter(count)
 			}
-			t.Reset(GuerrillaDBAndRedisBatchTimeout)
+			t.Reset(timeo)
 		}
 	}
 }
@@ -271,14 +305,23 @@ func trimToLimit(str string, limit int) string {
 }
 
 func (g *GuerrillaDBAndRedisBackend) mysqlConnect() (*sql.DB, error) {
+	tOut := int(GuerrillaDBAndRedisBatchTimeout / time.Second)
+	if g.config.BatchTimeout > 0 {
+		tOut = g.config.BatchTimeout
+	}
+	tOut += 10
+	// don't go to 30 sec or more
+	if tOut >= 30 {
+		tOut = 29
+	}
 	conf := mysql.Config{
 		User:         g.config.MysqlUser,
 		Passwd:       g.config.MysqlPass,
 		DBName:       g.config.MysqlDB,
 		Net:          "tcp",
 		Addr:         g.config.MysqlHost,
-		ReadTimeout:  GuerrillaDBAndRedisBatchTimeout + (time.Second * 10),
-		WriteTimeout: GuerrillaDBAndRedisBatchTimeout + (time.Second * 10),
+		ReadTimeout:  time.Duration(tOut) * time.Second,
+		WriteTimeout: time.Duration(tOut) * time.Second,
 		Params:       map[string]string{"collation": "utf8_general_ci"},
 	}
 	if db, err := sql.Open("mysql", conf.FormatDSN()); err != nil {
@@ -307,6 +350,8 @@ func (c *redisClient) redisConnection(redisInterface string) (err error) {
 	return nil
 }
 
+type feedChan chan []interface{}
+
 // GuerrillaDbReddis is a specialized processor for Guerrilla mail. It is here as an example.
 // It's an example of a 'monolithic' processor.
 func GuerrillaDbReddis() Decorator {
@@ -319,9 +364,12 @@ func GuerrillaDbReddis() Decorator {
 
 	var redisErr error
 
-	feeder := make(chan []interface{}, 1)
+	var feeders []feedChan
+
+	g.batcherStoppers = make([]chan bool, 0)
 
 	Svc.AddInitializer(InitializeWith(func(backendConfig BackendConfig) error {
+
 		configType := BaseConfig(&guerrillaDBAndRedisConfig{})
 		bcfg, err := Svc.ExtractConfig(backendConfig, configType)
 		if err != nil {
@@ -332,37 +380,45 @@ func GuerrillaDbReddis() Decorator {
 		if err != nil {
 			return err
 		}
+		queryBatcherId++
 		// start the query SQL batching where we will send data via the feeder channel
-		go func() {
-			queryBatcherId++
+		stop := make(chan bool)
+		feeder := make(feedChan, 1)
+		go func(qbID int, stop chan bool) {
+			// we loop so that if insertQueryBatcher panics, it can recover and go in again
 			for {
-				if feederOK := g.insertQueryBatcher(feeder, db); !feederOK {
-					Log().Debugf("insertQueryBatcher exited (#%d)", queryBatcherId)
+				if feederOK := g.insertQueryBatcher(feeder, db, qbID, stop); !feederOK {
+					Log().Debugf("insertQueryBatcher exited (#%d)", qbID)
 					return
 				}
-				// if insertQueryBatcher panics, it can recover and go in again
 				Log().Debug("resuming insertQueryBatcher")
 			}
-		}()
+		}(queryBatcherId, stop)
+		g.batcherStoppers = append(g.batcherStoppers, stop)
+		feeders = append(feeders, feeder)
 		return nil
 	}))
 
 	Svc.AddShutdowner(ShutdownWith(func() error {
 		db.Close()
+		Log().Infof("closed mysql")
 		if redisClient.conn != nil {
 			Log().Infof("closed redis")
 			redisClient.conn.Close()
 		}
-		// close the feeder & wait for query batcher to exit.
-		close(feeder)
+		// send a close signal to all query batchers to exit.
+		for i := range g.batcherStoppers {
+			g.batcherStoppers[i] <- true
+		}
 		g.batcherWg.Wait()
+
 		return nil
 	}))
 
 	var vals []interface{}
 	data := newCompressedData()
 
-	return func(c Processor) Processor {
+	return func(p Processor) Processor {
 		return ProcessWith(func(e *mail.Envelope, task SelectTask) (Result, error) {
 			if task == TaskSaveMail {
 				Log().Debug("Got mail from chan,", e.RemoteIP)
@@ -414,12 +470,12 @@ func GuerrillaDbReddis() Decorator {
 					e.RemoteIP,
 					trimToLimit(e.MailFrom.String(), 255),
 					e.TLS)
-				// give the values to the query batcher
-				feeder <- vals
-				return c.Process(e, task)
+				// give the values to a random query batcher
+				feeders[rand.Intn(len(feeders))] <- vals
+				return p.Process(e, task)
 
 			} else {
-				return c.Process(e, task)
+				return p.Process(e, task)
 			}
 		})
 	}
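For reference, a sketch of backend config keys this example processor reads, based on the json tags visible in guerrillaDBAndRedisConfig above; the processor name in process_stack and any keys not shown in this diff (such as the MySQL connection settings) are assumptions:

	cfg := backends.BackendConfig{
		// "GuerrillaDbRedis" stands in for whatever name the processor is registered under
		"process_stack":             "HeadersParser|Compressor|Hasher|Header|GuerrillaDbRedis",
		"redis_interface":           "127.0.0.1:6379",
		"redis_expire_seconds":      7200,
		"primary_mail_host":         "example.com",
		"redis_mysql_batch_timeout": 3, // seconds the batcher waits before flushing a partial batch
	}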

+ 3 - 3
backends/p_hasher.go

@@ -31,7 +31,7 @@ func init() {
 // The hasher decorator computes a hash of the email for each recipient
 // It appends the hashes to envelope's Hashes slice.
 func Hasher() Decorator {
-	return func(c Processor) Processor {
+	return func(p Processor) Processor {
 		return ProcessWith(func(e *mail.Envelope, task SelectTask) (Result, error) {
 
 			if task == TaskSaveMail {
@@ -48,9 +48,9 @@ func Hasher() Decorator {
 					sum := h2.Sum([]byte{})
 					e.Hashes = append(e.Hashes, fmt.Sprintf("%x", sum))
 				}
-				return c.Process(e, task)
+				return p.Process(e, task)
 			} else {
-				return c.Process(e, task)
+				return p.Process(e, task)
 			}
 
 		})

+ 3 - 3
backends/p_header.go

@@ -46,7 +46,7 @@ func Header() Decorator {
 		return nil
 	}))
 
-	return func(c Processor) Processor {
+	return func(p Processor) Processor {
 		return ProcessWith(func(e *mail.Envelope, task SelectTask) (Result, error) {
 			if task == TaskSaveMail {
 				to := strings.TrimSpace(e.RcptTo[0].User) + "@" + config.PrimaryHost
@@ -64,10 +64,10 @@ func Header() Decorator {
 				// save the result
 				e.DeliveryHeader = addHead
 				// next processor
-				return c.Process(e, task)
+				return p.Process(e, task)
 
 			} else {
-				return c.Process(e, task)
+				return p.Process(e, task)
 			}
 		})
 	}

+ 3 - 3
backends/p_headers_parser.go

@@ -22,15 +22,15 @@ func init() {
 }
 
 func HeadersParser() Decorator {
-	return func(c Processor) Processor {
+	return func(p Processor) Processor {
 		return ProcessWith(func(e *mail.Envelope, task SelectTask) (Result, error) {
 			if task == TaskSaveMail {
 				e.ParseHeaders()
 				// next processor
-				return c.Process(e, task)
+				return p.Process(e, task)
 			} else {
 				// next processor
-				return c.Process(e, task)
+				return p.Process(e, task)
 			}
 		})
 	}

+ 4 - 4
backends/p_mysql.go

@@ -166,7 +166,7 @@ func MySql() Decorator {
 		return nil
 	}))
 
-	return func(c Processor) Processor {
+	return func(p Processor) Processor {
 		return ProcessWith(func(e *mail.Envelope, task SelectTask) (Result, error) {
 
 			if task == TaskSaveMail {
@@ -217,7 +217,7 @@ func MySql() Decorator {
 				stmt := mp.prepareInsertQuery(1, db)
 				mp.doQuery(1, db, stmt, &vals)
 				// continue to the next Processor in the decorator chain
-				return c.Process(e, task)
+				return p.Process(e, task)
 			} else if task == TaskValidateRcpt {
 				// if you need to validate the e.Rcpt then change to:
 				if len(e.RcptTo) > 0 {
@@ -229,9 +229,9 @@ func MySql() Decorator {
 						return NewResult(response.Canned.FailNoSenderDataCmd), NoSuchUser
 					}
 				}
-				return c.Process(e, task)
+				return p.Process(e, task)
 			} else {
-				return c.Process(e, task)
+				return p.Process(e, task)
 			}
 
 		})

+ 3 - 3
backends/p_redis.go

@@ -84,7 +84,7 @@ func Redis() Decorator {
 
 	var redisErr error
 
-	return func(c Processor) Processor {
+	return func(p Processor) Processor {
 		return ProcessWith(func(e *mail.Envelope, task SelectTask) (Result, error) {
 
 			if task == TaskSaveMail {
@@ -119,10 +119,10 @@ func Redis() Decorator {
 					Log().Error("Redis needs a Hash() process before it")
 				}
 
-				return c.Process(e, task)
+				return p.Process(e, task)
 			} else {
 				// nothing to do for this task
-				return c.Process(e, task)
+				return p.Process(e, task)
 			}
 
 		})

+ 1 - 1
backends/processor.go

@@ -31,7 +31,7 @@ type Processor interface {
 // Signature of Processor
 type ProcessWith func(*mail.Envelope, SelectTask) (Result, error)
 
-// Make ProcessorFunc will satisfy the Processor interface
+// Process makes ProcessWith satisfy the Processor interface
 func (f ProcessWith) Process(e *mail.Envelope, task SelectTask) (Result, error) {
 	// delegate to the anonymous function
 	return f(e, task)

+ 9 - 7
client.go

@@ -50,11 +50,13 @@ type client struct {
 	log       log.Logger
 }
 
-// Allocate a new client
-func NewClient(conn net.Conn, clientID uint64, logger log.Logger) *client {
+// NewClient allocates a new client.
+func NewClient(conn net.Conn, clientID uint64, logger log.Logger, envelope *mail.Pool) *client {
 	c := &client{
-		conn:        conn,
-		Envelope:    mail.NewEnvelope(getRemoteAddr(conn), clientID),
+		conn: conn,
+		// Envelope will be borrowed from the envelope pool
+		// the envelope could be 'detached' from the client later when processing
+		Envelope:    envelope.Borrow(getRemoteAddr(conn), clientID),
 		ConnectedAt: time.Now(),
 		bufin:       newSMTPBufferedReader(conn),
 		bufout:      bufio.NewWriter(conn),
@@ -153,7 +155,7 @@ func (c *client) closeConn() {
 }
 
 // init is called after the client is borrowed from the pool, to get it ready for the connection
-func (c *client) init(conn net.Conn, clientID uint64) {
+func (c *client) init(conn net.Conn, clientID uint64, ep *mail.Pool) {
 	c.conn = conn
 	// reset our reader & writer
 	c.bufout.Reset(conn)
@@ -164,8 +166,8 @@ func (c *client) init(conn net.Conn, clientID uint64) {
 	c.ConnectedAt = time.Now()
 	c.ID = clientID
 	c.errors = 0
-	c.Envelope.Reseed(getRemoteAddr(conn), clientID)
-
+	// borrow an envelope from the envelope pool
+	c.Envelope = ep.Borrow(getRemoteAddr(conn), clientID)
 }
 
 // getID returns the client's unique ID

+ 9 - 0
cmd/guerrillad/serve.go

@@ -12,6 +12,7 @@ import (
 	"strconv"
 	"strings"
 	"syscall"
+	"time"
 )
 
 const (
@@ -66,6 +67,14 @@ func sigHandler() {
 			d.ReopenLogs()
 		} else if sig == syscall.SIGTERM || sig == syscall.SIGQUIT || sig == syscall.SIGINT {
 			mainlog.Infof("Shutdown signal caught")
+			go func() {
+				select {
+				// exit if graceful shutdown not finished in 60 sec.
+				case <-time.After(time.Second * 60):
+					mainlog.Error("graceful shutdown timed out")
+					os.Exit(1)
+				}
+			}()
 			d.Shutdown()
 			mainlog.Infof("Shutdown completed, exiting.")
 			return

+ 67 - 0
mail/envelope.go

@@ -14,6 +14,7 @@ import (
 	"net/textproto"
 	"regexp"
 	"strings"
+	"sync"
 	"time"
 )
 
@@ -59,6 +60,8 @@ type Envelope struct {
 	DeliveryHeader string
 	// Email(s) will be queued with this id
 	QueuedId string
+	// When locked, it means that the envelope is being processed by the backend
+	sync.Mutex
 }
 
 func NewEnvelope(remoteAddr string, clientID uint64) *Envelope {
@@ -264,3 +267,67 @@ func fixCharset(charset string) string {
 	}
 	return charset
 }
+
+// Envelopes have their own pool
+
+type Pool struct {
+	// envelopes that are ready to be borrowed
+	pool chan *Envelope
+	// semaphore to control number of maximum borrowed envelopes
+	sem chan bool
+}
+
+func NewPool(poolSize int) *Pool {
+	return &Pool{
+		pool: make(chan *Envelope, poolSize),
+		sem:  make(chan bool, poolSize),
+	}
+}
+
+func (p *Pool) Borrow(remoteAddr string, clientID uint64) *Envelope {
+	var e *Envelope
+	p.sem <- true // block here until there is room for another borrowed envelope
+	select {
+	case e = <-p.pool:
+		e.Reseed(remoteAddr, clientID)
+	default:
+		e = NewEnvelope(remoteAddr, clientID)
+	}
+	return e
+}
+
+// Return puts an envelope back into the envelope pool.
+// Note that an envelope will not be recycled while it is
+// still being processed.
+func (p *Pool) Return(e *Envelope) {
+	// we don't want to recycle an envelope that may still be processing
+	isUnlocked := func() <-chan bool {
+		signal := make(chan bool)
+		// make sure envelope finished processing
+		go func() {
+			// lock will block if still processing
+			e.Lock()
+			// got the lock, it means processing finished
+			e.Unlock()
+			// generate a signal
+			signal <- true
+		}()
+		return signal
+	}()
+
+	select {
+	case <-time.After(time.Second * 30):
+		// envelope still processing, we can't recycle it.
+	case <-isUnlocked:
+		// The envelope was _unlocked_, it finished processing
+		// put back in the pool or destroy
+		select {
+		case p.pool <- e:
+			//placed envelope back in pool
+		default:
+			// pool is full, don't return
+		}
+	}
+	// take a value off the semaphore to make room for more envelopes
+	<-p.sem
+}
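A usage sketch of the new envelope pool's Borrow/Return lifecycle, as server.go wires it up further down; the pool size, address and client ID are placeholders:

	pool := mail.NewPool(100) // server.go sizes this with the server's MaxClients
	e := pool.Borrow("1.2.3.4:25", 22)
	// the backend worker holds e.Lock() while processing and Unlock()s when done
	pool.Return(e) // waits up to 30s for the lock, then recycles or discards the envelope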

+ 5 - 4
pool.go

@@ -3,6 +3,7 @@ package guerrilla
 import (
 	"errors"
 	"github.com/flashmob/go-guerrilla/log"
+	"github.com/flashmob/go-guerrilla/mail"
 	"net"
 	"sync"
 	"sync/atomic"
@@ -18,7 +19,7 @@ type Poolable interface {
 	// ability to set read/write timeout
 	setTimeout(t time.Duration)
 	// set a new connection and client id
-	init(c net.Conn, clientID uint64)
+	init(c net.Conn, clientID uint64, ep *mail.Pool)
 	// get a unique id
 	getID() uint64
 }
@@ -121,7 +122,7 @@ func (p *Pool) GetActiveClientsCount() int {
 }
 
 // Borrow a Client from the pool. Will block if len(activeClients) > maxClients
-func (p *Pool) Borrow(conn net.Conn, clientID uint64, logger log.Logger) (Poolable, error) {
+func (p *Pool) Borrow(conn net.Conn, clientID uint64, logger log.Logger, ep *mail.Pool) (Poolable, error) {
 	p.poolGuard.Lock()
 	defer p.poolGuard.Unlock()
 
@@ -134,9 +135,9 @@ func (p *Pool) Borrow(conn net.Conn, clientID uint64, logger log.Logger) (Poolab
 	case p.sem <- true: // block the client from serving until there is room
 		select {
 		case c = <-p.pool:
-			c.init(conn, clientID)
+			c.init(conn, clientID, ep)
 		default:
-			c = NewClient(conn, clientID, logger)
+			c = NewClient(conn, clientID, logger, ep)
 		}
 		p.activeClientsAdd(c)
 

+ 4 - 1
server.go

@@ -61,6 +61,7 @@ type server struct {
 	logStore     atomic.Value
 	mainlogStore atomic.Value
 	backendStore atomic.Value
+	envelopePool *mail.Pool
 }
 
 type allowedHosts struct {
@@ -76,6 +77,7 @@ func newServer(sc *ServerConfig, b backends.Backend, l log.Logger) (*server, err
 		listenInterface: sc.ListenInterface,
 		state:           ServerStateNew,
 		mainlog:         l,
+		envelopePool:    mail.NewPool(sc.MaxClients),
 	}
 	server.backendStore.Store(b)
 	var logOpenError error
@@ -216,6 +218,7 @@ func (server *server) Start(startWG *sync.WaitGroup) error {
 			c := p.(*client)
 			if borrow_err == nil {
 				server.handleClient(c)
+				server.envelopePool.Return(c.Envelope)
 				server.clientPool.Return(c)
 			} else {
 				server.log.WithError(borrow_err).Info("couldn't borrow a new client")
@@ -225,7 +228,7 @@ func (server *server) Start(startWG *sync.WaitGroup) error {
 			}
 			// intentionally placed Borrow in args so that it's called in the
 			// same main goroutine.
-		}(server.clientPool.Borrow(conn, clientID, server.log))
+		}(server.clientPool.Borrow(conn, clientID, server.log, server.envelopePool))
 
 	}
 }

+ 2 - 1
server_test.go

@@ -10,6 +10,7 @@ import (
 
 	"github.com/flashmob/go-guerrilla/backends"
 	"github.com/flashmob/go-guerrilla/log"
+	"github.com/flashmob/go-guerrilla/mail"
 	"github.com/flashmob/go-guerrilla/mocks"
 )
 
@@ -67,7 +68,7 @@ func TestHandleClient(t *testing.T) {
 	}
 	conn, server := getMockServerConn(sc, t)
 	// call the serve.handleClient() func in a goroutine.
-	client := NewClient(conn.Server, 1, mainlog)
+	client := NewClient(conn.Server, 1, mainlog, mail.NewPool(5))
 	var wg sync.WaitGroup
 	wg.Add(1)
 	go func() {