
- further refactoring of workDispatcher and development of background post-processing

flashmob, 5 years ago
parent commit 304ecca1dd
5 changed files with 137 additions and 40 deletions
  1. backends/config.go (+1 −1)
  2. backends/decorate_stream.go (+3 −0)
  3. backends/gateway.go (+112 −39)
  4. backends/reader.go (+8 −0)
  5. chunk/processor.go (+13 −0)

+ 1 - 1
backends/config.go

@@ -69,7 +69,7 @@ func (c BackendConfig) toLower() {
 	}
 }
 
-func (c BackendConfig) group(ns string, name string) ConfigGroup {
+func (c BackendConfig) lookupGroup(ns string, name string) ConfigGroup {
 	if v, ok := c[ns][name]; ok {
 		return v
 	}

+ 3 - 0
backends/decorate_stream.go

@@ -28,6 +28,9 @@ type StreamDecorator struct {
 	// Shutdown is called to release any resources before StreamDecorator is destroyed
 	// typically to close any database connections, cleanup any files, etc
 	Shutdown streamShutdownWith
+	// GetEmail returns a reader for reading the data of an email;
+	// it may return nil if no email is available
+	GetEmail func(emailID uint64) (SeekPartReader, error)
 }
 
 func (s StreamDecorator) ExtractConfig(cfg ConfigGroup, i interface{}) error {

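Since GetEmail is an optional hook on StreamDecorator, a caller should nil-check it before invoking. A minimal sketch, assuming package backends and the errors import (readEmail is a hypothetical helper, not part of this commit):

	// readEmail shows safe use of the optional GetEmail hook
	func readEmail(d *StreamDecorator, emailID uint64) (SeekPartReader, error) {
		if d.GetEmail == nil {
			return nil, errors.New("decorator does not provide GetEmail")
		}
		return d.GetEmail(emailID)
	}
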
+ 112 - 39
backends/gateway.go

@@ -54,10 +54,16 @@ type GatewayConfig struct {
 	TimeoutSave string `json:"save_timeout,omitempty"`
 	// TimeoutValidateRcpt duration before timeout when validating a recipient, eg "1s"
 	TimeoutValidateRcpt string `json:"val_rcpt_timeout,omitempty"`
-	// StreamSaveProcess is same as a ProcessorStack, but reads from an io.Reader to write email data
+	// StreamSaveProcess is the same as SaveProcess, but uses the StreamProcessor stack instead
 	StreamSaveProcess string `json:"stream_save_process,omitempty"`
 	// StreamBufferSize controls the size of the output buffer, in bytes. Default is 4096
 	StreamBufferSize int `json:"stream_buffer_size,omitempty"`
+	// PostProcessBacklog controls the length of the queue for background processing
+	PostProcessBacklog int `json:"post_process_backlog,omitempty"`
+	// PostProcessProducer specifies which StreamProcessor to use for reading the data to be post-processed
+	PostProcessProducer string `json:"post_process_producer,omitempty"`
+	// PostProcessConsumer is the same as StreamSaveProcess, but controls the stack used for post-processing
+	PostProcessConsumer string `json:"post_process_consumer,omitempty"`
 }
 
 // workerMsg is what gets placed on the BackendGateway.saveMailChan channel
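
For reference, a hedged sketch of how the new keys might be set in JSON and unmarshalled into GatewayConfig; the processor names are illustrative assumptions, not values from this commit:

	raw := []byte(`{
		"stream_save_process": "Header|chunksaver",
		"post_process_backlog": 16,
		"post_process_producer": "chunksaver",
		"post_process_consumer": "Header|debug"
	}`)
	var gc GatewayConfig
	if err := json.Unmarshal(raw, &gc); err != nil { // requires encoding/json
		panic(err)
	}
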
@@ -295,8 +301,95 @@ func (gw *BackendGateway) StreamOn() bool {
 	return len(gw.gwConfig.StreamSaveProcess) != 0
 }
 
+// newStreamDecorator creates a new StreamDecorator and calls Configure with its corresponding configuration
+// cs - an item from the 'list' property of the result of newStackStreamProcessorConfig()
+// ns - typically the result of calling ConfigStreamProcessors.String()
+func (gw *BackendGateway) newStreamDecorator(cs stackConfigExpression, ns string) *StreamDecorator {
+	if makeFunc, ok := Streamers[cs.name]; !ok {
+		return nil
+	} else {
+		d := makeFunc()
+		config := gw.config.lookupGroup(ns, cs.String())
+		if config == nil {
+			config = ConfigGroup{}
+		}
+		if d.Configure != nil {
+			if err := d.Configure(config); err != nil {
+				return nil
+			}
+		}
+		return d
+	}
+}
+
 func (gw *BackendGateway) ProcessBackground(e *mail.Envelope) {
+	//defer e.Unlock()
+	m := newAliasMap(gw.config[ConfigStreamProcessors.String()])
+	c := newStackStreamProcessorConfig(gw.gwConfig.PostProcessProducer, m)
+	if len(c.list) == 0 {
+		Log().Error("gateway has no valid post_process_producer configured")
+		return
+	}
+	if d := gw.newStreamDecorator(c.list[0], ConfigStreamProcessors.String()); d == nil {
+		Log().Error("gateway failed to create a post_process_producer, check config")
+		return
+	} else {
+		r, err := d.GetEmail(e.MessageID)
+		if err != nil {
+			Log().Fields("queuedID", e.QueuedId, "messageID", e.MessageID, "error", err).
+				Error("gateway background process aborted: could not get email")
+			return
+		}
+
+		// borrow a workerMsg from the pool
+		workerMsg := workerMsgPool.Get().(*workerMsg)
+		defer workerMsgPool.Put(workerMsg)
+		// we copy the envelope (ignore the "sync locker" warning)
+		envelope := *e
+		workerMsg.reset(&envelope, TaskSaveMailStream)
+		workerMsg.r = r
 
+		// place on the channel so that one of the save mail workers can pick it up
+		// buffered channel will block if full
+		select {
+		case gw.conveyorStreamBg <- workerMsg:
+			break
+		case <-time.After(gw.saveTimeout()):
+			Log().Fields("queuedID", e.QueuedId).Error("post-processing timeout - queue full, aborting")
+			return
+		}
+		// process in the background
+		go func() {
+			for {
+				select {
+				case status := <-workerMsg.notifyMe:
+					// email saving transaction completed
+					if status.result == BackendResultOK && status.queuedID != "" {
+						Log().Fields("queuedID", status.queuedID).Info("post-process email completed")
+						return
+					}
+					var fields []interface{}
+					if status.err != nil {
+						fields = append(fields, "error", status.err)
+					}
+					if status.result != nil {
+						fields = append(fields, "result", status.result, "code", status.result.Code())
+					}
+					if len(fields) > 0 {
+						fields = append(fields, "queuedID", status.queuedID)
+						Log().Fields(fields).Error("post-process completed with an error")
+						return
+					}
+					// both result & error are nil (should not happen)
+					Log().Fields("queuedID", e.QueuedId).Error("no response from backend - post-process did not return a result or an error")
+					return
+				case <-time.After(gw.saveTimeout()):
+					Log().Fields("queuedID", e.QueuedId).Error("post-processing timeout")
+					return
+				}
+			}
+		}()
+	}
 }
 
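For context, a hedged sketch of a possible call site: after a foreground stream save succeeds, a server could hand the envelope to background post-processing. This call site is an assumption, not part of this commit:

	// assumes gw *BackendGateway, r io.Reader, e *mail.Envelope are in scope
	res, err := gw.ProcessStream(r, e)
	if err == nil && res.Code() < 300 {
		go gw.ProcessBackground(e) // don't block the SMTP session
	}
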
 func (gw *BackendGateway) ProcessStream(r io.Reader, e *mail.Envelope) (Result, error) {
@@ -306,6 +399,7 @@ func (gw *BackendGateway) ProcessStream(r io.Reader, e *mail.Envelope) (Result,
 	}
 	// borrow a workerMsg from the pool
 	workerMsg := workerMsgPool.Get().(*workerMsg)
+	workerMsgPool.Put(workerMsg)
 	workerMsg.reset(e, TaskSaveMailStream)
 	workerMsg.r = r
 	// place on the channel so that one of the save mail workers can pick it up
@@ -347,7 +441,6 @@ func (gw *BackendGateway) ProcessStream(r io.Reader, e *mail.Envelope) (Result,
 			// keep waiting for the backend to finish processing
 			<-workerMsg.notifyMe
 			e.Unlock()
-			workerMsgPool.Put(workerMsg)
 		}()
 		return NewResult(res.FailBackendTimeout), errors.New("gateway timeout")
 	}
@@ -419,23 +512,13 @@ func (gw *BackendGateway) newStack(stackConfig string) (Processor, error) {
 func (gw *BackendGateway) newStreamStack(stackConfig string) (streamer, error) {
 	var decorators []*StreamDecorator
 	noop := streamer{NoopStreamProcessor{}, decorators}
-	configKey := ConfigStreamProcessors.String()
-	c := newStackStreamProcessorConfig(stackConfig, newAliasMap(gw.config[configKey]))
+	groupName := ConfigStreamProcessors.String()
+	c := newStackStreamProcessorConfig(stackConfig, newAliasMap(gw.config[groupName]))
 	if len(c.list) == 0 {
 		return noop, nil
 	}
 	for i := range c.list {
-		if makeFunc, ok := Streamers[c.list[i].name]; ok {
-			d := makeFunc()
-			config := gw.config.group(configKey, c.list[i].String())
-			if config == nil {
-				config = ConfigGroup{}
-			}
-			if d.Configure != nil {
-				if err := d.Configure(config); err != nil {
-					return noop, err
-				}
-			}
+		if d := gw.newStreamDecorator(c.list[i], groupName); d != nil {
 			decorators = append(decorators, d)
 		} else {
 			return streamer{nil, decorators}, c.notFound(c.list[i].name)
@@ -631,18 +714,6 @@ func (gw *BackendGateway) workDispatcher(
 	stop chan bool) (state dispatcherState) {
 
 	var msg *workerMsg
-	var save Processor
-	var validate ValidatingProcessor
-	var stream streamer
-
-	switch v := processors.(type) {
-	case []Processor:
-		save = v[workerId]
-	case []ValidatingProcessor:
-		validate = v[workerId]
-	case []streamer:
-		stream = v[workerId]
-	}
 
 	defer func() {
 
@@ -672,18 +743,23 @@ func (gw *BackendGateway) workDispatcher(
 			return
 		case msg = <-workIn:
 			state = dispatcherStateWorking // recovers from panic if in this state
-			if msg.task == TaskSaveMail {
-				result, err := save.Process(msg.e, msg.task)
+			switch v := processors.(type) {
+			case []Processor:
+				result, err := v[workerId].Process(msg.e, msg.task)
 				state = dispatcherStateNotify
 				msg.notifyMe <- &notifyMsg{err: err, result: result, queuedID: msg.e.QueuedId}
-			} else if msg.task == TaskSaveMailStream {
-				err := stream.open(msg.e)
+			case []ValidatingProcessor:
+				result, err := v[workerId].Process(msg.e, msg.task)
+				state = dispatcherStateNotify
+				msg.notifyMe <- &notifyMsg{err: err, result: result}
+			case []streamer:
+				err := v[workerId].open(msg.e)
 				if err == nil {
-					if msg.e.Size, err = io.CopyBuffer(stream, msg.r, gw.buf); err != nil {
-						Log().WithError(err).Error("stream writing failed")
+					if msg.e.Size, err = io.CopyBuffer(v[workerId], msg.r, gw.buf); err != nil {
+						Log().Fields("error", err, "workerID", workerId+1).Error("stream writing failed")
 					}
-					if err = stream.close(); err != nil {
-						Log().WithError(err).Error("stream closing failed")
+					if err = v[workerId].close(); err != nil {
+						Log().Fields("error", err, "workerID", workerId+1).Error("stream closing failed")
 					}
 				}
 				state = dispatcherStateNotify
@@ -694,10 +770,7 @@ func (gw *BackendGateway) workDispatcher(
 					result = NewResult(response.Canned.SuccessMessageQueued, response.SP, msg.e.QueuedId)
 				}
 				msg.notifyMe <- &notifyMsg{err: err, result: result, queuedID: msg.e.QueuedId}
-			} else {
-				result, err := validate.Process(msg.e, msg.task)
-				state = dispatcherStateNotify
-				msg.notifyMe <- &notifyMsg{err: err, result: result}
+
 			}
 		}
 		state = dispatcherStateIdle
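
The net effect of the last two hunks: workDispatcher now selects its code path with a type switch on the processors argument rather than branching on msg.task. A simplified standalone sketch of the pattern, with stub types in place of the gateway's real ones:

	type saver interface{ save() }
	type validator interface{ validate() }

	// dispatch picks the worker's processor by the slice's concrete type
	func dispatch(processors interface{}, workerId int) {
		switch v := processors.(type) {
		case []saver:
			v[workerId].save() // save-mail path
		case []validator:
			v[workerId].validate() // validation path
		}
	}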

+ 8 - 0
backends/reader.go

@@ -0,0 +1,8 @@
+package backends
+
+import "io"
+
+type SeekPartReader interface {
+	io.Reader
+	SeekPart(part int) error
+}
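
For illustration, a minimal in-memory implementation of this interface; the type is hypothetical and assumes the bytes, fmt and io imports:

	// memPartReader serves pre-split email parts from memory
	type memPartReader struct {
		parts [][]byte
		r     *bytes.Reader
	}

	func (m *memPartReader) SeekPart(part int) error {
		if part < 0 || part >= len(m.parts) {
			return fmt.Errorf("no such part: %d", part)
		}
		m.r = bytes.NewReader(m.parts[part])
		return nil
	}

	func (m *memPartReader) Read(p []byte) (int, error) {
		if m.r == nil {
			return 0, io.EOF
		}
		return m.r.Read(p)
	}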

+ 13 - 0
chunk/processor.go

@@ -112,6 +112,19 @@ func Chunksaver() *backends.StreamDecorator {
 		return err
 	}
 
+	sd.GetEmail = func(emailID uint64) (backends.SeekPartReader, error) {
+		if database == nil {
+			return nil, errors.New("database is nil")
+		}
+		email, err := database.GetEmail(emailID)
+		if err != nil {
+			return nil, errors.New("email not found")
+		}
+		r, err := NewChunkedReader(database, email, 0)
+		return r, err
+	}
+
 	sd.Decorate =
 		func(sp backends.StreamProcessor, a ...interface{}) backends.StreamProcessor {
 			// optional dependency injection (you can pass your own instance of Storage or ChunkingBufferMime)
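
To close the loop, a hedged sketch of reading a stored email back through this hook; the email ID and part index are illustrative, and the io and os imports are assumed:

	r, err := sd.GetEmail(42)
	if err != nil {
		return err
	}
	if err = r.SeekPart(0); err != nil {
		return err
	}
	_, err = io.Copy(os.Stdout, r) // stream the selected part to stdout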