Browse Source

- add "named stream processors" feature (stream processor can have multiple instances with different configurations).
- cleaner stream processor configuration
- bug fixes

flashmob 5 years ago
parent
commit
4a8a9623d8
8 changed files with 240 additions and 52 deletions
  1. 72 0
      api_test.go
  2. 7 1
      backends/backend.go
  3. 72 13
      backends/config.go
  4. 20 4
      backends/decorate_stream.go
  5. 25 6
      backends/gateway.go
  6. 2 2
      backends/p_debugger.go
  7. 39 4
      backends/s_debug.go
  8. 3 22
      backends/s_transformer.go

+ 72 - 0
api_test.go

@@ -933,6 +933,68 @@ func TestBackendAddRemove(t *testing.T) {
 
 }
 
+func TestStreamProcessorConfig(t *testing.T) {
+	if err := os.Truncate("tests/testlog", 0); err != nil {
+		t.Error(err)
+	}
+
+	servers := []ServerConfig{
+		0: {
+			IsEnabled:       true,
+			Hostname:        "mail.guerrillamail.com",
+			MaxSize:         100017,
+			Timeout:         160,
+			ListenInterface: "127.0.0.1:2526",
+			MaxClients:      2,
+			TLS: ServerTLSConfig{
+				PrivateKeyFile: "",
+				PublicKeyFile:  "",
+				StartTLSOn:     false,
+				AlwaysOn:       false,
+			},
+		},
+	}
+
+	cfg := &AppConfig{
+		LogFile:      "tests/testlog",
+		PidFile:      "tests/go-guerrilla.pid",
+		AllowedHosts: []string{"grr.la", "spam4.me"},
+		BackendConfig: backends.BackendConfig{
+			"stream_procEssoRs": { // note mixed case
+				"chunkSaver": { // note mixed case
+					"chunksaver_chunk_size":     8000,
+					"chunksaver_storage_engine": "memory",
+					"chunksaver_compress_level": 0,
+				},
+				"test:chunksaver": {
+					"chunksaver_chunk_size":     8000,
+					"chunksaver_storage_engine": "memory",
+					"chunksaver_compress_level": 0,
+				},
+				"debug": {
+					"sleep_seconds": 2,
+					"log_reads":     true,
+				},
+			},
+			"gateways": {
+				"default": {
+					"stream_save_process": "mimeanalyzer|chunksaver|test|debug",
+				},
+			},
+		},
+		Servers: servers,
+	}
+
+	d := Daemon{Config: cfg}
+	if err := d.Start(); err != nil {
+		t.Error(err)
+		return
+	}
+
+	d.Shutdown()
+
+}
+
 func TestStreamProcessor(t *testing.T) {
 	if err := os.Truncate("tests/testlog", 0); err != nil {
 		t.Error(err)
@@ -941,6 +1003,11 @@ func TestStreamProcessor(t *testing.T) {
 		LogFile:      "tests/testlog",
 		AllowedHosts: []string{"grr.la"},
 		BackendConfig: backends.BackendConfig{
+			"stream_processors": {
+				"debug": {
+					"log_reads": true,
+				},
+			},
 			"gateways": {
 				"default": {
 					"save_process":        "HeadersParser|Debugger",
@@ -1252,6 +1319,11 @@ func TestStreamMimeProcessor(t *testing.T) {
 		LogFile:      "tests/testlog",
 		AllowedHosts: []string{"grr.la"},
 		BackendConfig: backends.BackendConfig{
+			"stream_processors": {
+				"debug": {
+					"log_reads": true,
+				},
+			},
 			"gateways": {
 				"default": {
 					"save_process":        "HeadersParser|Debugger",

+ 7 - 1
backends/backend.go

@@ -123,6 +123,7 @@ type processorShutdowner interface {
 }
 
 type InitializeWith func(backendConfig BackendConfig) error
+type InitializeStreamWith func(conf ConfigGroup) error
 type ShutdownWith func() error
 
 // Satisfy ProcessorInitializer interface
@@ -132,6 +133,11 @@ func (i InitializeWith) Initialize(backendConfig BackendConfig) error {
 	return i(backendConfig)
 }
 
+func (i InitializeStreamWith) Initialize(conf ConfigGroup) error {
+	// delegate to the anonymous function
+	return i(conf)
+}
+
 // satisfy ProcessorShutdowner interface, same concept as InitializeWith type
 func (s ShutdownWith) Shutdown() error {
 	// delegate
@@ -177,7 +183,7 @@ func (s *service) SetMainlog(l log.Logger) {
 	s.mainlog.Store(l)
 }
 
-// AddInitializer adds a function that implements ProcessorShutdowner to be called when initializing
+// AddInitializer adds a function that implements processorInitializer to be called when initializing
 func (s *service) AddInitializer(i processorInitializer) {
 	s.Lock()
 	defer s.Unlock()

+ 72 - 13
backends/config.go

@@ -52,26 +52,51 @@ func (c *BackendConfig) GetValue(ns configNameSpace, name string, key string) in
 	return nil
 }
 
+// toLower normalizes the backendconfig lowercases the config's keys
+func (c BackendConfig) toLower() {
+	for k, v := range c {
+		var l string
+		if l = strings.ToLower(k); k != l {
+			c[l] = v
+			delete(c, k)
+		}
+		for k2, v2 := range v {
+			if l2 := strings.ToLower(k2); k2 != l2 {
+				c[l][l2] = v2
+				delete(c[l], k)
+			}
+		}
+	}
+}
+
+func (c BackendConfig) group(ns string, name string) *ConfigGroup {
+	if v, ok := c[ns][name]; ok {
+		return &v
+	}
+	return nil
+}
+
 // ConfigureDefaults sets default values for the backend config,
 // if no backend config was added before starting, then use a default config
 // otherwise, see what required values were missed in the config and add any missing with defaults
 func (c *BackendConfig) ConfigureDefaults() error {
 	// set the defaults if no value has been configured
+	// (always use lowercase)
 	if c.GetValue(ConfigGateways, "default", "save_workers_size") == nil {
 		c.SetValue(ConfigGateways, "default", "save_workers_size", 1)
 	}
 	if c.GetValue(ConfigGateways, "default", "save_process") == nil {
 		c.SetValue(ConfigGateways, "default", "save_process", "HeadersParser|Header|Debugger")
 	}
-	if c.GetValue(ConfigProcessors, "default", "primary_mail_host") == nil {
+	if c.GetValue(ConfigProcessors, "header", "primary_mail_host") == nil {
 		h, err := os.Hostname()
 		if err != nil {
 			return err
 		}
-		c.SetValue(ConfigProcessors, "Header", "primary_mail_host", h)
+		c.SetValue(ConfigProcessors, "header", "primary_mail_host", h)
 	}
-	if c.GetValue(ConfigProcessors, "default", "log_received_mails") == nil {
-		c.SetValue(ConfigProcessors, "Debugger", "log_received_mails", true)
+	if c.GetValue(ConfigProcessors, "debugger", "log_received_mails") == nil {
+		c.SetValue(ConfigProcessors, "debugger", "log_received_mails", true)
 	}
 	return nil
 }
@@ -104,6 +129,13 @@ type stackConfigExpression struct {
 	name  string
 }
 
+func (e stackConfigExpression) String() string {
+	if e.alias == e.name || e.alias == "" {
+		return e.name
+	}
+	return fmt.Sprintf("%s:%s", e.alias, e.name)
+}
+
 type notFoundError func(s string) error
 
 type stackConfig struct {
@@ -111,7 +143,29 @@ type stackConfig struct {
 	notFound notFoundError
 }
 
-func NewStackConfig(config string) (ret *stackConfig) {
+type aliasMap map[string]string
+
+// newAliasMap scans through the configured processors to produce a mapping
+// alias -> processor name. This mapping is used to determine what configuration to use
+// when making a new processor
+func newAliasMap(cfg map[string]ConfigGroup) aliasMap {
+	am := make(aliasMap, 0)
+	for k, _ := range cfg {
+		var alias, name string
+		// format: <alias> : <processorName>
+		if i := strings.Index(k, ":"); i > 0 && len(k) > i+2 {
+			alias = k[0:i]
+			name = k[i+1:]
+		} else {
+			alias = k
+			name = k
+		}
+		am[strings.ToLower(alias)] = strings.ToLower(name)
+	}
+	return am
+}
+
+func NewStackConfig(config string, am aliasMap) (ret *stackConfig) {
 	ret = new(stackConfig)
 	cfg := strings.ToLower(strings.TrimSpace(config))
 	if cfg == "" {
@@ -122,21 +176,24 @@ func NewStackConfig(config string) (ret *stackConfig) {
 	pos := 0
 	for i := range items {
 		pos = len(items) - 1 - i // reverse order, since decorators are stacked
-		ret.list[i] = stackConfigExpression{alias: "", name: items[pos]}
+		ret.list[i] = stackConfigExpression{alias: items[pos], name: items[pos]}
+		if processor, ok := am[items[pos]]; ok {
+			ret.list[i].name = processor
+		}
 	}
 	return ret
 }
 
-func newStackProcessorConfig(config string) (ret *stackConfig) {
-	ret = NewStackConfig(config)
+func newStackProcessorConfig(config string, am aliasMap) (ret *stackConfig) {
+	ret = NewStackConfig(config, am)
 	ret.notFound = func(s string) error {
 		return errors.New(fmt.Sprintf("processor [%s] not found", s))
 	}
 	return ret
 }
 
-func newStackStreamProcessorConfig(config string) (ret *stackConfig) {
-	ret = NewStackConfig(config)
+func newStackStreamProcessorConfig(config string, am aliasMap) (ret *stackConfig) {
+	ret = NewStackConfig(config, am)
 	ret.notFound = func(s string) error {
 		return errors.New(fmt.Sprintf("stream processor [%s] not found", s))
 	}
@@ -146,7 +203,6 @@ func newStackStreamProcessorConfig(config string) (ret *stackConfig) {
 // Changes returns a list of gateways whose config changed
 func (c BackendConfig) Changes(oldConfig BackendConfig) (changed, added, removed map[string]bool) {
 	// check the processors if changed
-
 	changed = make(map[string]bool, 0)
 	added = make(map[string]bool, 0)
 	removed = make(map[string]bool, 0)
@@ -158,6 +214,8 @@ func (c BackendConfig) Changes(oldConfig BackendConfig) (changed, added, removed
 	changedStreamProcessors := changedConfigGroups(
 		oldConfig[csp], c[csp])
 	configType := BaseConfig(&GatewayConfig{})
+	aliasMapStream := newAliasMap(c[csp])
+	aliasMapProcessor := newAliasMap(c[cp])
 	// oldList keeps a track of gateways that have been compared for changes.
 	// We remove the from the list when a gateway was processed
 	// remaining items are assumed to be removed from the new config
@@ -172,13 +230,14 @@ func (c BackendConfig) Changes(oldConfig BackendConfig) (changed, added, removed
 		// they changed. If changed, then make a record of which gateways use the processors
 		e, _ := Svc.ExtractConfig(ConfigGateways, key, c, configType)
 		bcfg := e.(*GatewayConfig)
-		config := NewStackConfig(bcfg.SaveProcess)
+		config := NewStackConfig(bcfg.SaveProcess, aliasMapProcessor)
 		for _, v := range config.list {
 			if _, ok := changedProcessors[v.name]; ok {
 				changed[key] = true
 			}
 		}
-		config = NewStackConfig(bcfg.StreamSaveProcess)
+
+		config = NewStackConfig(bcfg.StreamSaveProcess, aliasMapStream)
 		for _, v := range config.list {
 			if _, ok := changedStreamProcessors[v.name]; ok {
 				changed[key] = true

+ 20 - 4
backends/decorate_stream.go

@@ -1,6 +1,7 @@
 package backends
 
 import (
+	"encoding/json"
 	"github.com/flashmob/go-guerrilla/mail"
 )
 
@@ -8,14 +9,29 @@ type streamOpenWith func(e *mail.Envelope) error
 
 type streamCloseWith func() error
 
+type streamConfigureWith func(cfg *ConfigGroup) error
+
 // We define what a decorator to our processor will look like
 // StreamProcessor argument is the underlying processor that we're decorating
 // the additional ...interface argument is not needed, but can be useful for dependency injection
 type StreamDecorator struct {
-	Decorate func(StreamProcessor, ...interface{}) StreamProcessor
-	e        *mail.Envelope
-	Close    streamCloseWith
-	Open     streamOpenWith
+	Decorate  func(StreamProcessor, ...interface{}) StreamProcessor
+	e         *mail.Envelope
+	Close     streamCloseWith
+	Open      streamOpenWith
+	Configure streamConfigureWith
+}
+
+func (s StreamDecorator) ExtractConfig(cfg *ConfigGroup, i interface{}) error {
+	data, err := json.Marshal(cfg)
+	if err != nil {
+		return err
+	}
+	err = json.Unmarshal(data, i)
+	if err != nil {
+		return err
+	}
+	return nil
 }
 
 // DecorateStream will decorate a StreamProcessor with a slice of passed decorators

+ 25 - 6
backends/gateway.go

@@ -111,6 +111,13 @@ func (s *streamer) close() error {
 	return err
 }
 
+func (s *streamer) configure(cfg BackendConfig) error {
+	//for i := range s.d {
+	//s.d[i].Configure(cg)
+	//}
+	return nil
+}
+
 type backendState int
 
 // possible values for state
@@ -153,12 +160,14 @@ func (s backendState) String() string {
 func New(name string, backendConfig BackendConfig, l log.Logger) (Backend, error) {
 	Svc.SetMainlog(l)
 	gateway := &BackendGateway{name: name}
+	backendConfig.toLower()
+	// keep the a copy of the config
+	gateway.config = backendConfig
 	err := gateway.Initialize(backendConfig)
 	if err != nil {
 		return nil, fmt.Errorf("error while initializing the backend: %s", err)
 	}
-	// keep the a copy of the config
-	gateway.config = backendConfig
+
 	return gateway, nil
 }
 
@@ -369,7 +378,7 @@ func (gw *BackendGateway) Reinitialize() error {
 // This function uses the config value save_process or validate_process to figure out which Decorator to use
 func (gw *BackendGateway) newStack(stackConfig string) (Processor, error) {
 	var decorators []Decorator
-	c := newStackProcessorConfig(stackConfig)
+	c := newStackProcessorConfig(stackConfig, newAliasMap(gw.config[ConfigProcessors.String()]))
 	if len(c.list) == 0 {
 		return NoopProcessor{}, nil
 	}
@@ -387,17 +396,27 @@ func (gw *BackendGateway) newStack(stackConfig string) (Processor, error) {
 
 func (gw *BackendGateway) newStreamStack(stackConfig string) (streamer, error) {
 	var decorators []*StreamDecorator
-	c := newStackStreamProcessorConfig(stackConfig)
+	noop := streamer{NoopStreamProcessor{}, decorators}
+	c := newStackStreamProcessorConfig(stackConfig, newAliasMap(gw.config[ConfigStreamProcessors.String()]))
 	if len(c.list) == 0 {
-		return streamer{NoopStreamProcessor{}, decorators}, nil
+		return noop, nil
 	}
 	for i := range c.list {
 		if makeFunc, ok := Streamers[c.list[i].name]; ok {
-			decorators = append(decorators, makeFunc())
+			d := makeFunc()
+			if config := gw.config.group(ConfigStreamProcessors.String(), c.list[i].String()); config != nil {
+				if d.Configure != nil {
+					if err := d.Configure(config); err != nil {
+						return noop, err
+					}
+				}
+			}
+			decorators = append(decorators, d)
 		} else {
 			return streamer{nil, decorators}, c.notFound(c.list[i].name)
 		}
 	}
+	//decorators[0].
 	// build the call-stack of decorators
 	sp, decorators := DecorateStream(&DefaultStreamProcessor{}, decorators)
 	return streamer{sp, decorators}, nil

+ 2 - 2
backends/p_debugger.go

@@ -12,6 +12,8 @@ import (
 // Description   : Log received emails
 // ----------------------------------------------------------------------------------
 // Config Options: log_received_mails bool - log if true
+//               : sleep_seconds - how many seconds to pause for, useful to force a
+//               : timeout. If sleep_seconds is 1 then a panic will be induced
 // --------------:-------------------------------------------------------------------
 // Input         : e.MailFrom, e.RcptTo, e.Header
 // ----------------------------------------------------------------------------------
@@ -49,7 +51,6 @@ func Debugger() Decorator {
 					Log().Info("Headers are:", e.Header)
 					Log().Info("Body:", e.Data.String())
 				}
-
 				if config.SleepSec > 0 {
 					Log().Infof("sleeping for %d", config.SleepSec)
 					time.Sleep(time.Second * time.Duration(config.SleepSec))
@@ -59,7 +60,6 @@ func Debugger() Decorator {
 						panic("panic on purpose")
 					}
 				}
-
 				// continue to the next Processor in the decorator stack
 				return p.Process(e, task)
 			} else {

+ 39 - 4
backends/s_debug.go

@@ -3,27 +3,62 @@ package backends
 import (
 	"fmt"
 	"github.com/flashmob/go-guerrilla/mail"
+	"time"
 )
 
+// ----------------------------------------------------------------------------------
+// Processor Name: debugger
+// ----------------------------------------------------------------------------------
+// Description   : Log received emails
+// ----------------------------------------------------------------------------------
+// Config Options: log_reads bool - log if true
+//               : sleep_seconds - how many seconds to pause for, useful to force a
+//               : timeout. If sleep_seconds is 1 then a panic will be induced
+// --------------:-------------------------------------------------------------------
+// Input         : email envelope
+// ----------------------------------------------------------------------------------
+// Output        : none (only output to the log if enabled)
+// ----------------------------------------------------------------------------------
+
 func init() {
 	Streamers["debug"] = func() *StreamDecorator {
 		return StreamDebug()
 	}
 }
 
+type streamDebuggerConfig struct {
+	LogReads bool `json:"log_reads"`
+	SleepSec int  `json:"sleep_seconds,omitempty"`
+}
+
 func StreamDebug() *StreamDecorator {
+
 	sd := &StreamDecorator{}
-	sd.Decorate =
+	var config streamDebuggerConfig
+	sd.Configure = func(cfg *ConfigGroup) error {
+		return sd.ExtractConfig(cfg, &config)
+	}
 
+	sd.Decorate =
 		func(sp StreamProcessor, a ...interface{}) StreamProcessor {
-
 			sd.Open = func(e *mail.Envelope) error {
 				return nil
 			}
 			return StreamProcessWith(func(p []byte) (int, error) {
 				str := string(p)
-				fmt.Print(str)
-				Log().WithField("p", string(p)).Info("Debug stream")
+				if config.LogReads {
+					fmt.Print(str)
+					Log().WithField("p", string(p)).Info("Debug stream")
+				}
+				if config.SleepSec > 0 {
+					Log().Infof("sleeping for %d", config.SleepSec)
+					time.Sleep(time.Second * time.Duration(config.SleepSec))
+					Log().Infof("woke up")
+
+					if config.SleepSec == 1 {
+						panic("panic on purpose")
+					}
+				}
 				return sp.Write(p)
 			})
 		}

+ 3 - 22
backends/s_transformer.go

@@ -32,11 +32,6 @@ func init() {
 	}
 }
 
-type TransformerConfig struct {
-	// we can add any config here
-
-}
-
 // Transform stream processor: convert an email to UTF-8
 type Transform struct {
 	sp                  io.Writer
@@ -76,7 +71,7 @@ func (t *Transform) unswap() {
 // regexpCharset captures the charset value
 var regexpCharset = regexp.MustCompile("(?i)charset=\"?(.+)\"?") // (?i) is a flag for case-insensitive
 
-func (t *Transform) ReWrite(b []byte, last bool, offset uint) (count int, err error) {
+func (t *Transform) ReWrite(b []byte, last bool) (count int, err error) {
 	defer func() {
 		count = len(b)
 	}()
@@ -221,20 +216,6 @@ func (t *Transform) Reset() {
 
 func Transformer() *StreamDecorator {
 
-	var conf *TransformerConfig
-
-	Svc.AddInitializer(InitializeWith(func(backendConfig BackendConfig) error {
-		configType := BaseConfig(&TransformerConfig{})
-		bcfg, err := Svc.ExtractConfig(
-			ConfigStreamProcessors, "transformer", backendConfig, configType)
-		if err != nil {
-			return err
-		}
-		conf = bcfg.(*TransformerConfig)
-		_ = conf
-		return nil
-	}))
-
 	var (
 		msgPos   uint
 		progress int
@@ -278,7 +259,7 @@ func Transformer() *StreamDecorator {
 				var err error
 				var count int
 
-				count, err = reWriter.ReWrite(p[pos:start-offset], true, offset)
+				count, err = reWriter.ReWrite(p[pos:start-offset], true)
 
 				written += count
 				if err != nil {
@@ -329,7 +310,7 @@ func Transformer() *StreamDecorator {
 
 						// if on the latest (last) part, and yet there is still data to be written out
 						if len(*parts)-1 == i && len(p)-1 > pos {
-							count, err = reWriter.ReWrite(p[pos:], false, offset)
+							count, err = reWriter.ReWrite(p[pos:], false)
 
 							written += count
 							if err != nil {