Browse Source

- synchronise BackendService
- Remove initializers / shutdowners once backend is shutdown
- redis processor: propagate close error
- mysql processor: propaget close error
- add more comments

flashmob 8 years ago
parent
commit
91d4e66c06
4 changed files with 33 additions and 5 deletions
  1. 19 2
      backends/backend.go
  2. 12 1
      backends/gateway.go
  3. 1 1
      backends/p_mysql.go
  4. 1 1
      backends/p_redis.go

+ 19 - 2
backends/backend.go

@@ -7,6 +7,7 @@ import (
 	"reflect"
 	"strconv"
 	"strings"
+	"sync"
 )
 
 var (
@@ -23,6 +24,8 @@ func init() {
 	Processors = make(map[string]ProcessorConstructor)
 }
 
+type ProcessorConstructor func() Decorator
+
 // Backends process received mail. Depending on the implementation, they can store mail in the database,
 // write to a file, check for spam, re-transmit to another server, etc.
 // Must return an SMTP message (i.e. "250 OK") and a boolean indicating
@@ -53,8 +56,6 @@ type Worker interface {
 */
 type BackendConfig map[string]interface{}
 
-type ProcessorConstructor func() Decorator
-
 type baseConfig interface{}
 
 type saveStatus struct {
@@ -129,6 +130,7 @@ func (s Shutdown) Shutdown() error {
 
 type BackendService struct {
 	ProcessorHandlers
+	sync.Mutex
 }
 
 type ProcessorHandlers struct {
@@ -136,24 +138,39 @@ type ProcessorHandlers struct {
 	Shutdowners  []ProcessorShutdowner
 }
 
+// AddInitializer adds a function that impliments ProcessorShutdowner to be called when initializing
 func (b *BackendService) AddInitializer(i ProcessorInitializer) {
+	b.Lock()
+	defer b.Unlock()
 	b.Initializers = append(b.Initializers, i)
 }
 
+// AddShutdowner adds a function that impliments ProcessorShutdowner to be called when shutting down
 func (b *BackendService) AddShutdowner(i ProcessorShutdowner) {
+	b.Lock()
+	defer b.Unlock()
 	b.Shutdowners = append(b.Shutdowners, i)
 }
 
+// Initialize initializes all the processors by
 func (b *BackendService) Initialize(backend BackendConfig) {
+	b.Lock()
+	defer b.Unlock()
 	for i := range b.Initializers {
 		b.Initializers[i].Initialize(backend)
 	}
 }
 
+// Shutdown shuts down all the processor by calling their shutdowners
+// It also clears the initializers and shutdowners that were set with AddInitializer and AddShutdowner
 func (b *BackendService) Shutdown() {
+	b.Lock()
+	defer b.Unlock()
 	for i := range b.Shutdowners {
 		b.Shutdowners[i].Shutdown()
 	}
+	b.Initializers = make([]ProcessorInitializer, 0)
+	b.Shutdowners = make([]ProcessorShutdowner, 0)
 }
 
 // extractConfig loads the backend config. It has already been unmarshalled

+ 12 - 1
backends/gateway.go

@@ -87,14 +87,16 @@ func (gw *BackendGateway) Process(e *envelope.Envelope) BackendResult {
 		return NewBackendResult(response.Canned.FailBackendTimeout)
 	}
 }
+
+// Shutdown shuts down the backend and leaves it in BackendStateShuttered state
 func (gw *BackendGateway) Shutdown() error {
 	gw.stateGuard.Lock()
 	defer gw.stateGuard.Unlock()
 	if gw.State != BackendStateShuttered {
 		close(gw.saveMailChan) // workers will stop
 		gw.wg.Wait()
-		gw.State = BackendStateShuttered
 		Service.Shutdown()
+		gw.State = BackendStateShuttered
 	}
 	return nil
 }
@@ -112,6 +114,10 @@ func (gw *BackendGateway) Reinitialize() error {
 	return err
 }
 
+// newProcessorLine creates a new stack of decorators and returns as a single Processor
+// Decorators are functions of Decorator type, source files prefixed with p_*
+// Each decorator does a specific task during the processing stage.
+// This function uses the config value process_line to figure out which Decorator to use
 func (gw *BackendGateway) newProcessorLine() Processor {
 	var decorators []Decorator
 	if len(gw.gwConfig.ProcessorLine) == 0 {
@@ -124,10 +130,12 @@ func (gw *BackendGateway) newProcessorLine() Processor {
 			decorators = append(decorators, makeFunc())
 		}
 	}
+	// build the call-stack of decorators
 	p := Decorate(DefaultProcessor{}, decorators...)
 	return p
 }
 
+// loadConfig loads the config for the GatewayConfig
 func (gw *BackendGateway) loadConfig(cfg BackendConfig) error {
 	configType := baseConfig(&GatewayConfig{})
 	bcfg, err := Service.extractConfig(cfg, configType)
@@ -138,6 +146,7 @@ func (gw *BackendGateway) loadConfig(cfg BackendConfig) error {
 	return nil
 }
 
+// Initialize builds the workers and starts each worker in a thread
 func (gw *BackendGateway) Initialize(cfg BackendConfig) error {
 	err := gw.loadConfig(cfg)
 	if err == nil {
@@ -168,6 +177,8 @@ func (gw *BackendGateway) Initialize(cfg BackendConfig) error {
 	return err
 }
 
+// getNumberOfWorkers gets the number of workers to use for saving email by reading the save_workers_size config value
+// Returns 1 if no config value was set
 func (gw *BackendGateway) getNumberOfWorkers() int {
 	if gw.gwConfig.WorkersSize == 0 {
 		return 1

+ 1 - 1
backends/p_mysql.go

@@ -153,7 +153,7 @@ func MySql() Decorator {
 	// shutdown
 	Service.AddShutdowner(Shutdown(func() error {
 		if db != nil {
-			db.Close()
+			return db.Close()
 		}
 		return nil
 	}))

+ 1 - 1
backends/p_redis.go

@@ -77,7 +77,7 @@ func Redis() Decorator {
 	// When shutting down
 	Service.AddShutdowner(Shutdown(func() error {
 		if redisClient.isConnected {
-			redisClient.conn.Close()
+			return redisClient.conn.Close()
 		}
 		return nil
 	}))