Bladeren bron

- got rid of abstract.go backend
- processor lines can configured via config and built during initialization
- backend gateway manages creating of processor lines and starting / stopping workers that use these lines
- TODO: lots of cleanup, change to allow multiple rcpt to, more comments, test backend shutdown/restart on config reload

flashmob 8 jaren geleden
bovenliggende
commit
9127bc0ba1

+ 0 - 179
backends/abstract.go

@@ -1,179 +0,0 @@
-package backends
-
-import (
-	"errors"
-	"fmt"
-	"github.com/flashmob/go-guerrilla/envelope"
-	"reflect"
-	"runtime/debug"
-	"strings"
-)
-
-type AbstractBackend struct {
-	config abstractConfig
-	Extend Worker
-	p      Processor
-}
-
-type abstractConfig struct {
-	LogReceivedMails bool `json:"log_received_mails"`
-}
-
-var ab AbstractBackend
-
-// Your backend should implement this method and set b.config field with a custom config struct
-// Therefore, your implementation would have your own custom config type instead of dummyConfig
-func (b *AbstractBackend) loadConfig(backendConfig BackendConfig) (err error) {
-	// Load the backend config for the backend. It has already been unmarshalled
-	// from the main config file 'backend' config "backend_config"
-	// Now we need to convert each type and copy into the dummyConfig struct
-	configType := baseConfig(&abstractConfig{})
-	bcfg, err := b.extractConfig(backendConfig, configType)
-	if err != nil {
-		return err
-	}
-	m := bcfg.(*abstractConfig)
-	b.config = *m
-
-	return nil
-}
-
-func (b *AbstractBackend) SetProcessors(p ...Decorator) {
-	if b.Extend != nil {
-		b.Extend.SetProcessors(p...)
-		return
-	}
-	b.p = Decorate(DefaultProcessor{}, p...)
-}
-
-func (b *AbstractBackend) Initialize(config BackendConfig) error {
-
-	Service.Initialize(config)
-
-	return nil
-
-	// TODO delete
-	if b.Extend != nil {
-		return b.Extend.loadConfig(config)
-	}
-	err := b.loadConfig(config)
-	if err != nil {
-		return err
-	}
-	return nil
-}
-
-func (b *AbstractBackend) Shutdown() error {
-	if b.Extend != nil {
-		return b.Extend.Shutdown()
-	}
-	return nil
-}
-
-func (b *AbstractBackend) Process(mail *envelope.Envelope) BackendResult {
-	if b.Extend != nil {
-		return b.Extend.Process(mail)
-	}
-	// call the decorated process function
-	b.p.Process(mail)
-	return NewBackendResult("250 OK")
-}
-
-func (b *AbstractBackend) saveMailWorker(saveMailChan chan *savePayload) {
-	if b.Extend != nil {
-		b.Extend.saveMailWorker(saveMailChan)
-		return
-	}
-	defer func() {
-		if r := recover(); r != nil {
-			// recover form closed channel
-			fmt.Println("Recovered in f", r, string(debug.Stack()))
-			mainlog.Error("Recovered form panic:", r, string(debug.Stack()))
-		}
-		// close any connections / files
-		// ...
-
-	}()
-	for {
-		payload := <-saveMailChan
-		if payload == nil {
-			mainlog.Debug("No more saveMailChan payload")
-			return
-		}
-		// process the email here
-		result := b.Process(payload.mail)
-		// if all good
-		if result.Code() < 300 {
-			payload.savedNotify <- &saveStatus{nil, "s0m3l337Ha5hva1u3LOL"}
-		} else {
-			payload.savedNotify <- &saveStatus{errors.New(result.String()), "s0m3l337Ha5hva1u3LOL"}
-		}
-
-	}
-}
-
-func (b *AbstractBackend) getNumberOfWorkers() int {
-	if b.Extend != nil {
-		return b.Extend.getNumberOfWorkers()
-	}
-	return 1
-}
-
-func (b *AbstractBackend) testSettings() error {
-	if b.Extend != nil {
-		return b.Extend.testSettings()
-	}
-	return nil
-}
-
-// Load the backend config for the backend. It has already been unmarshalled
-// from the main config file 'backend' config "backend_config"
-// Now we need to convert each type and copy into the guerrillaDBAndRedisConfig struct
-// The reason why using reflection is because we'll get a nice error message if the field is missing
-// the alternative solution would be to json.Marshal() and json.Unmarshal() however that will not give us any
-// error messages
-func (h *AbstractBackend) extractConfig(configData BackendConfig, configType baseConfig) (interface{}, error) {
-	// Use reflection so that we can provide a nice error message
-	s := reflect.ValueOf(configType).Elem() // so that we can set the values
-	m := reflect.ValueOf(configType).Elem()
-	t := reflect.TypeOf(configType).Elem()
-	typeOfT := s.Type()
-
-	for i := 0; i < m.NumField(); i++ {
-		f := s.Field(i)
-		// read the tags of the config struct
-		field_name := t.Field(i).Tag.Get("json")
-		if len(field_name) > 0 {
-			// parse the tag to
-			// get the field name from struct tag
-			split := strings.Split(field_name, ",")
-			field_name = split[0]
-		} else {
-			// could have no tag
-			// so use the reflected field name
-			field_name = typeOfT.Field(i).Name
-		}
-		if f.Type().Name() == "int" {
-			if intVal, converted := configData[field_name].(float64); converted {
-				s.Field(i).SetInt(int64(intVal))
-			} else {
-				return configType, convertError("property missing/invalid: '" + field_name + "' of expected type: " + f.Type().Name())
-			}
-		}
-		if f.Type().Name() == "string" {
-			if stringVal, converted := configData[field_name].(string); converted {
-				s.Field(i).SetString(stringVal)
-			} else {
-				return configType, convertError("missing/invalid: '" + field_name + "' of type: " + f.Type().Name())
-			}
-		}
-		if f.Type().Name() == "bool" {
-			if boolVal, converted := configData[field_name].(bool); converted {
-				s.Field(i).SetBool(boolVal)
-			} else {
-				return configType, convertError("missing/invalid: '" + field_name + "' of type: " + f.Type().Name())
-			}
-		}
-	}
-	return configType, nil
-}

