Browse Source

envelope: use WaitGroup instead of lock
background processing: borrow a new envelope and copy the existing protocol params to it.

flashmob 5 years ago
parent
commit
5610107491
4 changed files with 78 additions and 64 deletions
  1. 39 39
      backends/gateway.go
  2. 0 4
      client.go
  3. 18 19
      mail/envelope.go
  4. 21 2
      server.go

+ 39 - 39
backends/gateway.go

@@ -251,12 +251,13 @@ func (gw *BackendGateway) Process(e *mail.Envelope) Result {
 		return NewResult(response.Canned.FailBackendTransaction, response.SP, err)
 		return NewResult(response.Canned.FailBackendTransaction, response.SP, err)
 
 
 	case <-time.After(gw.saveTimeout()):
 	case <-time.After(gw.saveTimeout()):
-		Log().Error("Backend has timed out while saving email")
-		e.Lock() // lock the envelope - it's still processing here, we don't want the server to recycle it
+		Log().Fields("queuedId", e.QueuedId).Error("backend has timed out while saving email")
+		e.Add(1) // lock the envelope - it's still processing here, we don't want the server to recycle it
 		go func() {
 		go func() {
 			// keep waiting for the backend to finish processing
 			// keep waiting for the backend to finish processing
 			<-workerMsg.notifyMe
 			<-workerMsg.notifyMe
-			e.Unlock()
+			Log().Fields("queuedId", e.QueuedId).Error("finished processing mail after timeout")
+			e.Done()
 		}()
 		}()
 		return NewResult(response.Canned.FailBackendTimeout)
 		return NewResult(response.Canned.FailBackendTimeout)
 	}
 	}
@@ -287,11 +288,12 @@ func (gw *BackendGateway) ValidateRcpt(e *mail.Envelope) RcptError {
 		return nil
 		return nil
 
 
 	case <-time.After(gw.validateRcptTimeout()):
 	case <-time.After(gw.validateRcptTimeout()):
-		e.Lock()
+		Log().Fields("queuedId", e.QueuedId).Error("backend has timed out while validating rcpt")
+		e.Add(1) // lock the envelope - it's still processing here, we don't want the server to recycle it
 		go func() {
 		go func() {
 			<-workerMsg.notifyMe
 			<-workerMsg.notifyMe
-			e.Unlock()
-			Log().Error("Backend has timed out while validating rcpt")
+			Log().Fields("queuedId", e.QueuedId).Error("finished validating rcpt after timeout")
+			e.Done()
 		}()
 		}()
 		return StorageTimeout
 		return StorageTimeout
 	}
 	}
@@ -323,7 +325,7 @@ func (gw *BackendGateway) newStreamDecorator(cs stackConfigExpression, ns string
 }
 }
 
 
 func (gw *BackendGateway) ProcessBackground(e *mail.Envelope) {
 func (gw *BackendGateway) ProcessBackground(e *mail.Envelope) {
-	//defer e.Unlock()
+
 	m := newAliasMap(gw.config[ConfigStreamProcessors.String()])
 	m := newAliasMap(gw.config[ConfigStreamProcessors.String()])
 	c := newStackStreamProcessorConfig(gw.gwConfig.PostProcessProducer, m)
 	c := newStackStreamProcessorConfig(gw.gwConfig.PostProcessProducer, m)
 	if len(c.list) == 0 {
 	if len(c.list) == 0 {
@@ -344,9 +346,8 @@ func (gw *BackendGateway) ProcessBackground(e *mail.Envelope) {
 		// borrow a workerMsg from the pool
 		// borrow a workerMsg from the pool
 		workerMsg := workerMsgPool.Get().(*workerMsg)
 		workerMsg := workerMsgPool.Get().(*workerMsg)
 		defer workerMsgPool.Put(workerMsg)
 		defer workerMsgPool.Put(workerMsg)
-		// we copy the envelope (ignore the "sync locker" warning)
-		envelope := *e
-		workerMsg.reset(&envelope, TaskSaveMailStream)
+
+		workerMsg.reset(e, TaskSaveMailStream)
 		workerMsg.r = r
 		workerMsg.r = r
 
 
 		// place on the channel so that one of the save mail workers can pick it up
 		// place on the channel so that one of the save mail workers can pick it up
@@ -359,36 +360,34 @@ func (gw *BackendGateway) ProcessBackground(e *mail.Envelope) {
 			return
 			return
 		}
 		}
 		// process in the background
 		// process in the background
-		go func() {
-			for {
-				select {
-				case status := <-workerMsg.notifyMe:
-					// email saving transaction completed
-					if status.result == BackendResultOK && status.queuedID != "" {
-						Log().Fields("queuedID", status.queuedID).Info("post-process email completed")
-						return
-					}
-					var fields []interface{}
-					if status.err != nil {
-						fields = append(fields, "error", status.err)
-					}
-					if status.result != nil {
-						fields = append(fields, "result", status.result, "code", status.result.Code())
-					}
-					if len(fields) > 0 {
-						fields = append(fields, "queuedID", status.queuedID)
-						Log().Fields(fields).Error("post-process completed with an error")
-						return
-					}
-					// both result & error are nil (should not happen)
-					Log().Fields("error", err, "queuedID", e.QueuedId).Error("no response from backend - post-process did not return a result or an error")
+		for {
+			select {
+			case status := <-workerMsg.notifyMe:
+				// email saving transaction completed
+				if status.result == BackendResultOK && status.queuedID != "" {
+					Log().Fields("queuedID", status.queuedID).Info("post-process email completed")
 					return
 					return
-				case <-time.After(gw.saveTimeout()):
-					Log().Fields("queuedID", e.QueuedId).Error("post-processing timeout")
+				}
+				var fields []interface{}
+				if status.err != nil {
+					fields = append(fields, "error", status.err)
+				}
+				if status.result != nil {
+					fields = append(fields, "result", status.result, "code", status.result.Code())
+				}
+				if len(fields) > 0 {
+					fields = append(fields, "queuedID", status.queuedID)
+					Log().Fields(fields).Error("post-process completed with an error")
 					return
 					return
 				}
 				}
+				// both result & error are nil (should not happen)
+				Log().Fields("error", err, "queuedID", e.QueuedId).Error("no response from backend - post-process did not return a result or an error")
+				return
+			case <-time.After(gw.saveTimeout()):
+				Log().Fields("queuedID", e.QueuedId).Error("background post-processing timed-out, will keep waiting")
+				// don't return here, keep waiting for workerMsg.notifyMe
 			}
 			}
-		}()
+		}
 	}
 	}
 }
 }
 
 
