
- Backend shutdown process rewritten to use channels (workStoppers); the previous method could cause a panic / race condition
- Do not close the conveyor channel on shutdown
- Default gateway adds the following states: BackendStateNew when created, BackendStateInitialized after a call to Initialize()
- Backend interface gains new Start() and Reinitialize() functions; added comments for all interface funcs
- Reinitialize() clears all initializers; it can only be called after the backend has been shut down
- Limited more identifiers to private access
- Amalgamated worker.go into gateway.go
- Added a gateway backend test
- Fixed typos in the RCPT error messages
- Updated the server to use the new backend start / restart mechanism (a lifecycle sketch follows this list)
- Updated tests
- Added a new Len() method to the mail envelope, as it could be useful for backends
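
The new start / restart mechanism amounts to a small state machine: New() initializes the gateway (BackendStateNew → BackendStateInitialized), Start() launches the worker goroutines (BackendStateRunning), Shutdown() signals each worker over its stop channel in the workStoppers slice and waits for them (BackendStateShuttered), and Reinitialize() prepares the same gateway for another Start(). The sketch below is illustrative only, not code from the repository: it assumes the import paths shown in the diffs and the config keys used in the tests in this commit ("process_stack", "save_workers_size", "log_received_mails"), and it trims real error handling.

```go
package main

import (
	"fmt"
	"os"

	"github.com/flashmob/go-guerrilla/backends"
	"github.com/flashmob/go-guerrilla/log"
)

func main() {
	// Open a logger the same way the README example does.
	logger, err := log.GetLogger(string(log.OutputStderr))
	if err != nil {
		fmt.Println("cannot open log:", err)
		os.Exit(1)
	}

	// Config keys borrowed from the tests in this commit (assumed, minimal).
	cfg := backends.BackendConfig{
		"process_stack":      "HeadersParser|Debugger",
		"save_workers_size":  1,
		"log_received_mails": true,
	}

	// New() runs Initialize() and leaves the gateway in BackendStateInitialized.
	backend, err := backends.New(cfg, logger)
	if err != nil {
		fmt.Println("backend init error:", err)
		os.Exit(1)
	}

	// Start() launches the worker goroutines (BackendStateRunning).
	if err := backend.Start(); err != nil {
		fmt.Println("backend start error:", err)
		os.Exit(1)
	}

	// ... accept and process mail here ...

	// Shutdown() signals the workers to stop and waits for them (BackendStateShuttered).
	if err := backend.Shutdown(); err != nil {
		fmt.Println("backend shutdown error:", err)
	}

	// Reinitialize() re-runs the initializers with the last known-good config;
	// after that the same backend can be started again.
	if err := backend.Reinitialize(); err != nil {
		fmt.Println("backend reinitialize error:", err)
	}
	if err := backend.Start(); err != nil {
		fmt.Println("backend restart error:", err)
	}
}
```

Note that, as the gateway diff below shows, Shutdown() no longer closes the conveyor channel; each worker is stopped through its own stop channel, which avoids the close-channel panic mentioned in the first bullet.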

flashmob, 8 years ago
commit c359a95316
14 changed files with 484 additions and 146 deletions
  1. Makefile (+1 -0)
  2. README.md (+1 -1)
  3. backends/backend.go (+31 -32)
  4. backends/gateway.go (+143 -20)
  5. backends/gateway_test.go (+115 -0)
  6. backends/validate.go (+2 -2)
  7. backends/worker.go (+0 -52)
  8. cmd/guerrillad/serve.go (+17 -5)
  9. cmd/guerrillad/serve_test.go (+141 -17)
  10. guerrilla.go (+19 -14)
  11. log/log.go (+1 -0)
  12. mail/envelope.go (+5 -0)
  13. server.go (+1 -1)
  14. tests/guerrilla_test.go (+7 -2)

+ 1 - 0
Makefile

@@ -29,3 +29,4 @@ test: *.go */*.go */*/*.go
 	$(GO_VARS) $(GO) test -v ./tests
 	$(GO_VARS) $(GO) test -v ./cmd/guerrillad
 	$(GO_VARS) $(GO) test -v ./response
+	$(GO_VARS) $(GO) test -v ./backends

+ 1 - 1
README.md

@@ -190,7 +190,7 @@ func (cb *CustomBackend) Process(e *mail.Envelope) backends.Result {
 ```go
 import "github.com/flashmob/go-guerrilla/log"
 
-mainlog, err := log.GetLogger(log.OutputStderr.String());
+mainlog, err := log.GetLogger(string(log.OutputStderr));
 if  err != nil {
     fmt.Println("Cannot open log:", err)
     os.Exit(1)

+ 31 - 32
backends/backend.go

@@ -12,7 +12,7 @@ import (
 )
 
 var (
-	Svc *Service
+	Svc *service
 
 	// Store the constructor for making an new processor decorator.
 	processors map[string]processorConstructor
@@ -21,8 +21,7 @@ var (
 )
 
 func init() {
-	Svc = &Service{}
-
+	Svc = &service{}
 	processors = make(map[string]processorConstructor)
 }
 
@@ -33,10 +32,18 @@ type processorConstructor func() Decorator
 // Must return an SMTP message (i.e. "250 OK") and a boolean indicating
 // whether the message was processed successfully.
 type Backend interface {
+	// Process processes then saves the mail envelope
 	Process(*mail.Envelope) Result
+	// ValidateRcpt validates the last recipient that was pushed to the mail envelope
 	ValidateRcpt(e *mail.Envelope) RcptError
+	// Initializes the backend, e.g. creates folders, sets up database connections
 	Initialize(BackendConfig) error
+	// Initializes the backend after it was Shutdown()
+	Reinitialize() error
+	// Shutdown frees / closes anything created during initializations
 	Shutdown() error
+	// Start starts a backend that has been initialized
+	Start() error
 }
 
 type BackendConfig map[string]interface{}
@@ -83,11 +90,11 @@ func NewResult(message string) Result {
 	return result(message)
 }
 
-type ProcessorInitializer interface {
+type processorInitializer interface {
 	Initialize(backendConfig BackendConfig) error
 }
 
-type ProcessorShutdowner interface {
+type processorShutdowner interface {
 	Shutdown() error
 }
 
@@ -122,20 +129,6 @@ func (e Errors) Error() string {
 	return msg
 }
 
-// New makes a new default BackendGateway backend, and initializes it using
-// backendConfig and stores the logger
-func New(backendConfig BackendConfig, l log.Logger) (Backend, error) {
-	Svc.StoreMainlog(l)
-	gateway := &BackendGateway{config: backendConfig}
-	err := gateway.Initialize(backendConfig)
-	if err != nil {
-		return nil, fmt.Errorf("error while initializing the backend: %s", err)
-	}
-	gateway.State = BackendStateRunning
-	b = Backend(gateway)
-	return b, nil
-}
-
 func convertError(name string) error {
 	return fmt.Errorf("failed to load backend config (%s)", name)
 }
@@ -144,9 +137,9 @@ func GetBackend() Backend {
 	return b
 }
 
-type Service struct {
-	initializers []ProcessorInitializer
-	shutdowners  []ProcessorShutdowner
+type service struct {
+	initializers []processorInitializer
+	shutdowners  []processorShutdowner
 	sync.Mutex
 	mainlog atomic.Value
 }
@@ -156,36 +149,42 @@ func Log() log.Logger {
 	if v, ok := Svc.mainlog.Load().(log.Logger); ok {
 		return v
 	}
-	l, _ := log.GetLogger(log.OutputStderr.String())
+	l, _ := log.GetLogger(string(log.OutputStderr))
 	return l
 }
 
-func (s *Service) StoreMainlog(l log.Logger) {
+func (s *service) SetMainlog(l log.Logger) {
 	s.mainlog.Store(l)
 }
 
 // AddInitializer adds a function that implements ProcessorShutdowner to be called when initializing
-func (s *Service) AddInitializer(i ProcessorInitializer) {
+func (s *service) AddInitializer(i processorInitializer) {
 	s.Lock()
 	defer s.Unlock()
 	s.initializers = append(s.initializers, i)
 }
 
 // AddShutdowner adds a function that implements ProcessorShutdowner to be called when shutting down
-func (s *Service) AddShutdowner(sh ProcessorShutdowner) {
+func (s *service) AddShutdowner(sh processorShutdowner) {
 	s.Lock()
 	defer s.Unlock()
 	s.shutdowners = append(s.shutdowners, sh)
 }
 
+// reset clears the initializers and Shutdowners
+func (s *service) reset() {
+	s.shutdowners = make([]processorShutdowner, 0)
+	s.initializers = make([]processorInitializer, 0)
+}
+
 // Initialize initializes all the processors one-by-one and returns any errors.
 // Subsequent calls to Initialize will not call the initializer again unless it failed on the previous call
 // so Initialize may be called again to retry after getting errors
-func (s *Service) initialize(backend BackendConfig) Errors {
+func (s *service) initialize(backend BackendConfig) Errors {
 	s.Lock()
 	defer s.Unlock()
 	var errors Errors
-	failed := make([]ProcessorInitializer, 0)
+	failed := make([]processorInitializer, 0)
 	for i := range s.initializers {
 		if err := s.initializers[i].Initialize(backend); err != nil {
 			errors = append(errors, err)
@@ -200,11 +199,11 @@ func (s *Service) initialize(backend BackendConfig) Errors {
 // Shutdown shuts down all the processors by calling their shutdowners (if any)
 // Subsequent calls to Shutdown will not call the shutdowners again unless it failed on the previous call
 // so Shutdown may be called again to retry after getting errors
-func (s *Service) shutdown() Errors {
+func (s *service) shutdown() Errors {
 	s.Lock()
 	defer s.Unlock()
 	var errors Errors
-	failed := make([]ProcessorShutdowner, 0)
+	failed := make([]processorShutdowner, 0)
 	for i := range s.shutdowners {
 		if err := s.shutdowners[i].Shutdown(); err != nil {
 			errors = append(errors, err)
@@ -218,7 +217,7 @@ func (s *Service) shutdown() Errors {
 // AddProcessor adds a new processor, which becomes available to the backend_config.process_stack option
 // Use to add your own custom processor when using backends as a package, or after importing an external
 // processor.
-func (s *Service) AddProcessor(name string, p processorConstructor) {
+func (s *service) AddProcessor(name string, p processorConstructor) {
 	// wrap in a constructor since we want to defer calling it
 	var c processorConstructor
 	c = func() Decorator {
@@ -234,7 +233,7 @@ func (s *Service) AddProcessor(name string, p processorConstructor) {
 // The reason why using reflection is because we'll get a nice error message if the field is missing
 // the alternative solution would be to json.Marshal() and json.Unmarshal() however that will not give us any
 // error messages
-func (s *Service) ExtractConfig(configData BackendConfig, configType BaseConfig) (interface{}, error) {
+func (s *service) ExtractConfig(configData BackendConfig, configType BaseConfig) (interface{}, error) {
 	// Use reflection so that we can provide a nice error message
 	v := reflect.ValueOf(configType).Elem() // so that we can set the values
 	//m := reflect.ValueOf(configType).Elem()

+ 143 - 20
backends/gateway.go

@@ -7,8 +7,10 @@ import (
 	"sync"
 	"time"
 
+	"github.com/flashmob/go-guerrilla/log"
 	"github.com/flashmob/go-guerrilla/mail"
 	"github.com/flashmob/go-guerrilla/response"
+	"runtime/debug"
 	"strings"
 )
 
@@ -23,8 +25,9 @@ type BackendGateway struct {
 	conveyor chan *workerMsg
 
 	// waits for backend workers to start/stop
-	wg sync.WaitGroup
-	w  *Worker
+	wg           sync.WaitGroup
+	workStoppers []chan bool
+	lines        []Processor
 
 	// controls access to state
 	sync.Mutex
@@ -42,28 +45,58 @@ type GatewayConfig struct {
 type workerMsg struct {
 	// The email data
 	e *mail.Envelope
-	// savedNotify is used to notify that the save operation completed
+	// notifyMe is used to notify the gateway of workers finishing their processing
 	notifyMe chan *notifyMsg
 	// select the task type
 	task SelectTask
 }
 
+type backendState int
+
 // possible values for state
 const (
-	BackendStateRunning = iota
+	BackendStateNew backendState = iota
+	BackendStateRunning
 	BackendStateShuttered
 	BackendStateError
+	BackendStateInitialized
 
 	processTimeout   = time.Second * 30
 	defaultProcessor = "Debugger"
 )
 
-type backendState int
-
 func (s backendState) String() string {
+	switch s {
+	case BackendStateNew:
+		return "NewState"
+	case BackendStateRunning:
+		return "RunningState"
+	case BackendStateShuttered:
+		return "ShutteredState"
+	case BackendStateError:
+		return "ErrorState"
+	case BackendStateInitialized:
+		return "InitializedState"
+	}
 	return strconv.Itoa(int(s))
 }
 
+// New makes a new default BackendGateway backend, and initializes it using
+// backendConfig and stores the logger
+func New(backendConfig BackendConfig, l log.Logger) (Backend, error) {
+	Svc.SetMainlog(l)
+	gateway := &BackendGateway{}
+	err := gateway.Initialize(backendConfig)
+	if err != nil {
+		return nil, fmt.Errorf("error while initializing the backend: %s", err)
+	}
+	// keep the config known to be good.
+	gateway.config = backendConfig
+
+	b = Backend(gateway)
+	return b, nil
+}
+
 // Process distributes an envelope to one of the backend workers
 func (gw *BackendGateway) Process(e *mail.Envelope) Result {
 	if gw.State != BackendStateRunning {
@@ -117,26 +150,33 @@ func (gw *BackendGateway) Shutdown() error {
 	gw.Lock()
 	defer gw.Unlock()
 	if gw.State != BackendStateShuttered {
-		close(gw.conveyor) // workers will stop
+		// send a signal to all workers
+		gw.stopWorkers()
 		// wait for workers to stop
 		gw.wg.Wait()
-		Svc.shutdown()
+		// call shutdown on all processor shutdowners
+		if err := Svc.shutdown(); err != nil {
+			return err
+		}
 		gw.State = BackendStateShuttered
 	}
 	return nil
 }
 
-// Reinitialize starts up a backend gateway that was shutdown before
+// Reinitialize initializes the gateway with the existing config after it was shutdown
 func (gw *BackendGateway) Reinitialize() error {
 	if gw.State != BackendStateShuttered {
 		return errors.New("backend must be in BackendStateshuttered state to Reinitialize")
 	}
+	//
+	Svc.reset()
+
 	err := gw.Initialize(gw.config)
 	if err != nil {
+		fmt.Println("reinitialize to ", gw.config, err)
 		return fmt.Errorf("error while initializing the backend: %s", err)
 	}
 
-	gw.State = BackendStateRunning
 	return err
 }
 
@@ -179,10 +219,13 @@ func (gw *BackendGateway) loadConfig(cfg BackendConfig) error {
 	return nil
 }
 
-// Initialize builds the workers and starts each worker in a goroutine
+// Initialize builds the workers and initializes each one
 func (gw *BackendGateway) Initialize(cfg BackendConfig) error {
 	gw.Lock()
 	defer gw.Unlock()
+	if gw.State != BackendStateNew && gw.State != BackendStateShuttered {
+		return errors.New("Can only Initialize in BackendStateNew or BackendStateShuttered state")
+	}
 	err := gw.loadConfig(cfg)
 	if err == nil {
 		workersSize := gw.workersSize()
@@ -190,31 +233,57 @@ func (gw *BackendGateway) Initialize(cfg BackendConfig) error {
 			gw.State = BackendStateError
 			return errors.New("Must have at least 1 worker")
 		}
-		var lines []Processor
+		gw.lines = make([]Processor, 0)
 		for i := 0; i < workersSize; i++ {
 			p, err := gw.newProcessorStack()
 			if err != nil {
+				gw.State = BackendStateError
 				return err
 			}
-			lines = append(lines, p)
+			gw.lines = append(gw.lines, p)
 		}
 		// initialize processors
 		if err := Svc.initialize(cfg); err != nil {
+			gw.State = BackendStateError
 			return err
 		}
-		gw.conveyor = make(chan *workerMsg, workersSize)
-		// start our workers
+		if gw.conveyor == nil {
+			gw.conveyor = make(chan *workerMsg, workersSize)
+		}
+		// ready to start
+		gw.State = BackendStateInitialized
+		return nil
+	}
+	gw.State = BackendStateError
+	return err
+}
+
+// Start starts the worker goroutines, assuming it has been initialized or shuttered before
+func (gw *BackendGateway) Start() error {
+	gw.Lock()
+	defer gw.Unlock()
+	if gw.State == BackendStateInitialized || gw.State == BackendStateShuttered {
+		// we start our workers
+		workersSize := gw.workersSize()
+		// make our slice of channels for stopping
+		gw.workStoppers = make([]chan bool, 0)
+		// set the wait group
 		gw.wg.Add(workersSize)
+
 		for i := 0; i < workersSize; i++ {
-			go func(workerId int) {
-				gw.w.workDispatcher(gw.conveyor, lines[workerId], workerId+1)
+			stop := make(chan bool)
+			go func(workerId int, stop chan bool) {
+				// blocks here until the worker exits
+				gw.workDispatcher(gw.conveyor, gw.lines[workerId], workerId+1, stop)
 				gw.wg.Done()
-			}(i)
+			}(i, stop)
+			gw.workStoppers = append(gw.workStoppers, stop)
 		}
+		gw.State = BackendStateRunning
+		return nil
 	} else {
-		gw.State = BackendStateError
+		return errors.New(fmt.Sprintf("cannot start backend because it's in %s state", gw.State))
 	}
-	return err
 }
 
 // workersSize gets the number of workers to use for saving email by reading the save_workers_size config value
@@ -225,3 +294,57 @@ func (gw *BackendGateway) workersSize() int {
 	}
 	return gw.gwConfig.WorkersSize
 }
+
+func (gw *BackendGateway) workDispatcher(workIn chan *workerMsg, p Processor, workerId int, stop chan bool) {
+
+	defer func() {
+		if r := recover(); r != nil {
+			// recover from closed channel
+			Log().Error("worker recovered from panic:", r, string(debug.Stack()))
+		}
+		// close any connections / files
+		Svc.shutdown()
+
+	}()
+	Log().Infof("processing worker started (#%d)", workerId)
+	for {
+		select {
+		case <-stop:
+			Log().Infof("stop signal for worker (#%d)", workerId)
+			return
+		case msg := <-workIn:
+			if msg == nil {
+				Log().Debugf("worker stopped (#%d)", workerId)
+				return
+			}
+			if msg.task == TaskSaveMail {
+				// process the email here
+				// TODO we should check the err
+				result, _ := p.Process(msg.e, TaskSaveMail)
+				if result.Code() < 300 {
+					// if all good, let the gateway know that it was queued
+					msg.notifyMe <- &notifyMsg{nil, msg.e.QueuedId}
+				} else {
+					// notify the gateway about the error
+					msg.notifyMe <- &notifyMsg{err: errors.New(result.String())}
+				}
+			} else if msg.task == TaskValidateRcpt {
+				_, err := p.Process(msg.e, TaskValidateRcpt)
+				if err != nil {
+					// validation failed
+					msg.notifyMe <- &notifyMsg{err: err}
+				} else {
+					// all good.
+					msg.notifyMe <- &notifyMsg{err: nil}
+				}
+			}
+		}
+	}
+}
+
+// stopWorkers sends a signal to all workers to stop
+func (gw *BackendGateway) stopWorkers() {
+	for i := range gw.workStoppers {
+		gw.workStoppers[i] <- true
+	}
+}

+ 115 - 0
backends/gateway_test.go

@@ -0,0 +1,115 @@
+package backends
+
+import (
+	"fmt"
+	"github.com/derekparker/delve/config"
+	"github.com/flashmob/go-guerrilla/log"
+	"github.com/flashmob/go-guerrilla/mail"
+	"strings"
+	"testing"
+	"time"
+)
+
+func TestStates(t *testing.T) {
+	gw := BackendGateway{}
+	str := fmt.Sprintf("%s", gw.State)
+	if strings.Index(str, "NewState") != 0 {
+		t.Error("Backend should begin in NewState")
+	}
+}
+
+func TestInitialize(t *testing.T) {
+	c := BackendConfig{
+		"process_stack":      "HeadersParser|Debugger",
+		"log_received_mails": true,
+		"save_workers_size":  "1",
+	}
+
+	gateway := &BackendGateway{}
+	err := gateway.Initialize(c)
+	if err != nil {
+		t.Error("Gateway did not init because:", err)
+		t.Fail()
+	}
+	if gateway.lines == nil {
+		t.Error("gateway.lines should not be nil")
+	} else if len(gateway.lines) != 1 {
+		t.Error("len(gateway.lines) should be 1, but got", len(gateway.lines))
+	}
+
+	if gateway.conveyor == nil {
+		t.Error("gateway.conveyor should not be nil")
+	} else if cap(gateway.conveyor) != gateway.workersSize() {
+		t.Error("gateway.conveyor channel buffer cap does not match worker size, cap was", cap(gateway.conveyor))
+	}
+
+	if gateway.State != BackendStateInitialized {
+		t.Error("gateway.State is not in initialized state, got ", gateway.State)
+	}
+
+}
+
+func TestStartProcessStop(t *testing.T) {
+	c := BackendConfig{
+		"process_stack":      "HeadersParser|Debugger",
+		"log_received_mails": true,
+		"save_workers_size":  2,
+	}
+	config.LoadConfig()
+
+	gateway := &BackendGateway{}
+	err := gateway.Initialize(c)
+
+	mainlog, _ := log.GetLogger(string(log.OutputOff))
+	Svc.SetMainlog(mainlog)
+
+	if err != nil {
+		t.Error("Gateway did not init because:", err)
+		t.Fail()
+	}
+	err = gateway.Start()
+	if err != nil {
+		t.Error("Gateway did not start because:", err)
+		t.Fail()
+	}
+	if gateway.State != BackendStateRunning {
+		t.Error("gateway.State is not in running state, got ", gateway.State)
+	}
+	// can we place an envelope on the conveyor channel?
+
+	e := &mail.Envelope{
+		RemoteIP: "127.0.0.1",
+		QueuedId: "abc12345",
+		Helo:     "helo.example.com",
+		MailFrom: mail.Address{"test", "example.com"},
+		TLS:      true,
+	}
+	e.PushRcpt(mail.Address{"test", "example.com"})
+	e.Data.WriteString("Subject:Test\n\nThis is a test.")
+	notify := make(chan *notifyMsg)
+
+	gateway.conveyor <- &workerMsg{e, notify, TaskSaveMail}
+
+	// it should not produce any errors
+	// headers (subject) should be parsed.
+
+	select {
+	case status := <-notify:
+
+		if status.err != nil {
+			t.Error("envelope processing failed with:", status.err)
+		}
+		if e.Header["Subject"][0] != "Test" {
+			t.Error("envelope processing did not parse header")
+		}
+
+	case <-time.After(time.Second):
+		t.Error("gateway did not respond after 1 second")
+		t.Fail()
+	}
+
+	err = gateway.Shutdown()
+	if err != nil {
+		t.Error("Gateway did not shutdown")
+	}
+}

+ 2 - 2
backends/validate.go

@@ -7,10 +7,10 @@ import (
 type RcptError error
 
 var (
-	NoSuchUser          = RcptError(errors.New("no such iser"))
+	NoSuchUser          = RcptError(errors.New("no such user"))
 	StorageNotAvailable = RcptError(errors.New("storage not available"))
 	StorageTooBusy      = RcptError(errors.New("stoarge too busy"))
-	StorageTimeout      = RcptError(errors.New("stoarge too busy"))
+	StorageTimeout      = RcptError(errors.New("stoarge timeout"))
 	QuotaExceeded       = RcptError(errors.New("quota exceeded"))
 	UserSuspended       = RcptError(errors.New("user suspended"))
 )

+ 0 - 52
backends/worker.go

@@ -1,52 +0,0 @@
-package backends
-
-import (
-	"errors"
-	"runtime/debug"
-)
-
-type Worker struct{}
-
-func (w *Worker) workDispatcher(workIn chan *workerMsg, p Processor, workerId int) {
-
-	defer func() {
-		if r := recover(); r != nil {
-			// recover form closed channel
-			Log().Error("worker recovered form panic:", r, string(debug.Stack()))
-		}
-		// close any connections / files
-		Svc.shutdown()
-
-	}()
-	Log().Infof("processing worker started (#%d)", workerId)
-	for {
-		select {
-		case msg := <-workIn:
-			if msg == nil {
-				Log().Debugf("worker stopped (#%d)", workerId)
-				return
-			}
-			if msg.task == TaskSaveMail {
-				// process the email here
-				// TODO we should check the err
-				result, _ := p.Process(msg.e, TaskSaveMail)
-				if result.Code() < 300 {
-					// if all good, let the gateway know that it was queued
-					msg.notifyMe <- &notifyMsg{nil, msg.e.QueuedId}
-				} else {
-					// notify the gateway about the error
-					msg.notifyMe <- &notifyMsg{err: errors.New(result.String())}
-				}
-			} else if msg.task == TaskValidateRcpt {
-				_, err := p.Process(msg.e, TaskValidateRcpt)
-				if err != nil {
-					// validation failed
-					msg.notifyMe <- &notifyMsg{err: err}
-				} else {
-					// all good.
-					msg.notifyMe <- &notifyMsg{err: nil}
-				}
-			}
-		}
-	}
-}

+ 17 - 5
cmd/guerrillad/serve.go

@@ -86,7 +86,6 @@ func sigHandler(app guerrilla.Guerrilla) {
 }
 
 func subscribeBackendEvent(event guerrilla.Event, backend backends.Backend, app guerrilla.Guerrilla) {
-
 	app.Subscribe(event, func(cmdConfig *CmdConfig) {
 		logger, _ := log.GetLogger(cmdConfig.LogFile)
 		var err error
@@ -94,14 +93,27 @@ func subscribeBackendEvent(event guerrilla.Event, backend backends.Backend, app
 			logger.WithError(err).Warn("Backend failed to shutdown")
 			return
 		}
-		newBackend, newErr := backends.New(cmdConfig.BackendConfig, logger)
-		if newErr != nil {
-			// this will continue using old backend
+		// init a new backend
+
+		if newBackend, newErr := backends.New(cmdConfig.BackendConfig, logger); newErr != nil {
+			// Revert to old backend config
 			logger.WithError(newErr).Error("Error while loading the backend")
+			err = backend.Reinitialize()
+			if err != nil {
+				logger.WithError(err).Fatal("failed to revert to old backend config")
+				return
+			}
+			err = backend.Start()
+			if err != nil {
+				logger.WithError(err).Fatal("failed to start backend with old config")
+				return
+			}
+			logger.Info("reverted to old backend config")
 		} else {
 			// swap to the bew backend (assuming old backend was shutdown so it can be safely swapped)
+			backend.Start()
 			backend = newBackend
-			logger.Info("Backend started")
+			logger.Info("new backend started")
 		}
 	})
 }

+ 141 - 17
cmd/guerrillad/serve_test.go

@@ -33,6 +33,8 @@ var configJsonA = `
       "guerrillamail.org"
     ],
     "backend_config": {
+    	"save_workers_size" : 1,
+    	"process_stack": "HeadersParser|Debugger",
         "log_received_mails": true
     },
     "servers" : [
@@ -80,6 +82,8 @@ var configJsonB = `
       "guerrillamail.org"
     ],
     "backend_config": {
+    	"save_workers_size" : 1,
+    	"process_stack": "HeadersParser|Debugger",
         "log_received_mails": false
     },
     "servers" : [
@@ -124,7 +128,10 @@ var configJsonC = `
             "redis_interface" : "127.0.0.1:6379",
             "redis_expire_seconds" : 7200,
             "save_workers_size" : 3,
-            "primary_mail_host":"sharklasers.com"
+            "primary_mail_host":"sharklasers.com",
+            "save_workers_size" : 1,
+	    "process_stack": "HeadersParser|Debugger",
+	    "log_received_mails": true
         },
     "servers" : [
         {
@@ -171,6 +178,8 @@ var configJsonD = `
       "guerrillamail.org"
     ],
     "backend_config": {
+        "save_workers_size" : 1,
+    	"process_stack": "HeadersParser|Debugger",
         "log_received_mails": false
     },
     "servers" : [
@@ -204,6 +213,65 @@ var configJsonD = `
 }
 `
 
+// adds 127.0.0.1:4655, a secure server
+var configJsonE = `
+{
+    "log_file" : "../../tests/testlog",
+    "log_level" : "debug",
+    "pid_file" : "./pidfile2.pid",
+    "allowed_hosts": [
+      "guerrillamail.com",
+      "guerrillamailblock.com",
+      "sharklasers.com",
+      "guerrillamail.net",
+      "guerrillamail.org"
+    ],
+    "backend_config" :
+        {
+            "process_stack_old": "HeadersParser|Debugger|Hasher|Header|Compressor|Redis|MySql",
+            "process_stack": "GuerrillaRedisDB",
+            "log_received_mails" : true,
+            "mysql_db":"gmail_mail",
+            "mysql_host":"127.0.0.1:3306",
+            "mysql_pass":"secret",
+            "mysql_user":"root",
+            "mail_table":"new_mail",
+            "redis_interface" : "127.0.0.1:6379",
+             "redis_expire_seconds" : 7200,
+            "save_workers_size" : 3,
+            "primary_mail_host":"sharklasers.com"
+        },
+    "servers" : [
+        {
+            "is_enabled" : true,
+            "host_name":"mail.test.com",
+            "max_size": 1000000,
+            "private_key_file":"../..//tests/mail2.guerrillamail.com.key.pem",
+            "public_key_file":"../../tests/mail2.guerrillamail.com.cert.pem",
+            "timeout":180,
+            "listen_interface":"127.0.0.1:2552",
+            "start_tls_on":true,
+            "tls_always_on":false,
+            "max_clients": 1000,
+            "log_file" : "../../tests/testlog"
+        },
+        {
+            "is_enabled" : true,
+            "host_name":"secure.test.com",
+            "max_size":1000000,
+            "private_key_file":"../..//tests/mail2.guerrillamail.com.key.pem",
+            "public_key_file":"../../tests/mail2.guerrillamail.com.cert.pem",
+            "timeout":180,
+            "listen_interface":"127.0.0.1:4655",
+            "start_tls_on":false,
+            "tls_always_on":true,
+            "max_clients":500,
+            "log_file" : "../../tests/testlog"
+        }
+    ]
+}
+`
+
 const testPauseDuration = time.Millisecond * 600
 
 // reload config
@@ -240,19 +308,25 @@ func sigKill() {
 func TestCmdConfigChangeEvents(t *testing.T) {
 
 	oldconf := &CmdConfig{}
-	oldconf.load([]byte(configJsonA))
+	if err := oldconf.load([]byte(configJsonA)); err != nil {
+		t.Error("configJsonA is invalid", err)
+	}
 
 	newconf := &CmdConfig{}
-	newconf.load([]byte(configJsonB))
+	if err := newconf.load([]byte(configJsonB)); err != nil {
+		t.Error("configJsonB is invalid", err)
+	}
 
 	newerconf := &CmdConfig{}
-	newerconf.load([]byte(configJsonC))
+	if err := newerconf.load([]byte(configJsonC)); err != nil {
+		t.Error("configJsonC is invalid", err)
+	}
 
 	expectedEvents := map[guerrilla.Event]bool{
 		guerrilla.EventConfigBackendConfig: false,
 		guerrilla.EventConfigServerNew:     false,
 	}
-	mainlog, _ = log.GetLogger("off")
+	mainlog, _ = log.GetLogger("../../tests/testlog")
 
 	bcfg := backends.BackendConfig{"log_received_mails": true}
 	backend, err := backends.New(bcfg, mainlog)
@@ -266,19 +340,18 @@ func TestCmdConfigChangeEvents(t *testing.T) {
 	for event := range expectedEvents {
 		// Put in anon func since range is overwriting event
 		func(e guerrilla.Event) {
-
 			if strings.Index(e.String(), "server_change") == 0 {
 				f := func(c *guerrilla.ServerConfig) {
 					expectedEvents[e] = true
 				}
-				app.Subscribe(event, f)
-				toUnsubscribeS[event] = f
+				app.Subscribe(e, f)
+				toUnsubscribeS[e] = f
 			} else {
 				f := func(c *CmdConfig) {
 					expectedEvents[e] = true
 				}
-				app.Subscribe(event, f)
-				toUnsubscribe[event] = f
+				app.Subscribe(e, f)
+				toUnsubscribe[e] = f
 			}
 
 		}(event)
@@ -339,12 +412,7 @@ func TestServe(t *testing.T) {
 	// Would not work on windows as kill is not available.
 	// TODO: Implement an alternative test for windows.
 	if runtime.GOOS != "windows" {
-		ecmd := exec.Command("kill", "-HUP", string(data))
-		_, err = ecmd.Output()
-		if err != nil {
-			t.Error("could not SIGHUP", err)
-			t.FailNow()
-		}
+		sigHup()
 		time.Sleep(testPauseDuration) // allow sighup to do its job
 		// did the pidfile change as expected?
 		if _, err := os.Stat("./pidfile2.pid"); os.IsNotExist(err) {
@@ -362,7 +430,7 @@ func TestServe(t *testing.T) {
 	}
 	if read, err := ioutil.ReadAll(fd); err == nil {
 		logOutput := string(read)
-		if i := strings.Index(logOutput, "Backend started"); i < 0 {
+		if i := strings.Index(logOutput, "new backend started"); i < 0 {
 			t.Error("Dummy backend not restared")
 		}
 	}
@@ -1084,3 +1152,59 @@ func TestDebugLevelChange(t *testing.T) {
 	os.Remove("./pidfile.pid")
 
 }
+
+// When reloading with a bad backend config, it should revert to old backend config
+func TestBadBackendReload(t *testing.T) {
+	testcert.GenerateCert("mail2.guerrillamail.com", "", 365*24*time.Hour, false, 2048, "P256", "../../tests/")
+
+	mainlog, _ = log.GetLogger("../../tests/testlog")
+
+	ioutil.WriteFile("configJsonA.json", []byte(configJsonA), 0644)
+	cmd := &cobra.Command{}
+	configPath = "configJsonA.json"
+	var serveWG sync.WaitGroup
+	serveWG.Add(1)
+	go func() {
+		serve(cmd, []string{})
+		serveWG.Done()
+	}()
+	time.Sleep(testPauseDuration)
+
+	// change the config file to the one with a broken backend
+	ioutil.WriteFile("configJsonA.json", []byte(configJsonE), 0644)
+
+	// test SIGHUP via the kill command
+	// Would not work on windows as kill is not available.
+	// TODO: Implement an alternative test for windows.
+	if runtime.GOOS != "windows" {
+		sigHup()
+		time.Sleep(testPauseDuration) // allow sighup to do its job
+		// did the pidfile change as expected?
+		if _, err := os.Stat("./pidfile2.pid"); os.IsNotExist(err) {
+			t.Error("pidfile not changed after sighup SIGHUP", err)
+		}
+	}
+
+	// send kill signal and wait for exit
+	sigKill()
+	serveWG.Wait()
+
+	// did the backend revert as expected?
+	fd, err := os.Open("../../tests/testlog")
+	if err != nil {
+		t.Error(err)
+	}
+	if read, err := ioutil.ReadAll(fd); err == nil {
+		logOutput := string(read)
+		if i := strings.Index(logOutput, "reverted to old backend config"); i < 0 {
+			t.Error("did not revert to old backend config")
+		}
+	}
+
+	// cleanup
+	os.Truncate("../../tests/testlog", 0)
+	os.Remove("configJsonA.json")
+	os.Remove("./pidfile.pid")
+	os.Remove("./pidfile2.pid")
+
+}

+ 19 - 14
guerrilla.go

@@ -66,18 +66,18 @@ func (ls *logStore) mainlog() log.Logger {
 }
 
 // storeMainlog stores the log value in an atomic operation
-func (ls *logStore) storeMainlog(log log.Logger) {
+func (ls *logStore) setMainlog(log log.Logger) {
 	ls.Store(log)
 }
 
-// Returns a new instance of Guerrilla with the given config, not yet running.
+// Returns a new instance of Guerrilla with the given config, not yet running. Backend started.
 func New(ac *AppConfig, b backends.Backend, l log.Logger) (Guerrilla, error) {
 	g := &guerrilla{
 		Config:  *ac, // take a local copy
 		servers: make(map[string]*server, len(ac.Servers)),
 		backend: b,
 	}
-	g.storeMainlog(l)
+	g.setMainlog(l)
 
 	if ac.LogLevel != "" {
 		g.mainlog().SetLevel(ac.LogLevel)
@@ -86,6 +86,12 @@ func New(ac *AppConfig, b backends.Backend, l log.Logger) (Guerrilla, error) {
 	g.state = GuerrillaStateNew
 	err := g.makeServers()
 
+	// start backend for processing email
+	err = g.backend.Start()
+	if err != nil {
+		return g, err
+	}
+
 	// subscribe for any events that may come in while running
 	g.subscribeEvents()
 	return g, err
@@ -189,7 +195,7 @@ func (g *guerrilla) subscribeEvents() {
 		var err error
 		var l log.Logger
 		if l, err = log.GetLogger(c.LogFile); err == nil {
-			g.storeMainlog(l)
+			g.setMainlog(l)
 			g.mapServers(func(server *server) {
 				// it will change server's logger when the next client gets accepted
 				server.mainlogStore.Store(l)
@@ -296,8 +302,8 @@ func (g *guerrilla) subscribeEvents() {
 			var err error
 			var l log.Logger
 			if l, err = log.GetLogger(sc.LogFile); err == nil {
-				g.storeMainlog(l)
-				backends.Svc.StoreMainlog(l)
+				g.setMainlog(l)
+				backends.Svc.SetMainlog(l)
 				// it will change to the new logger on the next accepted client
 				server.logStore.Store(l)
 
@@ -335,6 +341,11 @@ func (g *guerrilla) Start() error {
 	if len(g.servers) == 0 {
 		return append(startErrors, errors.New("No servers to start, please check the config"))
 	}
+	if g.state == GuerrillaStateStopped {
+		// when a backend is shutdown, we need to re-initialize before it can be started again
+		g.backend.Reinitialize()
+		g.backend.Start()
+	}
 	// channel for reading errors
 	errs := make(chan error, len(g.servers))
 	var startWG sync.WaitGroup
@@ -369,12 +380,6 @@ func (g *guerrilla) Start() error {
 	}
 	if len(startErrors) > 0 {
 		return startErrors
-	} else {
-		if gw, ok := g.backend.(*backends.BackendGateway); ok {
-			if gw.State == backends.BackendStateShuttered {
-				_ = gw.Reinitialize()
-			}
-		}
 	}
 	return nil
 }
@@ -401,6 +406,6 @@ func (g *guerrilla) Shutdown() {
 // SetLogger sets the logger for the app and propagates it to sub-packages (eg.
 func (g *guerrilla) SetLogger(l log.Logger) {
 	l.SetLevel(g.Config.LogLevel)
-	g.storeMainlog(l)
-	backends.Svc.StoreMainlog(l)
+	g.setMainlog(l)
+	backends.Svc.SetMainlog(l)
 }

+ 1 - 0
log/log.go

@@ -275,6 +275,7 @@ func (hook *LogrusHook) Fire(entry *log.Entry) error {
 		}()
 		// use the plain text hook
 		entry.Logger.Formatter = hook.plainTxtFormatter
+		// todo: `go test -v -race` detected a race condition, try log.SetFormatter()
 	}
 	if line, err := entry.String(); err == nil {
 		r := strings.NewReader(line)

+ 5 - 0
mail/envelope.go

@@ -108,6 +108,11 @@ func (e *Envelope) ParseHeaders() error {
 	return err
 }
 
+// Len returns the number of bytes that would be in the reader returned by NewReader()
+func (e *Envelope) Len() int {
+	return len(e.DeliveryHeader) + e.Data.Len()
+}
+
 // Returns a new reader for reading the email contents, including the delivery headers
 func (e *Envelope) NewReader() io.Reader {
 	return io.MultiReader(

+ 1 - 1
server.go

@@ -414,7 +414,7 @@ func (server *server) handleClient(client *client) {
 						rcptError := server.backend.ValidateRcpt(client.Envelope)
 						if rcptError != nil {
 							client.PopRcpt()
-							client.sendResponse(response.Canned.FailRcptCmd + rcptError.Error())
+							client.sendResponse(response.Canned.FailRcptCmd + " " + rcptError.Error())
 						} else {
 							client.sendResponse(response.Canned.SuccessRcptCmd)
 						}

+ 7 - 2
tests/guerrilla_test.go

@@ -113,7 +113,12 @@ var configJson = `
 `
 
 func getBackend(backendConfig map[string]interface{}, l log.Logger) (backends.Backend, error) {
-	return backends.New(backendConfig, l)
+	b, err := backends.New(backendConfig, l)
+	if err != nil {
+		fmt.Println("backend init error", err)
+		os.Exit(1)
+	}
+	return b, err
 }
 
 func setupCerts(c *TestConfig) {
@@ -331,7 +336,7 @@ func TestRFC2821LimitRecipients(t *testing.T) {
 			}
 
 			for i := 0; i < 101; i++ {
-				fmt.Println(fmt.Sprintf("RCPT TO:test%[email protected]", i))
+				//fmt.Println(fmt.Sprintf("RCPT TO:test%[email protected]", i))
 				if _, err := Command(conn, bufin, fmt.Sprintf("RCPT TO:test%[email protected]", i)); err != nil {
 					t.Error("RCPT TO", err.Error())
 					break