+ 65 - 6
backends/backend.go

@@ -4,16 +4,23 @@ import (
 	"fmt"
 	"github.com/flashmob/go-guerrilla/envelope"
 	"github.com/flashmob/go-guerrilla/log"
+	"reflect"
 	"strconv"
 	"strings"
 )
 
-var mainlog log.Logger
-
-var Service *BackendService
+var (
+	mainlog log.Logger
+	Service *BackendService
+	// deprecated backends system
+	backends = map[string]Backend{}
+	// new backends system
+	Processors map[string]ProcessorConstructor
+)
 
 func init() {
 	Service = &BackendService{}
+	Processors = make(map[string]ProcessorConstructor)
 }
 
 // Backends process received mail. Depending on the implementation, they can store mail in the database,
@@ -27,13 +34,13 @@ type Backend interface {
 	Shutdown() error
 }
 
+/*
 type Worker interface {
 	// start save mail worker(s)
 	saveMailWorker(chan *savePayload)
 	// get the number of workers that will be stared
 	getNumberOfWorkers() int
 	// test database settings, permissions, correct paths, etc, before starting workers
-	testSettings() error
 	// parse the configuration files
 	loadConfig(BackendConfig) error
 
@@ -43,10 +50,10 @@ type Worker interface {
 
 	SetProcessors(p ...Decorator)
 }
-
+*/
 type BackendConfig map[string]interface{}
 
-var backends = map[string]Worker{}
+type ProcessorConstructor func() Decorator
 
 type baseConfig interface{}
 
@@ -148,3 +155,55 @@ func (b *BackendService) Shutdown() {
 		b.Shutdowners[i].Shutdown()
 	}
 }
