|
@@ -7,7 +7,7 @@ import (
|
|
"sync"
|
|
"sync"
|
|
"time"
|
|
"time"
|
|
|
|
|
|
- "github.com/flashmob/go-guerrilla/envelope"
|
|
|
|
|
|
+ "github.com/flashmob/go-guerrilla/mail"
|
|
"github.com/flashmob/go-guerrilla/response"
|
|
"github.com/flashmob/go-guerrilla/response"
|
|
"strings"
|
|
"strings"
|
|
)
|
|
)
|
|
@@ -17,10 +17,9 @@ import (
|
|
// via a channel. Shutting down via Shutdown() will stop all workers.
|
|
// via a channel. Shutting down via Shutdown() will stop all workers.
|
|
// The rest of this program always talks to the backend via this gateway.
|
|
// The rest of this program always talks to the backend via this gateway.
|
|
type BackendGateway struct {
|
|
type BackendGateway struct {
|
|
- // channel for sending envelopes to process and save
|
|
|
|
- saveMailChan chan *workerMsg
|
|
|
|
- // channel for validating the last recipient added to the envelope
|
|
|
|
- validateRcptChan chan *workerMsg
|
|
|
|
|
|
+ // channel for distributing envelopes to workers
|
|
|
|
+ conveyor chan *workerMsg
|
|
|
|
+
|
|
// waits for backend workers to start/stop
|
|
// waits for backend workers to start/stop
|
|
wg sync.WaitGroup
|
|
wg sync.WaitGroup
|
|
w *Worker
|
|
w *Worker
|
|
@@ -33,15 +32,18 @@ type BackendGateway struct {
|
|
}
|
|
}
|
|
|
|
|
|
type GatewayConfig struct {
|
|
type GatewayConfig struct {
|
|
- WorkersSize int `json:"save_workers_size,omitempty"`
|
|
|
|
- ProcessorLine string `json:"process_stack,omitempty"`
|
|
|
|
|
|
+ WorkersSize int `json:"save_workers_size,omitempty"`
|
|
|
|
+ ProcessorStack string `json:"process_stack,omitempty"`
|
|
}
|
|
}
|
|
|
|
|
|
-// savePayload is what get placed on the BackendGateway.saveMailChan channel
|
|
|
|
|
|
+// workerMsg is what get placed on the BackendGateway.saveMailChan channel
|
|
type workerMsg struct {
|
|
type workerMsg struct {
|
|
- mail *envelope.Envelope
|
|
|
|
|
|
+ // The email data
|
|
|
|
+ e *mail.Envelope
|
|
// savedNotify is used to notify that the save operation completed
|
|
// savedNotify is used to notify that the save operation completed
|
|
notifyMe chan *notifyMsg
|
|
notifyMe chan *notifyMsg
|
|
|
|
+ // select the task type
|
|
|
|
+ task SelectTask
|
|
}
|
|
}
|
|
|
|
|
|
// possible values for state
|
|
// possible values for state
|
|
@@ -49,9 +51,10 @@ const (
|
|
BackendStateRunning = iota
|
|
BackendStateRunning = iota
|
|
BackendStateShuttered
|
|
BackendStateShuttered
|
|
BackendStateError
|
|
BackendStateError
|
|
-)
|
|
|
|
|
|
|
|
-const ProcessTimeout = time.Second * 30
|
|
|
|
|
|
+ processTimeout = time.Second * 30
|
|
|
|
+ defaultProcessor = "Debugger"
|
|
|
|
+)
|
|
|
|
|
|
type backendState int
|
|
type backendState int
|
|
|
|
|
|
@@ -60,13 +63,13 @@ func (s backendState) String() string {
|
|
}
|
|
}
|
|
|
|
|
|
// Process distributes an envelope to one of the backend workers
|
|
// Process distributes an envelope to one of the backend workers
|
|
-func (gw *BackendGateway) Process(e *envelope.Envelope) Result {
|
|
|
|
|
|
+func (gw *BackendGateway) Process(e *mail.Envelope) Result {
|
|
if gw.State != BackendStateRunning {
|
|
if gw.State != BackendStateRunning {
|
|
return NewResult(response.Canned.FailBackendNotRunning + gw.State.String())
|
|
return NewResult(response.Canned.FailBackendNotRunning + gw.State.String())
|
|
}
|
|
}
|
|
// place on the channel so that one of the save mail workers can pick it up
|
|
// place on the channel so that one of the save mail workers can pick it up
|
|
savedNotify := make(chan *notifyMsg)
|
|
savedNotify := make(chan *notifyMsg)
|
|
- gw.saveMailChan <- &workerMsg{e, savedNotify}
|
|
|
|
|
|
+ gw.conveyor <- &workerMsg{e, savedNotify, TaskSaveMail}
|
|
// wait for the save to complete
|
|
// wait for the save to complete
|
|
// or timeout
|
|
// or timeout
|
|
select {
|
|
select {
|
|
@@ -76,7 +79,7 @@ func (gw *BackendGateway) Process(e *envelope.Envelope) Result {
|
|
}
|
|
}
|
|
return NewResult(response.Canned.SuccessMessageQueued + status.queuedID)
|
|
return NewResult(response.Canned.SuccessMessageQueued + status.queuedID)
|
|
|
|
|
|
- case <-time.After(ProcessTimeout):
|
|
|
|
|
|
+ case <-time.After(processTimeout):
|
|
Log().Infof("Backend has timed out")
|
|
Log().Infof("Backend has timed out")
|
|
return NewResult(response.Canned.FailBackendTimeout)
|
|
return NewResult(response.Canned.FailBackendTimeout)
|
|
}
|
|
}
|
|
@@ -85,13 +88,13 @@ func (gw *BackendGateway) Process(e *envelope.Envelope) Result {
|
|
|
|
|
|
// ValidateRcpt asks one of the workers to validate the recipient
|
|
// ValidateRcpt asks one of the workers to validate the recipient
|
|
// Only the last recipient appended to e.RcptTo will be validated.
|
|
// Only the last recipient appended to e.RcptTo will be validated.
|
|
-func (gw *BackendGateway) ValidateRcpt(e *envelope.Envelope) RcptError {
|
|
|
|
|
|
+func (gw *BackendGateway) ValidateRcpt(e *mail.Envelope) RcptError {
|
|
if gw.State != BackendStateRunning {
|
|
if gw.State != BackendStateRunning {
|
|
return StorageNotAvailable
|
|
return StorageNotAvailable
|
|
}
|
|
}
|
|
// place on the channel so that one of the save mail workers can pick it up
|
|
// place on the channel so that one of the save mail workers can pick it up
|
|
notify := make(chan *notifyMsg)
|
|
notify := make(chan *notifyMsg)
|
|
- gw.validateRcptChan <- &workerMsg{e, notify}
|
|
|
|
|
|
+ gw.conveyor <- &workerMsg{e, notify, TaskValidateRcpt}
|
|
// wait for the validation to complete
|
|
// wait for the validation to complete
|
|
// or timeout
|
|
// or timeout
|
|
select {
|
|
select {
|
|
@@ -112,7 +115,7 @@ func (gw *BackendGateway) Shutdown() error {
|
|
gw.Lock()
|
|
gw.Lock()
|
|
defer gw.Unlock()
|
|
defer gw.Unlock()
|
|
if gw.State != BackendStateShuttered {
|
|
if gw.State != BackendStateShuttered {
|
|
- close(gw.saveMailChan) // workers will stop
|
|
|
|
|
|
+ close(gw.conveyor) // workers will stop
|
|
// wait for workers to stop
|
|
// wait for workers to stop
|
|
gw.wg.Wait()
|
|
gw.wg.Wait()
|
|
Svc.shutdown()
|
|
Svc.shutdown()
|
|
@@ -141,10 +144,11 @@ func (gw *BackendGateway) Reinitialize() error {
|
|
// This function uses the config value process_stack to figure out which Decorator to use
|
|
// This function uses the config value process_stack to figure out which Decorator to use
|
|
func (gw *BackendGateway) newProcessorLine() Processor {
|
|
func (gw *BackendGateway) newProcessorLine() Processor {
|
|
var decorators []Decorator
|
|
var decorators []Decorator
|
|
- if len(gw.gwConfig.ProcessorLine) == 0 {
|
|
|
|
- return nil
|
|
|
|
|
|
+ cfg := strings.ToLower(strings.TrimSpace(gw.gwConfig.ProcessorStack))
|
|
|
|
+ if len(cfg) == 0 {
|
|
|
|
+ cfg = defaultProcessor
|
|
}
|
|
}
|
|
- line := strings.Split(strings.ToLower(gw.gwConfig.ProcessorLine), "|")
|
|
|
|
|
|
+ line := strings.Split(cfg, "|")
|
|
for i := range line {
|
|
for i := range line {
|
|
name := line[len(line)-1-i] // reverse order, since decorators are stacked
|
|
name := line[len(line)-1-i] // reverse order, since decorators are stacked
|
|
if makeFunc, ok := processors[name]; ok {
|
|
if makeFunc, ok := processors[name]; ok {
|
|
@@ -159,12 +163,9 @@ func (gw *BackendGateway) newProcessorLine() Processor {
|
|
// loadConfig loads the config for the GatewayConfig
|
|
// loadConfig loads the config for the GatewayConfig
|
|
func (gw *BackendGateway) loadConfig(cfg BackendConfig) error {
|
|
func (gw *BackendGateway) loadConfig(cfg BackendConfig) error {
|
|
configType := BaseConfig(&GatewayConfig{})
|
|
configType := BaseConfig(&GatewayConfig{})
|
|
- if _, ok := cfg["process_stack"]; !ok {
|
|
|
|
- cfg["process_stack"] = "Debugger"
|
|
|
|
- }
|
|
|
|
- if _, ok := cfg["save_workers_size"]; !ok {
|
|
|
|
- cfg["save_workers_size"] = 1
|
|
|
|
- }
|
|
|
|
|
|
+ // Note: treat config values as immutable
|
|
|
|
+ // if you need to change a config value, change in the file then
|
|
|
|
+ // send a SIGHUP
|
|
bcfg, err := Svc.ExtractConfig(cfg, configType)
|
|
bcfg, err := Svc.ExtractConfig(cfg, configType)
|
|
if err != nil {
|
|
if err != nil {
|
|
return err
|
|
return err
|
|
@@ -192,13 +193,12 @@ func (gw *BackendGateway) Initialize(cfg BackendConfig) error {
|
|
if err := Svc.initialize(cfg); err != nil {
|
|
if err := Svc.initialize(cfg); err != nil {
|
|
return err
|
|
return err
|
|
}
|
|
}
|
|
- gw.saveMailChan = make(chan *workerMsg, workersSize)
|
|
|
|
- gw.validateRcptChan = make(chan *workerMsg, workersSize)
|
|
|
|
|
|
+ gw.conveyor = make(chan *workerMsg, workersSize)
|
|
// start our workers
|
|
// start our workers
|
|
gw.wg.Add(workersSize)
|
|
gw.wg.Add(workersSize)
|
|
for i := 0; i < workersSize; i++ {
|
|
for i := 0; i < workersSize; i++ {
|
|
go func(workerId int) {
|
|
go func(workerId int) {
|
|
- gw.w.workDispatcher(gw.saveMailChan, gw.validateRcptChan, lines[workerId], workerId+1)
|
|
|
|
|
|
+ gw.w.workDispatcher(gw.conveyor, lines[workerId], workerId+1)
|
|
gw.wg.Done()
|
|
gw.wg.Done()
|
|
}(i)
|
|
}(i)
|
|
}
|
|
}
|