Browse Source

configuration: new backend gateway config values & defaults
backend processing debugging & added test

flashmob 5 years ago
parent
commit
9e5bd996cc
5 changed files with 262 additions and 128 deletions
  1. api_test.go (+20 -16)
  2. backends/config.go (+147 -2)
  3. backends/gateway.go (+93 -109)
  4. backends/gateway_test.go (+1 -1)
  5. server.go (+1 -0)
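
The commit moves GatewayConfig from backends/gateway.go into backends/config.go, renames two keys (`stream_save_process` becomes `save_stream`, `post_process_backlog` becomes `post_process_size`) and adds per-task worker counts, queue sizes and a stream timeout. A gateway section spelling out the new keys with their compiled-in defaults (taken from the new constants in backends/config.go) might look like the sketch below; the processor stacks are borrowed from the tests, and the snippet is illustrative rather than part of the commit:

    "gateways": {
        "default": {
            "save_process":            "HeadersParser|Debugger",
            "save_stream":             "mimeanalyzer|chunksaver",
            "post_process_producer":   "chunksaver",
            "post_process_consumer":   "Header|headersparser|compress|Decompress|debug",
            "save_workers_size":       1,
            "validate_workers_size":   1,
            "stream_workers_size":     1,
            "background_workers_size": 1,
            "save_process_size":       64,
            "validate_process_size":   64,
            "save_stream_size":        64,
            "post_process_size":       64,
            "stream_buffer_size":      4096,
            "save_timeout":            "30s",
            "val_rcpt_timeout":        "5s",
            "stream_timeout":          "30s"
        }
    }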

+ 20 - 16
api_test.go

@@ -978,7 +978,7 @@ func TestStreamProcessorConfig(t *testing.T) {
 			},
 			"gateways": {
 				"default": {
-					"stream_save_process": "mimeanalyzer|chunksaver|test|debug",
+					"save_stream": "mimeanalyzer|chunksaver|test|debug",
 				},
 			},
 		},
@@ -1010,9 +1010,9 @@ func TestStreamProcessor(t *testing.T) {
 			},
 			"gateways": {
 				"default": {
-					"save_process":        "HeadersParser|Debugger",
-					"stream_save_process": "Header|headersparser|compress|Decompress|debug",
-					"post_process":        "Header|headersparser|compress|Decompress|debug",
+					"save_process": "HeadersParser|Debugger",
+					"save_stream":  "Header|headersparser|compress|Decompress|debug",
+					"post_process": "Header|headersparser|compress|Decompress|debug",
 				},
 			},
 		},
@@ -1063,7 +1063,7 @@ func TestStreamProcessor(t *testing.T) {
 }
 
 func TestStreamProcessorBackground(t *testing.T) {
-	return
+
 	if err := os.Truncate("tests/testlog", 0); err != nil {
 		t.Error(err)
 	}
@@ -1075,7 +1075,7 @@ func TestStreamProcessorBackground(t *testing.T) {
 				"debug": {
 					"log_reads": true,
 				},
-				"chunksaver": {
+				"moo:chunksaver": {
 					"chunk_size":     8000,
 					"storage_engine": "memory",
 					"compress_level": 0,
@@ -1084,10 +1084,10 @@ func TestStreamProcessorBackground(t *testing.T) {
 			"gateways": {
 				"default": {
 					"save_process":          "",
-					"stream_save_process":   "mimeanalyzer|chunksaver",
+					"save_stream":           "mimeanalyzer|moo",
 					"post_process_consumer": "Header|headersparser|compress|Decompress|debug",
-					"post_process_producer": "chunksaver",
-					"post_process_backlog":  100,
+					"post_process_producer": "moo",
+					"post_process_size":     100,
 					"stream_buffer_size":    1024,
 					"save_workers_size":     8,
 					"save_timeout":          "1s",
@@ -1102,7 +1102,7 @@ func TestStreamProcessorBackground(t *testing.T) {
 		t.Error(err)
 	}
 	body := "Subject: Test subject\r\n" +
-		//"\r\n" +
+		"\r\n" +
 		"A an email body.\r\n" +
 		"Header|headersparser|compress|Decompress|debug Header|headersparser|compress|Decompress|debug.\r\n" +
 		"Header|headersparser|compress|Decompress|debug Header|headersparser|compress|Decompress|debug.\r\n" +
@@ -1129,12 +1129,16 @@ func TestStreamProcessorBackground(t *testing.T) {
 		t.Error("could not read logfile")
 		return
 	}
-
+	time.Sleep(time.Second * 2)
 	// lets check for fingerprints
 	if strings.Index(string(b), "Debug stream") < 0 {
 		t.Error("did not log: Debug stream")
 	}
 
+	if strings.Index(string(b), "background process done") < 0 {
+		t.Error("did not log: background process done")
+	}
+
 	if strings.Index(string(b), "Error") != -1 {
 		t.Error("There was an error", string(b))
 	}
@@ -1406,8 +1410,8 @@ func TestStreamMimeProcessor(t *testing.T) {
 			},
 			"gateways": {
 				"default": {
-					"save_process":        "HeadersParser|Debugger",
-					"stream_save_process": "mimeanalyzer|headersparser|compress|Decompress|debug",
+					"save_process": "HeadersParser|Debugger",
+					"save_stream":  "mimeanalyzer|headersparser|compress|Decompress|debug",
 				},
 			},
 		},
@@ -1557,9 +1561,9 @@ func TestStreamChunkSaver(t *testing.T) {
 			},
 			"gateways": {
 				"default": {
-					"save_process":        "HeadersParser|Debugger",
-					"stream_save_process": "mimeanalyzer|chunksaver",
-					"save_timeout":        "5",
+					"save_process": "HeadersParser|Debugger",
+					"save_stream":  "mimeanalyzer|chunksaver",
+					"save_timeout": "5",
 				},
 			},
 		},
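
Note the `moo:chunksaver` rename in TestStreamProcessorBackground: the chunksaver config group is registered under an alias, and the gateway settings then refer to it as plain `moo`. Judging from the newAliasMap calls in backends/gateway.go, the part before the colon acts as an alias that resolves to the stream processor named after the colon while keeping its own config group, so the same processor could be mounted twice with different settings. A minimal fragment mirroring the test (surrounding structure omitted):

    "moo:chunksaver": {
        "chunk_size":     8000,
        "storage_engine": "memory",
        "compress_level": 0
    }
    ...
    "save_stream":           "mimeanalyzer|moo",
    "post_process_producer": "moo"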

+ 147 - 2
backends/config.go

@@ -6,6 +6,7 @@ import (
 	"os"
 	"reflect"
 	"strings"
+	"time"
 )
 
 type ConfigGroup map[string]interface{}
@@ -24,6 +25,28 @@ type BackendConfig struct {
 
 */
 
+const (
+	validateRcptTimeout = time.Second * 5
+	defaultProcessor    = "Debugger"
+
+	// configStreamBufferSize sets the default size of the buffer for the streaming processors;
+	// it can be configured using `stream_buffer_size`
+	configStreamBufferSize       = 4096
+	configSaveWorkersCount       = 1
+	configValidateWorkersCount   = 1
+	configStreamWorkersCount     = 1
+	configBackgroundWorkersCount = 1
+	configSaveProcessSize        = 64
+	configValidateProcessSize    = 64
+	// configTimeoutSave: default timeout for saving email, if 'save_timeout' not present in config
+	configTimeoutSave = time.Second * 30
+	// configTimeoutValidateRcpt: default timeout for validating rcpt to, if 'val_rcpt_timeout' not present in config
+	configTimeoutValidateRcpt = time.Second * 5
+	configTimeoutStream       = time.Second * 30
+	configSaveStreamSize      = 64
+	configPostProcessSize     = 64
+)
+
 func (c *BackendConfig) SetValue(ns configNameSpace, name string, key string, value interface{}) {
 	nsKey := ns.String()
 	if *c == nil {
@@ -226,7 +249,7 @@ func (c BackendConfig) Changes(oldConfig BackendConfig) (changed, added, removed
 	// go through all the gateway configs,
 	// make a list of all the ones that have processors whose config had changed
 	for key, _ := range c[cg] {
-		// check the processors in the SaveProcess and StreamSaveProcess settings to see if
+		// check the processors in the SaveProcess and SaveStream settings to see if
 		// they changed. If changed, then make a record of which gateways use the processors
 		e, _ := Svc.ExtractConfig(ConfigGateways, key, c, configType)
 		bcfg := e.(*GatewayConfig)
@@ -237,7 +260,7 @@ func (c BackendConfig) Changes(oldConfig BackendConfig) (changed, added, removed
 			}
 		}
 
-		config = NewStackConfig(bcfg.StreamSaveProcess, aliasMapStream)
+		config = NewStackConfig(bcfg.SaveStream, aliasMapStream)
 		for _, v := range config.list {
 			if _, ok := changedStreamProcessors[v.name]; ok {
 				changed[key] = true
@@ -298,3 +321,125 @@ func compareConfigGroup(old map[string]ConfigGroup, new map[string]ConfigGroup)
 	}
 	return
 }
+
+type GatewayConfig struct {
+	// SaveWorkersCount controls how many concurrent workers to start. Defaults to 1
+	SaveWorkersCount int `json:"save_workers_size,omitempty"`
+	// ValidateWorkersCount controls how many concurrent recipient validation workers to start. Defaults to 1
+	ValidateWorkersCount int `json:"validate_workers_size,omitempty"`
+	// StreamWorkersCount controls how many concurrent stream workers to start. Defaults to 1
+	StreamWorkersCount int `json:"stream_workers_size,omitempty"`
+	// BackgroundWorkersCount controls how many concurrent background stream workers to start. Defaults to 1
+	BackgroundWorkersCount int `json:"background_workers_size,omitempty"`
+
+	// SaveProcess controls which processors to chain in a stack for saving email tasks
+	SaveProcess string `json:"save_process,omitempty"`
+	// SaveProcessSize limits the number of messages waiting in the queue to get processed by SaveProcess
+	SaveProcessSize int `json:"save_process_size,omitempty"`
+	// ValidateProcess is like SaveProcess, but for recipient validation tasks
+	ValidateProcess string `json:"validate_process,omitempty"`
+	// ValidateProcessSize limits the number of messages waiting in the queue to get processed by ValidateProcess
+	ValidateProcessSize int `json:"validate_process_size,omitempty"`
+
+	// TimeoutSave is duration before timeout when saving an email, eg "29s"
+	TimeoutSave string `json:"save_timeout,omitempty"`
+	// TimeoutValidateRcpt duration before timeout when validating a recipient, eg "1s"
+	TimeoutValidateRcpt string `json:"val_rcpt_timeout,omitempty"`
+	// TimeoutStream duration before timeout when processing a stream eg "1s"
+	TimeoutStream string `json:"stream_timeout,omitempty"`
+
+	// StreamBufferSize controls the size of the output buffer, in bytes. Default is 4096
+	StreamBufferSize int `json:"stream_buffer_size,omitempty"`
+	// SaveStream is the same as SaveProcess, but uses the StreamProcessor stack instead
+	SaveStream string `json:"save_stream,omitempty"`
+	// SaveStreamSize limits the number of messages waiting in the queue to get processed by SaveStream
+	SaveStreamSize int `json:"save_stream_size,omitempty"`
+
+	// PostProcessSize controls the length of the queue for background processing
+	PostProcessSize int `json:"post_process_size,omitempty"`
+	// PostProcessProducer specifies which StreamProcessor to use for feeding data to the post process
+	PostProcessProducer string `json:"post_process_producer,omitempty"`
+	// PostProcessConsumer is the same as SaveStream, but configures the stack run by the background workers
+	PostProcessConsumer string `json:"post_process_consumer,omitempty"`
+}
+
+// saveWorkersCount gets the number of workers to use for saving email by reading the save_workers_size config value
+// Returns 1 if no config value was set
+func (c *GatewayConfig) saveWorkersCount() int {
+	if c.SaveWorkersCount <= 0 {
+		return configSaveWorkersCount
+	}
+	return c.SaveWorkersCount
+}
+
+func (c *GatewayConfig) validateWorkersCount() int {
+	if c.ValidateWorkersCount <= 0 {
+		return configValidateWorkersCount
+	}
+	return c.ValidateWorkersCount
+}
+
+func (c *GatewayConfig) streamWorkersCount() int {
+	if c.StreamWorkersCount <= 0 {
+		return configStreamWorkersCount
+	}
+	return c.StreamWorkersCount
+}
+
+func (c *GatewayConfig) backgroundWorkersCount() int {
+	if c.BackgroundWorkersCount <= 0 {
+		return configBackgroundWorkersCount
+	}
+	return c.BackgroundWorkersCount
+}
+func (c *GatewayConfig) saveProcessSize() int {
+	if c.SaveProcessSize <= 0 {
+		return configSaveProcessSize
+	}
+	return c.SaveProcessSize
+}
+
+func (c *GatewayConfig) validateProcessSize() int {
+	if c.ValidateProcessSize <= 0 {
+		return configValidateProcessSize
+	}
+	return c.ValidateProcessSize
+}
+
+func (c *GatewayConfig) saveStreamSize() int {
+	if c.SaveStreamSize <= 0 {
+		return configSaveStreamSize
+	}
+	return c.SaveStreamSize
+}
+
+func (c *GatewayConfig) postProcessSize() int {
+	if c.PostProcessSize <= 0 {
+		return configPostProcessSize
+	}
+	return c.PostProcessSize
+}
+
+// saveTimeout returns the maximum amount of time to wait before timing out a save processing task
+func (gw *BackendGateway) saveTimeout() time.Duration {
+	if gw.gwConfig.TimeoutSave == "" {
+		return configTimeoutSave
+	}
+	t, err := time.ParseDuration(gw.gwConfig.TimeoutSave)
+	if err != nil {
+		return configTimeoutSave
+	}
+	return t
+}
+
+// validateRcptTimeout returns the maximum amount of time to wait before timing out a recipient validation task
+func (gw *BackendGateway) validateRcptTimeout() time.Duration {
+	if gw.gwConfig.TimeoutValidateRcpt == "" {
+		return configTimeoutValidateRcpt
+	}
+	t, err := time.ParseDuration(gw.gwConfig.TimeoutValidateRcpt)
+	if err != nil {
+		return configTimeoutValidateRcpt
+	}
+	return t
+}
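
Every accessor added above follows the same fall-back shape: a zero value or an unparsable duration means "not configured", so the compiled-in constant is substituted. A hypothetical test in package backends (not part of this commit; it assumes gwConfig is a *GatewayConfig field) sketches that behaviour:

    package backends

    import "testing"

    // TestGatewayConfigDefaults: zero values and bad duration strings
    // should fall back to the compiled-in defaults.
    func TestGatewayConfigDefaults(t *testing.T) {
        cfg := &GatewayConfig{} // nothing set, as if every key were omitted from the JSON
        if got := cfg.saveWorkersCount(); got != configSaveWorkersCount {
            t.Errorf("saveWorkersCount() = %d, want %d", got, configSaveWorkersCount)
        }
        if got := cfg.postProcessSize(); got != configPostProcessSize {
            t.Errorf("postProcessSize() = %d, want %d", got, configPostProcessSize)
        }
        // an unparsable save_timeout falls back to configTimeoutSave (30s)
        gw := &BackendGateway{gwConfig: &GatewayConfig{TimeoutSave: "not-a-duration"}}
        if got := gw.saveTimeout(); got != configTimeoutSave {
            t.Errorf("saveTimeout() = %v, want %v", got, configTimeoutSave)
        }
    }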

+ 93 - 109
backends/gateway.go

@@ -34,6 +34,11 @@ type BackendGateway struct {
 	processors   []Processor
 	validators   []ValidatingProcessor
 	streamers    []streamer
+	background   []streamer
+
+	producer *StreamDecorator
+
+	decoratorLookup map[string]map[string]*StreamDecorator
 
 	workerID int
 
@@ -47,29 +52,6 @@ type BackendGateway struct {
 	buffers map[int][]byte
 }
 
-type GatewayConfig struct {
-	// WorkersSize controls how many concurrent workers to start. Defaults to 1
-	WorkersSize int `json:"save_workers_size,omitempty"`
-	// SaveProcess controls which processors to chain in a stack for saving email tasks
-	SaveProcess string `json:"save_process,omitempty"`
-	// ValidateProcess is like ProcessorStack, but for recipient validation tasks
-	ValidateProcess string `json:"validate_process,omitempty"`
-	// TimeoutSave is duration before timeout when saving an email, eg "29s"
-	TimeoutSave string `json:"save_timeout,omitempty"`
-	// TimeoutValidateRcpt duration before timeout when validating a recipient, eg "1s"
-	TimeoutValidateRcpt string `json:"val_rcpt_timeout,omitempty"`
-	// StreamSaveProcess is same as a SaveProcess, but uses the StreamProcessor stack instead
-	StreamSaveProcess string `json:"stream_save_process,omitempty"`
-	// StreamBufferLen controls the size of the output buffer, in bytes. Default is 4096
-	StreamBufferSize int `json:"stream_buffer_size,omitempty"`
-	// PostProcessBacklog controls the length of thq queue for background processing
-	PostProcessBacklog int `json:"post_process_backlog,omitempty"`
-	// PostProcessProducer specifies which StreamProcessor to use for reading data to the prost process
-	PostProcessProducer string `json:"post_process_producer,omitempty"`
-	// PostProcessConsumer is same as StreamSaveProcess, but controls
-	PostProcessConsumer string `json:"post_process_consumer,omitempty"`
-}
-
 // workerMsg is what get placed on the BackendGateway.saveMailChan channel
 type workerMsg struct {
 	// The email data
@@ -149,16 +131,6 @@ const (
 	BackendStateShuttered
 	BackendStateError
 	BackendStateInitialized
-
-	// default timeout for saving email, if 'save_timeout' not present in config
-	saveTimeout = time.Second * 30
-	// default timeout for validating rcpt to, if 'val_rcpt_timeout' not present in config
-	validateRcptTimeout = time.Second * 5
-	defaultProcessor    = "Debugger"
-
-	// streamBufferSize sets the size of the buffer for the streaming processors,
-	// can be configured using `stream_buffer_size`
-	streamBufferSize = 4096
 )
 
 func (s backendState) String() string {
@@ -304,7 +276,7 @@ func (gw *BackendGateway) ValidateRcpt(e *mail.Envelope) RcptError {
 }
 
 func (gw *BackendGateway) StreamOn() bool {
-	return len(gw.gwConfig.StreamSaveProcess) != 0
+	return len(gw.gwConfig.SaveStream) != 0
 }
 
 // newStreamDecorator creates a new StreamDecorator and calls Configure with its corresponding configuration
@@ -330,13 +302,7 @@ func (gw *BackendGateway) newStreamDecorator(cs stackConfigExpression, ns string
 
 func (gw *BackendGateway) ProcessBackground(e *mail.Envelope) {
 
-	m := newAliasMap(gw.config[ConfigStreamProcessors.String()])
-	c := newStackStreamProcessorConfig(gw.gwConfig.PostProcessProducer, m)
-	if len(c.list) == 0 {
-		Log().Error("gateway has no valid post_process_producer configured")
-		return
-	}
-	if d := gw.newStreamDecorator(c.list[0], ConfigStreamProcessors.String()); d == nil {
+	if d := gw.producer; d == nil {
 		Log().Error("gateway has failed creating a post_process_producer, check config")
 		return
 	} else {
@@ -368,24 +334,27 @@ func (gw *BackendGateway) ProcessBackground(e *mail.Envelope) {
 			select {
 			case status := <-workerMsg.notifyMe:
 				// email saving transaction completed
-				if status.result == BackendResultOK && status.queuedID != "" {
-					Log().Fields("queuedID", status.queuedID).Info("post-process email completed")
+				var fields []interface{}
+				var code int
+				if status.result != nil {
+					code = status.result.Code()
+					fields = append(fields, "queuedID", e.QueuedId, "code", code)
+
+				}
+				if code > 200 && code < 300 {
+					fields = append(fields, "messageID", e.MessageID)
+					Log().Fields(fields...).Info("background process done")
 					return
 				}
-				var fields []interface{}
 				if status.err != nil {
 					fields = append(fields, "error", status.err)
 				}
-				if status.result != nil {
-					fields = append(fields, "result", status.result, "code", status.result.Code())
-				}
 				if len(fields) > 0 {
-					fields = append(fields, "queuedID", status.queuedID)
-					Log().Fields(fields).Error("post-process completed with an error")
+					Log().Fields(fields...).Error("post-process completed with an error")
 					return
 				}
 				// both result & error are nil (should not happen)
-				Log().Fields("error", err, "queuedID", e.QueuedId).Error("no response from backend - post-process did not return a result or an error")
+				Log().Fields("queuedID", e.QueuedId).Error("no response from backend - post-process did not return a result or an error")
 				return
 			case <-time.After(gw.saveTimeout()):
 				Log().Fields("queuedID", e.QueuedId).Error("background post-processing timed-out, will keep waiting")
@@ -466,6 +435,12 @@ func (gw *BackendGateway) Shutdown() error {
 				Log().Fields("error", err, "gateway", gw.name).Error("failed shutting down stream")
 			}
 		}
+		for stream := range gw.background {
+			err := gw.background[stream].shutdown()
+			if err != nil {
+				Log().Fields("error", err, "gateway", gw.name).Error("failed shutting down background stream")
+			}
+		}
 		// call shutdown on all processor shutdowners
 		if err := Svc.shutdown(); err != nil {
 			return err
@@ -515,6 +490,7 @@ func (gw *BackendGateway) newStack(stackConfig string) (Processor, error) {
 
 func (gw *BackendGateway) newStreamStack(stackConfig string) (streamer, error) {
 	var decorators []*StreamDecorator
+
 	noop := streamer{NoopStreamProcessor{}, decorators}
 	groupName := ConfigStreamProcessors.String()
 	c := newStackStreamProcessorConfig(stackConfig, newAliasMap(gw.config[groupName]))
@@ -523,6 +499,10 @@ func (gw *BackendGateway) newStreamStack(stackConfig string) (streamer, error) {
 	}
 	for i := range c.list {
 		if d := gw.newStreamDecorator(c.list[i], groupName); d != nil {
+			if gw.decoratorLookup[groupName] == nil {
+				gw.decoratorLookup[groupName] = make(map[string]*StreamDecorator)
+			}
+			gw.decoratorLookup[groupName][c.list[i].String()] = d
 			decorators = append(decorators, d)
 		} else {
 			return streamer{nil, decorators}, c.notFound(c.list[i].name)
@@ -565,54 +545,57 @@ func (gw *BackendGateway) Initialize(cfg BackendConfig) error {
 		gw.State = BackendStateError
 		return err
 	}
-	workersSize := gw.workersSize()
-	if workersSize < 1 {
-		gw.State = BackendStateError
-		return errors.New("must have at least 1 worker")
-	}
+	gw.buffers = make(map[int][]byte) // individual buffers are made later
+	gw.decoratorLookup = make(map[string]map[string]*StreamDecorator)
 	gw.processors = make([]Processor, 0)
 	gw.validators = make([]ValidatingProcessor, 0)
 	gw.streamers = make([]streamer, 0)
-	for i := 0; i < workersSize; i++ {
+	gw.background = make([]streamer, 0)
+	for i := 0; i < gw.gwConfig.saveWorkersCount(); i++ {
 		p, err := gw.newStack(gw.gwConfig.SaveProcess)
 		if err != nil {
 			gw.State = BackendStateError
 			return err
 		}
 		gw.processors = append(gw.processors, p)
-
+	}
+	for i := 0; i < gw.gwConfig.validateWorkersCount(); i++ {
 		v, err := gw.newStack(gw.gwConfig.ValidateProcess)
 		if err != nil {
 			gw.State = BackendStateError
 			return err
 		}
 		gw.validators = append(gw.validators, v)
-
-		s, err := gw.newStreamStack(gw.gwConfig.StreamSaveProcess)
+	}
+	for i := 0; i < gw.gwConfig.streamWorkersCount(); i++ {
+		s, err := gw.newStreamStack(gw.gwConfig.SaveStream)
 		if err != nil {
 			gw.State = BackendStateError
 			return err
 		}
 		gw.streamers = append(gw.streamers, s)
 	}
+	for i := 0; i < gw.gwConfig.backgroundWorkersCount(); i++ {
+		c, err := gw.newStreamStack(gw.gwConfig.PostProcessConsumer)
+		if err != nil {
+			gw.State = BackendStateError
+			return err
+		}
+		gw.background = append(gw.background, c)
+	}
+	if err = gw.initProducer(); err != nil {
+		return err
+	}
+
 	// Initialize processors & stream processors
 	if err := Svc.Initialize(cfg); err != nil {
 		gw.State = BackendStateError
 		return err
 	}
-	if gw.conveyor == nil {
-		gw.conveyor = make(chan *workerMsg, workersSize)
-	}
-	if gw.conveyorValidation == nil {
-		gw.conveyorValidation = make(chan *workerMsg, workersSize)
-	}
-	if gw.conveyorStream == nil {
-		gw.conveyorStream = make(chan *workerMsg, workersSize)
-	}
-	if gw.conveyorStreamBg == nil {
-		gw.conveyorStreamBg = make(chan *workerMsg, workersSize)
-	}
-
+	gw.conveyor = make(chan *workerMsg, gw.gwConfig.saveProcessSize())
+	gw.conveyorValidation = make(chan *workerMsg, gw.gwConfig.validateProcessSize())
+	gw.conveyorStream = make(chan *workerMsg, gw.gwConfig.saveStreamSize())
+	gw.conveyorStreamBg = make(chan *workerMsg, gw.gwConfig.postProcessSize())
 	// ready to start
 	gw.State = BackendStateInitialized
 	return nil
@@ -628,6 +611,7 @@ func (gw *BackendGateway) Start() error {
 		gw.startWorkers(gw.conveyor, gw.processors)
 		gw.startWorkers(gw.conveyorValidation, gw.validators)
 		gw.startWorkers(gw.conveyorStream, gw.streamers)
+		gw.startWorkers(gw.conveyorStreamBg, gw.background)
 		gw.State = BackendStateRunning
 		return nil
 	} else {
@@ -669,39 +653,6 @@ func (gw *BackendGateway) startWorkers(conveyor chan *workerMsg, processors inte
 	}
 }
 
-// workersSize gets the number of workers to use for saving email by reading the save_workers_size config value
-// Returns 1 if no config value was set
-func (gw *BackendGateway) workersSize() int {
-	if gw.gwConfig.WorkersSize <= 0 {
-		return 1
-	}
-	return gw.gwConfig.WorkersSize
-}
-
-// saveTimeout returns the maximum amount of seconds to wait before timing out a save processing task
-func (gw *BackendGateway) saveTimeout() time.Duration {
-	if gw.gwConfig.TimeoutSave == "" {
-		return saveTimeout
-	}
-	t, err := time.ParseDuration(gw.gwConfig.TimeoutSave)
-	if err != nil {
-		return saveTimeout
-	}
-	return t
-}
-
-// validateRcptTimeout returns the maximum amount of seconds to wait before timing out a recipient validation  task
-func (gw *BackendGateway) validateRcptTimeout() time.Duration {
-	if gw.gwConfig.TimeoutValidateRcpt == "" {
-		return validateRcptTimeout
-	}
-	t, err := time.ParseDuration(gw.gwConfig.TimeoutValidateRcpt)
-	if err != nil {
-		return validateRcptTimeout
-	}
-	return t
-}
-
 type dispatcherState int
 
 const (
@@ -738,12 +689,13 @@ func (gw *BackendGateway) workDispatcher(
 	}()
 	state = dispatcherStateIdle
 	Log().Fields("id", workerId+1, "gateway", gw.name).
-		Infof("processing worker started")
+		Info("processing worker started")
 	for {
 		select {
 		case <-stop:
 			state = dispatcherStateStopped
-			Log().Infof("stop signal for worker (#%d)", workerId+1)
+			Log().Fields("id", workerId+1, "gateway", gw.name).
+				Info("stop signal for worker")
 			return
 		case msg = <-workIn:
 			state = dispatcherStateWorking // recovers from panic if in this state
@@ -783,9 +735,9 @@ func (gw *BackendGateway) workDispatcher(
 
 func (gw *BackendGateway) makeBuffer() []byte {
 	if gw.buffers == nil {
-		gw.buffers = make(map[int][]byte, gw.gwConfig.WorkersSize)
+		gw.buffers = make(map[int][]byte)
 	}
-	size := streamBufferSize
+	size := configStreamBufferSize
 	if gw.gwConfig.StreamBufferSize > 0 {
 		size = gw.gwConfig.StreamBufferSize
 	}
@@ -799,3 +751,35 @@ func (gw *BackendGateway) stopWorkers() {
 	}
 	gw.workerID = 0
 }
+
+func (gw *BackendGateway) initProducer() error {
+	notValid := errors.New("gateway has no valid [post_process_producer] configured")
+	if gw.gwConfig.PostProcessConsumer == "" {
+		// consumer not configured, so not active
+		return nil
+	}
+	if gw.gwConfig.PostProcessProducer == "" {
+		return notValid
+	}
+	ns := ConfigStreamProcessors.String()
+	m := newAliasMap(gw.config[ns])
+	c := newStackStreamProcessorConfig(gw.gwConfig.PostProcessProducer, m)
+	if len(c.list) == 0 {
+		return notValid
+	}
+	// check if there's already an instance of it
+	if gw.decoratorLookup[ns] != nil {
+		if v, ok := gw.decoratorLookup[ns][c.list[0].String()]; ok {
+			gw.producer = v
+			return nil
+		}
+	}
+	d := gw.newStreamDecorator(c.list[0], ns)
+	if d == nil {
+		return errors.New("please check gateway config [post_process_producer]")
+	}
+	// use a new instance
+	gw.producer = d
+	return nil
+}
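
Initialize now builds four independently sized pools (save, validate, stream, background) and Start drains each conveyor with its own workers; the background pool consumes what the single shared producer decorator feeds into conveyorStreamBg. A simplified, self-contained sketch of that fan-out pattern (names illustrative, not the gateway's own):

    package main

    import (
        "fmt"
        "sync"
    )

    // startPool launches `workers` goroutines draining one buffered channel,
    // mirroring how startWorkers pairs each conveyor with its processor slice.
    func startPool(jobs <-chan string, workers int, wg *sync.WaitGroup, handle func(string)) {
        for i := 0; i < workers; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                for j := range jobs {
                    handle(j) // each worker handles one job at a time
                }
            }()
        }
    }

    func main() {
        // queue length and worker count are tuned independently,
        // cf. post_process_size vs. background_workers_size
        jobs := make(chan string, 100)
        var wg sync.WaitGroup
        startPool(jobs, 1, &wg, func(j string) { fmt.Println("post-process:", j) })
        jobs <- "queued-message"
        close(jobs)
        wg.Wait()
    }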

+ 1 - 1
backends/gateway_test.go

@@ -49,7 +49,7 @@ func TestInitialize(t *testing.T) {
 
 	if gateway.conveyor == nil {
 		t.Error("gateway.conveyor should not be nil")
-	} else if cap(gateway.conveyor) != gateway.workersSize() {
+	} else if cap(gateway.conveyor) != gateway.gwConfig.saveProcessSize() {
 		t.Error("gateway.conveyor channel buffer cap does not match worker size, cap was", cap(gateway.conveyor))
 	}
 

+ 1 - 0
server.go

@@ -741,5 +741,6 @@ func (s *server) copyEnvelope(src *mail.Envelope, dest *mail.Envelope) {
 	dest.ESMTP = src.ESMTP
 	dest.RcptTo = src.RcptTo
 	dest.MailFrom = src.MailFrom
+	dest.MessageID = src.MessageID
 	dest.TransportType = src.TransportType
 }