@@ -435,12 +434,13 @@ func (gw *BackendGateway) ProcessStream(r io.Reader, e *mail.Envelope) (Result,
 		return NewResult(res.FailBackendTransaction, response.SP, err), err
 		return NewResult(res.FailBackendTransaction, response.SP, err), err
 
 
 	case <-time.After(gw.saveTimeout()):
 	case <-time.After(gw.saveTimeout()):
-		Log().Error("Backend has timed out while saving email")
-		e.Lock() // lock the envelope - it's still processing here, we don't want the server to recycle it
+		Log().Fields("queuedID", e.QueuedId).Error("backend has timed out while saving email stream")
+		e.Add(1) // lock the envelope - it's still processing here, we don't want the server to recycle it
 		go func() {
 		go func() {
 			// keep waiting for the backend to finish processing
 			// keep waiting for the backend to finish processing
 			<-workerMsg.notifyMe
 			<-workerMsg.notifyMe
-			e.Unlock()
+			e.Done()
+			Log().Fields("queuedID", e.QueuedId).Info("backend has finished saving email stream after timeout")
 		}()
 		}()
 		return NewResult(res.FailBackendTimeout), errors.New("gateway timeout")
 		return NewResult(res.FailBackendTimeout), errors.New("gateway timeout")
 	}
 	}

+ 0 - 4
client.go

@@ -236,7 +236,3 @@ func (c *client) parsePath(in []byte, p pathParser) (mail.Address, error) {
 	}
 	}
 	return address, err
 	return address, err
 }
 }
-
-func (s *server) rcptTo() (address mail.Address, err error) {
-	return address, err
-}

+ 18 - 19
mail/envelope.go