+
+// extractConfig loads the backend config. It has already been unmarshalled
+// configData contains data from the main config file's "backend_config" value
+// configType is a Processor's specific config value.
+// The reason why using reflection is because we'll get a nice error message if the field is missing
+// the alternative solution would be to json.Marshal() and json.Unmarshal() however that will not give us any
+// error messages
+func (b *BackendService) extractConfig(configData BackendConfig, configType baseConfig) (interface{}, error) {
+	// Use reflection so that we can provide a nice error message
+	s := reflect.ValueOf(configType).Elem() // so that we can set the values
+	m := reflect.ValueOf(configType).Elem()
+	t := reflect.TypeOf(configType).Elem()
+	typeOfT := s.Type()
+
+	for i := 0; i < m.NumField(); i++ {
+		f := s.Field(i)
+		// read the tags of the config struct
+		field_name := t.Field(i).Tag.Get("json")
+		if len(field_name) > 0 {
+			// parse the tag to
+			// get the field name from struct tag
+			split := strings.Split(field_name, ",")
+			field_name = split[0]
+		} else {
+			// could have no tag
+			// so use the reflected field name
+			field_name = typeOfT.Field(i).Name
+		}
+		if f.Type().Name() == "int" {
+			if intVal, converted := configData[field_name].(float64); converted {
+				s.Field(i).SetInt(int64(intVal))
+			} else {
+				return configType, convertError("property missing/invalid: '" + field_name + "' of expected type: " + f.Type().Name())
+			}
+		}
+		if f.Type().Name() == "string" {
+			if stringVal, converted := configData[field_name].(string); converted {
+				s.Field(i).SetString(stringVal)
+			} else {
+				return configType, convertError("missing/invalid: '" + field_name + "' of type: " + f.Type().Name())
+			}
+		}
+		if f.Type().Name() == "bool" {
+			if boolVal, converted := configData[field_name].(bool); converted {
+				s.Field(i).SetBool(boolVal)
+			} else {
+				return configType, convertError("missing/invalid: '" + field_name + "' of type: " + f.Type().Name())
+			}
+		}
+	}
+	return configType, nil
+}

+ 0 - 35
backends/dummy.go

@@ -1,36 +1 @@
 package backends
-
-func init() {
-	backends["dummy"] = &AbstractBackend{}
-	backends["dummy"].SetProcessors(
-		MySql(), Redis(), Compressor(), Header(), Hasher(), Debugger(), HeadersParser())
-}
-
-// custom configuration we will parse from the json
-// see guerrillaDBAndRedisConfig struct for a more complete example
-type dummyConfig struct {
-	LogReceivedMails bool `json:"log_received_mails"`
-}
-
-// putting all the paces we need together
-type DummyBackend struct {
-	config dummyConfig
-	// embed functions form AbstractBackend so that DummyBackend satisfies the Backend interface
-	AbstractBackend
-}
-
-// Backends should implement this method and set b.config field with a custom config struct
-// Therefore, your implementation would have a custom config type instead of dummyConfig
-func (b *DummyBackend) loadConfig(backendConfig BackendConfig) (err error) {
-	// Load the backend config for the backend. It has already been unmarshalled
-	// from the main config file 'backend' config "backend_config"
-	// Now we need to convert each type and copy into the dummyConfig struct
-	configType := baseConfig(&dummyConfig{})
-	bcfg, err := b.extractConfig(backendConfig, configType)
-	if err != nil {
-		return err
-	}
-	m := bcfg.(*dummyConfig)
-	b.config = *m
-	return nil
-}

+ 61 - 21
backends/gateway.go

@@ -10,6 +10,7 @@ import (
 	"github.com/flashmob/go-guerrilla/envelope"
 	"github.com/flashmob/go-guerrilla/log"
 	"github.com/flashmob/go-guerrilla/response"
+	"strings"
 )
 
 // A backend gateway is a proxy that implements the Backend interface.
