|
@@ -12,6 +12,8 @@ import (
|
|
"strings"
|
|
"strings"
|
|
)
|
|
)
|
|
|
|
|
|
|
|
+var ErrProcessorNotFound error
|
|
|
|
+
|
|
// A backend gateway is a proxy that implements the Backend interface.
|
|
// A backend gateway is a proxy that implements the Backend interface.
|
|
// It is used to start multiple goroutine workers for saving mail, and then distribute email saving to the workers
|
|
// It is used to start multiple goroutine workers for saving mail, and then distribute email saving to the workers
|
|
// via a channel. Shutting down via Shutdown() will stop all workers.
|
|
// via a channel. Shutting down via Shutdown() will stop all workers.
|
|
@@ -142,7 +144,7 @@ func (gw *BackendGateway) Reinitialize() error {
|
|
// Decorators are functions of Decorator type, source files prefixed with p_*
|
|
// Decorators are functions of Decorator type, source files prefixed with p_*
|
|
// Each decorator does a specific task during the processing stage.
|
|
// Each decorator does a specific task during the processing stage.
|
|
// This function uses the config value process_stack to figure out which Decorator to use
|
|
// This function uses the config value process_stack to figure out which Decorator to use
|
|
-func (gw *BackendGateway) newProcessorLine() Processor {
|
|
|
|
|
|
+func (gw *BackendGateway) newProcessorLine() (Processor, error) {
|
|
var decorators []Decorator
|
|
var decorators []Decorator
|
|
cfg := strings.ToLower(strings.TrimSpace(gw.gwConfig.ProcessorStack))
|
|
cfg := strings.ToLower(strings.TrimSpace(gw.gwConfig.ProcessorStack))
|
|
if len(cfg) == 0 {
|
|
if len(cfg) == 0 {
|
|
@@ -153,11 +155,14 @@ func (gw *BackendGateway) newProcessorLine() Processor {
|
|
name := line[len(line)-1-i] // reverse order, since decorators are stacked
|
|
name := line[len(line)-1-i] // reverse order, since decorators are stacked
|
|
if makeFunc, ok := processors[name]; ok {
|
|
if makeFunc, ok := processors[name]; ok {
|
|
decorators = append(decorators, makeFunc())
|
|
decorators = append(decorators, makeFunc())
|
|
|
|
+ } else {
|
|
|
|
+ ErrProcessorNotFound = errors.New(fmt.Sprintf("processor [%s] not found", name))
|
|
|
|
+ return nil, ErrProcessorNotFound
|
|
}
|
|
}
|
|
}
|
|
}
|
|
// build the call-stack of decorators
|
|
// build the call-stack of decorators
|
|
p := Decorate(DefaultProcessor{}, decorators...)
|
|
p := Decorate(DefaultProcessor{}, decorators...)
|
|
- return p
|
|
|
|
|
|
+ return p, nil
|
|
}
|
|
}
|
|
|
|
|
|
// loadConfig loads the config for the GatewayConfig
|
|
// loadConfig loads the config for the GatewayConfig
|
|
@@ -187,7 +192,11 @@ func (gw *BackendGateway) Initialize(cfg BackendConfig) error {
|
|
}
|
|
}
|
|
var lines []Processor
|
|
var lines []Processor
|
|
for i := 0; i < workersSize; i++ {
|
|
for i := 0; i < workersSize; i++ {
|
|
- lines = append(lines, gw.newProcessorLine())
|
|
|
|
|
|
+ p, err := gw.newProcessorLine()
|
|
|
|
+ if err != nil {
|
|
|
|
+ return err
|
|
|
|
+ }
|
|
|
|
+ lines = append(lines, p)
|
|
}
|
|
}
|
|
// initialize processors
|
|
// initialize processors
|
|
if err := Svc.initialize(cfg); err != nil {
|
|
if err := Svc.initialize(cfg); err != nil {
|