@@ -120,44 +120,45 @@ func NewAddress(str string) (*Address, error) {
 
 
 // Envelope of Email represents a single SMTP message.
 // Envelope of Email represents a single SMTP message.
 type Envelope struct {
 type Envelope struct {
-	// Remote IP address
-	RemoteIP string
-	// Message sent in EHLO command
-	Helo string
-	// Sender
-	MailFrom Address
-	// Recipients
-	RcptTo []Address
 	// Data stores the header and message body (when using the non-streaming processor)
 	// Data stores the header and message body (when using the non-streaming processor)
 	Data bytes.Buffer
 	Data bytes.Buffer
 	// Subject stores the subject of the email, extracted and decoded after calling ParseHeaders()
 	// Subject stores the subject of the email, extracted and decoded after calling ParseHeaders()
 	Subject string
 	Subject string
-	// TLS is true if the email was received using a TLS connection
-	TLS bool
 	// Header stores the results from ParseHeaders()
 	// Header stores the results from ParseHeaders()
 	Header textproto.MIMEHeader
 	Header textproto.MIMEHeader
 	// Values hold the values generated when processing the envelope by the backend
 	// Values hold the values generated when processing the envelope by the backend
 	Values map[string]interface{}
 	Values map[string]interface{}
 	// Hashes of each email on the rcpt
 	// Hashes of each email on the rcpt
 	Hashes []string
 	Hashes []string
-	// additional delivery header that may be added
+	// DeliveryHeader stores additional delivery header that may be added (used by non-streaming processor)
 	DeliveryHeader string
 	DeliveryHeader string
-	// Email(s) will be queued with this id
-	QueuedId string
-	// TransportType indicates whenever 8BITMIME extension has been signaled
-	TransportType smtp.TransportType
 	// Size is the length of message, after being written
 	// Size is the length of message, after being written
 	Size int64
 	Size int64
 	// MimeParts contain the information about the mime-parts after they have been parsed
 	// MimeParts contain the information about the mime-parts after they have been parsed
 	MimeParts *mimeparse.Parts
 	MimeParts *mimeparse.Parts
 	// MessageID contains the id of the message after it has been written
 	// MessageID contains the id of the message after it has been written
 	MessageID uint64
 	MessageID uint64
+	// Remote IP address
+	RemoteIP string
+	// Message sent in EHLO command
+	Helo string
+	// Sender
+	MailFrom Address
+	// Recipients
+	RcptTo []Address
+	// TLS is true if the email was received using a TLS connection
+	TLS bool
+	// Email(s) will be queued with this id
+	QueuedId string
+	// TransportType indicates whenever 8BITMIME extension has been signaled
+	TransportType smtp.TransportType
 	// ESMTP: true if EHLO was used
 	// ESMTP: true if EHLO was used
 	ESMTP bool
 	ESMTP bool
 	// ServerIface records the server's interface in the config
 	// ServerIface records the server's interface in the config
 	ServerIface string
 	ServerIface string
+
 	// When locked, it means that the envelope is being processed by the backend
 	// When locked, it means that the envelope is being processed by the backend
-	sync.Mutex
+	sync.WaitGroup
 }
 }
 
 
 func NewEnvelope(remoteAddr string, clientID uint64, iface string) *Envelope {
 func NewEnvelope(remoteAddr string, clientID uint64, iface string) *Envelope {
@@ -216,9 +217,7 @@ func (e *Envelope) String() string {
 func (e *Envelope) ResetTransaction() {
 func (e *Envelope) ResetTransaction() {
 
 
 	// ensure not processing by the backend, will only get lock if finished, otherwise block
 	// ensure not processing by the backend, will only get lock if finished, otherwise block
-	e.Lock()
-	// got the lock, it means processing finished
-	e.Unlock()
+	e.Wait()
 
 
 	e.MailFrom = Address{}
 	e.MailFrom = Address{}
 	e.RcptTo = []Address{}
 	e.RcptTo = []Address{}

+ 21 - 2
server.go

@@ -93,7 +93,7 @@ func newServer(sc *ServerConfig, b backends.Backend, mainlog log.Logger) (*serve
 		closedListener:  make(chan bool, 1),
 		closedListener:  make(chan bool, 1),
 		listenInterface: sc.ListenInterface,
 		listenInterface: sc.ListenInterface,
 		state:           ServerStateNew,
 		state:           ServerStateNew,
-		envelopePool:    mail.NewPool(sc.MaxClients),
+		envelopePool:    mail.NewPool(sc.MaxClients * 2),
 	}
 	}
 	server.mainlogStore.Store(mainlog)
 	server.mainlogStore.Store(mainlog)
 	server.backendStore.Store(b)
 	server.backendStore.Store(b)
@@ -596,7 +596,17 @@ func (s *server) handleClient(client *client) {
 				// process the message as a stream
 				// process the message as a stream
 				res, err = be.ProcessStream(client.smtpReader.DotReader(), client.Envelope)
 				res, err = be.ProcessStream(client.smtpReader.DotReader(), client.Envelope)
 				if err == nil && res.Code() < 300 {
 				if err == nil && res.Code() < 300 {
-					be.ProcessBackground(client.Envelope)
+					e := s.envelopePool.Borrow(
+						client.Envelope.RemoteIP,
+						client.ID,
+						client.Envelope.ServerIface,
+					)
+					s.copyEnvelope(client.Envelope, e)
+					// process in the background then return the envelope
+					go func() {
+						be.ProcessBackground(e)
+						s.envelopePool.Return(e)
+					}()
 				}
 				}
 			} else {
 			} else {
 				// or buffer the entire message (parse headers & mime structure as we go along)
 				// or buffer the entire message (parse headers & mime structure as we go along)
@@ -724,3 +734,12 @@ func (s *server) defaultHost(a *mail.Address) {
 		}
 		}
 	}
 	}
 }
 }
+
+func (s *server) copyEnvelope(src *mail.Envelope, dest *mail.Envelope) {
+	dest.TLS = src.TLS
+	dest.Helo = src.Helo
+	dest.ESMTP = src.ESMTP
+	dest.RcptTo = src.RcptTo
+	dest.MailFrom = src.MailFrom
+	dest.TransportType = src.TransportType
+}