@@ -17,15 +18,21 @@ import (
 // via a channel. Shutting down via Shutdown() will stop all workers.
 // The rest of this program always talks to the backend via this gateway.
 type BackendGateway struct {
-	AbstractBackend
 	saveMailChan chan *savePayload
 	// waits for backend workers to start/stop
 	wg sync.WaitGroup
-	b  Worker
+	w  *Worker
+	b  Backend
 	// controls access to state
 	stateGuard sync.Mutex
 	State      backendState
 	config     BackendConfig
+	gwConfig   *GatewayConfig
+}
+
+type GatewayConfig struct {
+	WorkersSize   int    `json:"save_workers_size,omitempty"`
+	ProcessorLine string `json:"process_line,omitempty"`
 }
 
 // possible values for state
@@ -44,17 +51,17 @@ func (s backendState) String() string {
 // New retrieve a backend specified by the backendName, and initialize it using
 // backendConfig
 func New(backendName string, backendConfig BackendConfig, l log.Logger) (Backend, error) {
-	backend, found := backends[backendName]
 	mainlog = l
-	if !found {
-		return nil, fmt.Errorf("backend %q not found", backendName)
+	gateway := &BackendGateway{config: backendConfig}
+	if backend, found := backends[backendName]; found {
+		gateway.b = backend
 	}
-	gateway := &BackendGateway{b: backend, config: backendConfig}
 	err := gateway.Initialize(backendConfig)
 	if err != nil {
 		return nil, fmt.Errorf("error while initializing the backend: %s", err)
 	}
 	gateway.State = BackendStateRunning
+
 	return gateway, nil
 }
 
@@ -84,13 +91,10 @@ func (gw *BackendGateway) Shutdown() error {
 	gw.stateGuard.Lock()
 	defer gw.stateGuard.Unlock()
 	if gw.State != BackendStateShuttered {
-		err := gw.b.Shutdown()
-		if err == nil {
-			close(gw.saveMailChan) // workers will stop
-			gw.wg.Wait()
-			gw.State = BackendStateShuttered
-		}
-		return err
+		close(gw.saveMailChan) // workers will stop
+		gw.wg.Wait()
+		gw.State = BackendStateShuttered
+		Service.Shutdown()
 	}
 	return nil
 }
@@ -108,29 +112,65 @@ func (gw *BackendGateway) Reinitialize() error {
 	return err
 }
 
+func (gw *BackendGateway) newProcessorLine() Processor {
+	var decorators []Decorator
+	if len(gw.gwConfig.ProcessorLine) == 0 {
+		return nil
+	}
+	line := strings.Split(strings.ToLower(gw.gwConfig.ProcessorLine), "|")
+	for i := range line {
+		name := line[len(line)-1-i] // reverse order, since decorators are stacked
+		if makeFunc, ok := Processors[name]; ok {
+			decorators = append(decorators, makeFunc())
+		}
+	}
+	p := Decorate(DefaultProcessor{}, decorators...)
+	return p
+}
+
+func (gw *BackendGateway) loadConfig(cfg BackendConfig) error {
+	configType := baseConfig(&GatewayConfig{})
+	bcfg, err := Service.extractConfig(cfg, configType)
+	if err != nil {
+		return err
+	}
+	gw.gwConfig = bcfg.(*GatewayConfig)
+	return nil
+}
+
 func (gw *BackendGateway) Initialize(cfg BackendConfig) error {
-	err := gw.b.Initialize(cfg)
+	err := gw.loadConfig(cfg)
 	if err == nil {
-		workersSize := gw.b.getNumberOfWorkers()
+		workersSize := gw.getNumberOfWorkers()
 		if workersSize < 1 {
 			gw.State = BackendStateError
 			return errors.New("Must have at least 1 worker")
 		}
-		if err := gw.b.testSettings(); err != nil {
-			gw.State = BackendStateError
-			return err
+		var lines []Processor
+		for i := 0; i < workersSize; i++ {
+			lines = append(lines, gw.newProcessorLine())
 		}
+		// initialize processors
+		Service.Initialize(cfg)
 		gw.saveMailChan = make(chan *savePayload, workersSize)
 		// start our savemail workers
 		gw.wg.Add(workersSize)
 		for i := 0; i < workersSize; i++ {
-			go func() {
-				gw.b.saveMailWorker(gw.saveMailChan)
+			go func(workerId int) {
+				gw.w.saveMailWorker(gw.saveMailChan, lines[workerId], workerId+1)
 				gw.wg.Done()
-			}()
+			}(i)
 		}
+
 	} else {
 		gw.State = BackendStateError
 	}
 	return err
 }
+
+func (gw *BackendGateway) getNumberOfWorkers() int {
+	if gw.gwConfig.WorkersSize == 0 {
+		return 1
+	}
+	return gw.gwConfig.WorkersSize
+}

+ 24 - 4
backends/guerrilla_db_redis.go

@@ -33,6 +33,7 @@ import (
 	"database/sql"
 	_ "github.com/go-sql-driver/mysql"
 
+	"github.com/flashmob/go-guerrilla/envelope"
 	"github.com/go-sql-driver/mysql"
 	"io"
 	"runtime/debug"
@@ -47,12 +48,10 @@ const GuerrillaDBAndRedisBatchMax = 2
 const GuerrillaDBAndRedisBatchTimeout = time.Second * 3
 
 func init() {
-	backends["guerrilla-db-redis"] = &AbstractBackend{
-		Extend: &GuerrillaDBAndRedisBackend{}}
+	backends["guerrilla-db-redis"] = &GuerrillaDBAndRedisBackend{}
 }
 
 type GuerrillaDBAndRedisBackend struct {
-	AbstractBackend
 	config    guerrillaDBAndRedisConfig
 	batcherWg sync.WaitGroup
 	// cache prepared queries
@@ -81,9 +80,10 @@ func convertError(name string) error {
 // Load the backend config for the backend. It has already been unmarshalled
 // from the main config file 'backend' config "backend_config"
 // Now we need to convert each type and copy into the guerrillaDBAndRedisConfig struct
+
 func (g *GuerrillaDBAndRedisBackend) loadConfig(backendConfig BackendConfig) (err error) {
 	configType := baseConfig(&guerrillaDBAndRedisConfig{})
-	bcfg, err := g.extractConfig(backendConfig, configType)
+	bcfg, err := Service.extractConfig(backendConfig, configType)
 	if err != nil {
 		return err
 	}
@@ -446,3 +446,23 @@ func (g *GuerrillaDBAndRedisBackend) testSettings() (err error) {
 
 	return
 }
+
+func (g *GuerrillaDBAndRedisBackend) Initialize(config BackendConfig) error {
+	err := g.loadConfig(config)
+	if err != nil {
+		return err
+	}
+	return nil
+
+}
+
+// does nothing
+func (g *GuerrillaDBAndRedisBackend) Process(mail *envelope.Envelope) BackendResult {
+	return NewBackendResult("250 OK")
+}
+
+// does nothing
+func (g *GuerrillaDBAndRedisBackend) Shutdown() error {
+
+	return nil
+}

+ 33 - 10
backends/p_compressor.go

@@ -8,16 +8,43 @@ import (
 	"sync"
 )
 
+func init() {
+
+	// ----------------------------------------------------------------------------------
+	// Processor Name: compressor
+	// ----------------------------------------------------------------------------------
+	// Description   : Compress the e.Data (email data) and e.DeliveryHeader together
+	// ----------------------------------------------------------------------------------
+	// Config Options: None
+	// --------------:-------------------------------------------------------------------
+	// Input         : e.Data, e.DeliveryHeader generated by Header() processor
+	// ----------------------------------------------------------------------------------
+	// Output        : sets the pointer to a compressor in e.Info["zlib-compressor"]
+	//               : to write the compressed data, simply use fmt to print as a string,
+	//               : eg. fmt.Println("%s", e.Info["zlib-compressor"])
+	//               : or just call the String() func .Info["zlib-compressor"].String()
+	//               : Note that it can only be outputted once. It destroys the buffer
+	//               : after being printed
+	// ----------------------------------------------------------------------------------
+
+	Processors["compressor"] = func() Decorator {
+		return Compressor()
+	}
+}
+
 // compressedData struct will be compressed using zlib when printed via fmt
 type compressor struct {
 	extraHeaders []byte
 	data         *bytes.Buffer
-	pool         *sync.Pool
+	// the pool is used to recycle buffers to ease up on the garbage collector
+	pool *sync.Pool
 }
 
 // newCompressedData returns a new CompressedData
 func newCompressor() *compressor {
+	// grab it from the pool
 	var p = sync.Pool{
+		// if not available, then create a new one
 		New: func() interface{} {
 			var b bytes.Buffer
 			return &b
@@ -34,7 +61,9 @@ func (c *compressor) set(b []byte, d *bytes.Buffer) {
 	c.data = d
 }
 
-// implement Stringer interface
+// String implements the Stringer interface.
+// Can only be called once!
+// This is because the compression buffer will be reset and compressor will be returned to the pool
 func (c *compressor) String() string {
 	if c.data == nil {
 		return ""
@@ -62,19 +91,13 @@ func (c *compressor) clear() {
 	c.data = nil
 }
 
-// The hasher decorator computes a hash of the email for each recipient
-// It appends the hashes to envelope's Hashes slice.
 func Compressor() Decorator {
 	return func(c Processor) Processor {
 		return ProcessorFunc(func(e *envelope.Envelope) (BackendResult, error) {
-
 			compressor := newCompressor()
 			compressor.set([]byte(e.DeliveryHeader), &e.Data)
-			if e.Meta == nil {
-				e.Meta = make(map[string]interface{})
-			}
-			e.Meta["zlib-compressor"] = compressor
-
+			// put the pinter in there for other processors to use later in the line
+			e.Info["zlib-compressor"] = compressor
 			return c.Process(e)
 		})
 	}

+ 19 - 2
backends/p_debugger.go

@@ -4,6 +4,23 @@ import (
 	"github.com/flashmob/go-guerrilla/envelope"
 )
 
+func init() {
+	// ----------------------------------------------------------------------------------
+	// Processor Name: debugger
+	// ----------------------------------------------------------------------------------
+	// Description   : Log received emails
+	// ----------------------------------------------------------------------------------
+	// Config Options: log_received_mails bool - log if true
+	// --------------:-------------------------------------------------------------------
+	// Input         : e.MailFrom, e.RcptTo, e.Header
+	// ----------------------------------------------------------------------------------
+	// Output        : none (only output to the log if enabled)
+	// ----------------------------------------------------------------------------------
+	Processors["debugger"] = func() Decorator {
+		return Debugger()
+	}
+}
+
 type debuggerConfig struct {
 	LogReceivedMails bool `json:"log_received_mails"`
 }
@@ -12,7 +29,7 @@ func Debugger() Decorator {
 	var config *debuggerConfig
 	initFunc := Initialize(func(backendConfig BackendConfig) error {
 		configType := baseConfig(&debuggerConfig{})
-		bcfg, err := ab.extractConfig(backendConfig, configType)
+		bcfg, err := Service.extractConfig(backendConfig, configType)
 		if err != nil {
 			return err
 		}
@@ -24,7 +41,7 @@ func Debugger() Decorator {
 		return ProcessorFunc(func(e *envelope.Envelope) (BackendResult, error) {
 			if config.LogReceivedMails {
 				mainlog.Infof("Mail from: %s / to: %v", e.MailFrom.String(), e.RcptTo)
-				mainlog.Info("So, Headers are:", e.Header)
+				mainlog.Info("Headers are:", e.Header)
 			}
 			// continue to the next Processor in the decorator chain
 			return c.Process(e)

+ 19 - 0
backends/p_hasher.go

@@ -9,6 +9,25 @@ import (
 	"time"
 )
 
+func init() {
+	// ----------------------------------------------------------------------------------
+	// Processor Name: hasher
+	// ----------------------------------------------------------------------------------
+	// Description   : Generates a unique md5 checksum id for an email
+	// ----------------------------------------------------------------------------------
+	// Config Options: None
+	// --------------:-------------------------------------------------------------------
+	// Input         : e.MailFrom, e.Subject, e.RcptTo
+	//               : assuming e.Subject was generated by "headersparser" processor
+	// ----------------------------------------------------------------------------------
+	// Output        : Checksum stored in e.Hash
+	// ----------------------------------------------------------------------------------
+
+	Processors["hasher"] = func() Decorator {
+		return Hasher()
+	}
+}
+
 // The hasher decorator computes a hash of the email for each recipient
 // It appends the hashes to envelope's Hashes slice.
 func Hasher() Decorator {

+ 7 - 1
backends/p_header.go

@@ -10,6 +10,12 @@ type HeaderConfig struct {
 	PrimaryHost string `json:"primary_mail_host"`
 }
 
+func init() {
+	Processors["header"] = func() Decorator {
+		return Header()
+	}
+}
+
 // Generate the MTA delivery header
 // Sets e.DeliveryHeader part of the envelope with the generated header
 func Header() Decorator {
@@ -18,7 +24,7 @@ func Header() Decorator {
 
 	Service.AddInitializer(Initialize(func(backendConfig BackendConfig) error {
 		configType := baseConfig(&HeaderConfig{})
-		bcfg, err := ab.extractConfig(backendConfig, configType)
+		bcfg, err := Service.extractConfig(backendConfig, configType)
 		if err != nil {
 			return err
 		}

+ 6 - 0
backends/p_headers_parser.go

@@ -4,6 +4,12 @@ import (
 	"github.com/flashmob/go-guerrilla/envelope"
 )
 
+func init() {
+	Processors["headersparser"] = func() Decorator {
+		return HeadersParser()
+	}
+}
+
 func HeadersParser() Decorator {
 	return func(c Processor) Processor {
 		return ProcessorFunc(func(e *envelope.Envelope) (BackendResult, error) {

+ 14 - 10
backends/p_mysql.go

@@ -11,6 +11,12 @@ import (
 	"runtime/debug"
 )
 
+func init() {
+	Processors["mysql"] = func() Decorator {
+		return MySql()
+	}
+}
+
 type MysqlProcessorConfig struct {
 	NumberOfWorkers    int    `json:"save_workers_size"`
 	MysqlTable         string `json:"mail_table"`
@@ -112,7 +118,7 @@ func MySql() Decorator {
 
 	Service.AddInitializer(Initialize(func(backendConfig BackendConfig) error {
 		configType := baseConfig(&MysqlProcessorConfig{})
-		bcfg, err := ab.extractConfig(backendConfig, configType)
+		bcfg, err := Service.extractConfig(backendConfig, configType)
 		if err != nil {
 			return err
 		}
@@ -145,15 +151,13 @@ func MySql() Decorator {
 
 			var co *compressor
 			// a compressor was set
-			if e.Meta != nil {
-				if c, ok := e.Meta["zlib-compressor"]; ok {
-					body = "gzip"
-					co = c.(*compressor)
-				}
-				// was saved in redis
-				if _, ok := e.Meta["redis"]; ok {
-					body = "redis"
-				}
+			if c, ok := e.Info["zlib-compressor"]; ok {
+				body = "gzip"
+				co = c.(*compressor)
+			}
+			// was saved in redis
+			if _, ok := e.Info["redis"]; ok {
+				body = "redis"
 			}
 
 			// build the values for the query

+ 10 - 8
backends/p_redis.go

@@ -9,6 +9,12 @@ import (
 	"github.com/garyburd/redigo/redis"
 )
 
+func init() {
+	Processors["redis"] = func() Decorator {
+		return Redis()
+	}
+}
+
 type RedisProcessorConfig struct {
 	RedisExpireSeconds int    `json:"redis_expire_seconds"`
 	RedisInterface     string `json:"redis_interface"`
@@ -40,7 +46,7 @@ func Redis() Decorator {
 	// read the config into RedisProcessorConfig
 	Service.AddInitializer(Initialize(func(backendConfig BackendConfig) error {
 		configType := baseConfig(&RedisProcessorConfig{})
-		bcfg, err := ab.extractConfig(backendConfig, configType)
+		bcfg, err := Service.extractConfig(backendConfig, configType)
 		if err != nil {
 			return err
 		}
@@ -69,12 +75,8 @@ func Redis() Decorator {
 
 				var cData *compressor
 				// a compressor was set
-				if e.Meta != nil {
-					if c, ok := e.Meta["zlib-compressor"]; ok {
-						cData = c.(*compressor)
-					}
-				} else {
-					e.Meta = make(map[string]interface{})
+				if c, ok := e.Info["zlib-compressor"]; ok {
+					cData = c.(*compressor)
 				}
 
 				redisErr = redisClient.redisConnection(config.RedisInterface)
@@ -98,7 +100,7 @@ func Redis() Decorator {
 					result := NewBackendResult(response.Canned.FailBackendTransaction)
 					return result, redisErr
 				} else {
-					e.Meta["redis"] = "redis" // the backend system will know to look in redis for the message data
+					e.Info["redis"] = "redis" // the backend system will know to look in redis for the message data
 				}
 			} else {
 				mainlog.Error("Redis needs a Hash() process before it")

+ 41 - 0
backends/worker.go

@@ -0,0 +1,41 @@
+package backends
+
+import (
+	"errors"
+	"fmt"
+	"runtime/debug"
+)
+
+type Worker struct {
+}
+
+func (w *Worker) saveMailWorker(saveMailChan chan *savePayload, p Processor, workerId int) {
+
+	defer func() {
+		if r := recover(); r != nil {
+			// recover form closed channel
+			fmt.Println("Recovered in f", r, string(debug.Stack()))
+			mainlog.Error("Recovered form panic:", r, string(debug.Stack()))
+		}
+		// close any connections / files
+		Service.Shutdown()
+
+	}()
+	mainlog.Infof("Save mail worker started (#%d)", workerId)
+	for {
+		payload := <-saveMailChan
+		if payload == nil {
+			mainlog.Debug("No more saveMailChan payload")
+			return
+		}
+		// process the email here
+		result, _ := p.Process(payload.mail)
+		// if all good
+		if result.Code() < 300 {
+			payload.savedNotify <- &saveStatus{nil, payload.mail.Hashes[0]}
+		} else {
+			payload.savedNotify <- &saveStatus{errors.New(result.String()), ""}
+		}
+
+	}
+}

+ 3 - 4
client.go

@@ -53,16 +53,15 @@ type client struct {
 // Allocate a new client
 func NewClient(conn net.Conn, clientID uint64, logger log.Logger) *client {
 	c := &client{
-		conn: conn,
-		Envelope: &envelope.Envelope{
-			RemoteAddress: getRemoteAddr(conn),
-		},
+		conn:        conn,
+		Envelope:    envelope.NewEnvelope(getRemoteAddr(conn)),
 		ConnectedAt: time.Now(),
 		bufin:       newSMTPBufferedReader(conn),
 		bufout:      bufio.NewWriter(conn),
 		ID:          clientID,
 		log:         logger,
 	}
+
 	// used for reading the DATA state
 	c.smtpReader = textproto.NewReader(c.bufin.Reader)
 	return c

+ 3 - 2
cmd/guerrillad/serve.go

@@ -167,8 +167,9 @@ func serve(cmd *cobra.Command, args []string) {
 // the the command line interface.
 type CmdConfig struct {
 	guerrilla.AppConfig
-	BackendName   string                 `json:"backend_name"`
-	BackendConfig backends.BackendConfig `json:"backend_config"`
+	BackendName        string                 `json:"backend_name"`
+	BackendConfig      backends.BackendConfig `json:"backend_config"`
+	BackendProcessLine string                 `json:"backend_process_line"`
 }
 
 func (c *CmdConfig) load(jsonBytes []byte) error {

+ 9 - 2
envelope/envelope.go

@@ -46,14 +46,21 @@ type Envelope struct {
 	TLS bool
 	// Header stores the results from ParseHeaders()
 	Header textproto.MIMEHeader
-	// Hold the metadat
-	Meta map[string]interface{}
+	// Hold the information generated when processing the envelope by the backend
+	Info map[string]interface{}
 	// Hashes of each email on the rcpt
 	Hashes []string
 	//
 	DeliveryHeader string
 }
 
+func NewEnvelope(remoteAddr string) *Envelope {
+	return &Envelope{
+		RemoteAddress: remoteAddr,
+		Info:          make(map[string]interface{}),
+	}
+}
+
 // ParseHeaders parses the headers into Header field of the Envelope struct.
 // Data buffer must be full before calling.
 // It assumes that at most 30kb of email data can be a header