
- backend config struct uses typed sections
- update some log messages to use structured logging
- update tests to be aware of the new log format
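
As an illustration, here is a minimal sketch (not part of the commit) of what the typed sections mean for callers: BackendConfig sections are now keyed by backends.ConfigSection constants instead of raw strings, so a misspelled section name is a compile error rather than a silently ignored setting; in JSON the keys still read "processors", "stream_processors" and "gateways" through the custom (un)marshalling added in backends/config.go.

	bcfg := backends.BackendConfig{
		backends.ConfigProcessors: {
			"debugger": {"log_received_mails": true},
		},
		backends.ConfigGateways: {
			"default": {"save_process": "HeadersParser|Debugger"},
		},
	}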

flashmob, 5 years ago
commit 40185d0b4d

+ 18 - 18
api_test.go

@@ -102,7 +102,7 @@ func TestSMTPCustomBackend(t *testing.T) {
 	}
 	cfg.Servers = append(cfg.Servers, sc)
 	bcfg := backends.BackendConfig{
-		"processors": {
+		backends.ConfigProcessors: {
 			"debugger": {
 			"debugger": {
 				"log_received_mails": true,
 				"log_received_mails": true,
 			},
 			},
@@ -110,7 +110,7 @@ func TestSMTPCustomBackend(t *testing.T) {
 				"primary_mail_host": "example.com",
 				"primary_mail_host": "example.com",
 			},
 			},
 		},
 		},
-		"gateways": {
+		backends.ConfigGateways: {
 			"default": {
 			"default": {
 				"save_workers_size": 3,
 				"save_workers_size": 3,
 				"save_process":      "HeadersParser|Header|Hasher|Debugger",
 				"save_process":      "HeadersParser|Header|Hasher|Debugger",
@@ -479,7 +479,7 @@ func TestSetAddProcessor(t *testing.T) {
 		LogFile:      "tests/testlog",
 		AllowedHosts: []string{"grr.la"},
 		BackendConfig: backends.BackendConfig{
-			"gateways": {
+			backends.ConfigGateways: {
 				"default": {
 				"default": {
 					"save_process":     "HeadersParser|Debugger|FunkyLogger",
 					"save_process":     "HeadersParser|Debugger|FunkyLogger",
 					"validate_process": "FunkyLogger",
 					"validate_process": "FunkyLogger",
@@ -625,7 +625,7 @@ func TestReloadConfig(t *testing.T) {
 		LogFile:      "tests/testlog",
 		AllowedHosts: []string{"grr.la"},
 		BackendConfig: backends.BackendConfig{
-			"gateways": {
+			backends.ConfigGateways: {
 				"default": {
 				"default": {
 					"save_process":     "HeadersParser|Debugger|FunkyLogger",
 					"save_process":     "HeadersParser|Debugger|FunkyLogger",
 					"validate_process": "FunkyLogger",
 					"validate_process": "FunkyLogger",
@@ -660,7 +660,7 @@ func TestPubSubAPI(t *testing.T) {
 		LogFile:      "tests/testlog",
 		AllowedHosts: []string{"grr.la"},
 		BackendConfig: backends.BackendConfig{
-			"gateways": {
+			backends.ConfigGateways: {
 				"default": {
 				"default": {
 					"save_process":     "HeadersParser|Debugger|FunkyLogger",
 					"save_process":     "HeadersParser|Debugger|FunkyLogger",
 					"validate_process": "FunkyLogger",
 					"validate_process": "FunkyLogger",
@@ -801,7 +801,7 @@ func TestCustomBackendResult(t *testing.T) {
 		LogFile:      "tests/testlog",
 		AllowedHosts: []string{"grr.la"},
 		BackendConfig: backends.BackendConfig{
-			"gateways": {
+			backends.ConfigGateways: {
 				"default": {
 				"default": {
 					"save_process":     "HeadersParser|Debugger|Custom",
 					"save_process":     "HeadersParser|Debugger|Custom",
 					"validate_process": "Custom",
 					"validate_process": "Custom",
@@ -869,7 +869,7 @@ func TestBackendAddRemove(t *testing.T) {
 		PidFile:      "tests/go-guerrilla.pid",
 		AllowedHosts: []string{"grr.la", "spam4.me"},
 		BackendConfig: backends.BackendConfig{
-			"gateways": {
+			backends.ConfigGateways: {
 				"default": {
 				"default": {
 					"save_process":     "HeadersParser|Debugger|Custom",
 					"save_process":     "HeadersParser|Debugger|Custom",
 					"validate_process": "Custom",
 					"validate_process": "Custom",
@@ -893,7 +893,7 @@ func TestBackendAddRemove(t *testing.T) {
 
 
 	cfg2 := *cfg
 	cfg2.BackendConfig = backends.BackendConfig{
-		"gateways": {
+		backends.ConfigGateways: {
 			"client1": {
 			"client1": {
 				"save_process":     "HeadersParser|Debugger|Custom",
 				"save_process":     "HeadersParser|Debugger|Custom",
 				"validate_process": "Custom",
 				"validate_process": "Custom",
@@ -960,7 +960,7 @@ func TestStreamProcessorConfig(t *testing.T) {
 		PidFile:      "tests/go-guerrilla.pid",
 		AllowedHosts: []string{"grr.la", "spam4.me"},
 		BackendConfig: backends.BackendConfig{
-			"stream_procEssoRs": { // note mixed case
+			backends.ConfigStreamProcessors: {
 				"chunkSaver": { // note mixed case
 				"chunkSaver": { // note mixed case
 					"chunk_size":     8000,
 					"chunk_size":     8000,
 					"storage_engine": "memory",
 					"storage_engine": "memory",
@@ -976,7 +976,7 @@ func TestStreamProcessorConfig(t *testing.T) {
 					"log_reads":     true,
 					"log_reads":     true,
 				},
 				},
 			},
 			},
-			"gateways": {
+			backends.ConfigGateways: {
 				"default": {
 				"default": {
 					"save_stream": "mimeanalyzer|chunksaver|test|debug",
 					"save_stream": "mimeanalyzer|chunksaver|test|debug",
 				},
 				},
@@ -1003,12 +1003,12 @@ func TestStreamProcessor(t *testing.T) {
 		LogFile:      "tests/testlog",
 		AllowedHosts: []string{"grr.la"},
 		BackendConfig: backends.BackendConfig{
-			"stream_processors": {
+			backends.ConfigStreamProcessors: {
 				"debug": {
 				"debug": {
 					"log_reads": true,
 					"log_reads": true,
 				},
 				},
 			},
 			},
-			"gateways": {
+			backends.ConfigGateways: {
 				"default": {
 				"default": {
 					"save_process": "HeadersParser|Debugger",
 					"save_process": "HeadersParser|Debugger",
 					"save_stream":  "Header|headersparser|compress|Decompress|debug",
 					"save_stream":  "Header|headersparser|compress|Decompress|debug",
@@ -1071,7 +1071,7 @@ func TestStreamProcessorBackground(t *testing.T) {
 		LogFile:      "tests/testlog",
 		AllowedHosts: []string{"grr.la"},
 		BackendConfig: backends.BackendConfig{
-			"stream_processors": {
+			backends.ConfigStreamProcessors: {
 				"debug": {
 				"debug": {
 					"log_reads": true,
 					"log_reads": true,
 				},
 				},
@@ -1081,7 +1081,7 @@ func TestStreamProcessorBackground(t *testing.T) {
 					"compress_level": 0,
 					"compress_level": 0,
 				},
 				},
 			},
 			},
-			"gateways": {
+			backends.ConfigGateways: {
 				"default": {
 				"default": {
 					"save_process":          "",
 					"save_process":          "",
 					"save_stream":           "mimeanalyzer|moo",
 					"save_stream":           "mimeanalyzer|moo",
@@ -1403,12 +1403,12 @@ func TestStreamMimeProcessor(t *testing.T) {
 		LogFile:      "tests/testlog",
 		AllowedHosts: []string{"grr.la"},
 		BackendConfig: backends.BackendConfig{
-			"stream_processors": {
+			backends.ConfigStreamProcessors: {
 				"debug": {
 				"debug": {
 					"log_reads": true,
 					"log_reads": true,
 				},
 				},
 			},
 			},
-			"gateways": {
+			backends.ConfigGateways: {
 				"default": {
 				"default": {
 					"save_process": "HeadersParser|Debugger",
 					"save_process": "HeadersParser|Debugger",
 					"save_stream":  "mimeanalyzer|headersparser|compress|Decompress|debug",
 					"save_stream":  "mimeanalyzer|headersparser|compress|Decompress|debug",
@@ -1552,14 +1552,14 @@ func TestStreamChunkSaver(t *testing.T) {
 		LogFile:      "tests/testlog",
 		AllowedHosts: []string{"grr.la"},
 		BackendConfig: backends.BackendConfig{
-			"stream_processors": {
+			backends.ConfigStreamProcessors: {
 				"chunksaver": {
 				"chunksaver": {
 					"chunk_size":         1024 * 32,
 					"chunk_size":         1024 * 32,
 					"stream_buffer_size": 1024 * 16,
 					"stream_buffer_size": 1024 * 16,
 					"storage_engine":     "memory",
 					"storage_engine":     "memory",
 				},
 				},
 			},
 			},
-			"gateways": {
+			backends.ConfigGateways: {
 				"default": {
 				"default": {
 					"save_process": "HeadersParser|Debugger",
 					"save_process": "HeadersParser|Debugger",
 					"save_stream":  "mimeanalyzer|chunksaver",
 					"save_stream":  "mimeanalyzer|chunksaver",

+ 2 - 2
backends/backend.go

@@ -264,11 +264,11 @@ func (s *service) AddStreamProcessor(name string, p StreamProcessorConstructor)
 // The reason why using reflection is because we'll get a nice error message if the field is missing
 // the alternative solution would be to json.Marshal() and json.Unmarshal() however that will not give us any
 // error messages
-func (s *service) ExtractConfig(ns configNameSpace, group string, cfg BackendConfig, configType BaseConfig) (interface{}, error) {
+func (s *service) ExtractConfig(section ConfigSection, group string, cfg BackendConfig, configType BaseConfig) (interface{}, error) {
 	group = strings.ToLower(group)
 
 	var configData ConfigGroup
-	if v, ok := cfg[ns.String()][group]; ok {
+	if v, ok := cfg[section][group]; ok {
 		configData = v
 	} else {
 		return configType, nil

+ 83 - 42
backends/config.go

@@ -1,6 +1,7 @@
 package backends
 
 import (
+	"encoding/json"
 	"errors"
 	"errors"
 	"fmt"
 	"fmt"
 	"os"
 	"os"
@@ -11,19 +12,7 @@ import (
 
 
 type ConfigGroup map[string]interface{}
 
-type BackendConfig map[string]map[string]ConfigGroup
-
-/*
-
-TODO change to thus
-
-type BackendConfig struct {
-	Processors map[string]ConfigGroup `json:"processors,omitempty"`
-	StreamProcessors map[string]ConfigGroup `json:"stream_processors,omitempty"`
-	Gateways map[string]ConfigGroup `json:"gateways,omitempty"`
-}
-
-*/
+type BackendConfig map[ConfigSection]map[string]ConfigGroup
 
 
 const (
 	validateRcptTimeout = time.Second * 5
@@ -47,29 +36,27 @@ const (
 	configPostProcessSize     = 64
 )
 
-func (c *BackendConfig) SetValue(ns configNameSpace, name string, key string, value interface{}) {
-	nsKey := ns.String()
+func (c *BackendConfig) SetValue(section ConfigSection, name string, key string, value interface{}) {
 	if *c == nil {
 		*c = make(BackendConfig, 0)
 	}
-	if (*c)[nsKey] == nil {
-		(*c)[nsKey] = make(map[string]ConfigGroup)
+	if (*c)[section] == nil {
+		(*c)[section] = make(map[string]ConfigGroup)
 	}
-	if (*c)[nsKey][name] == nil {
-		(*c)[nsKey][name] = make(ConfigGroup)
+	if (*c)[section][name] == nil {
+		(*c)[section][name] = make(ConfigGroup)
 	}
-	(*c)[nsKey][name][key] = value
+	(*c)[section][name][key] = value
 }
 
-func (c *BackendConfig) GetValue(ns configNameSpace, name string, key string) interface{} {
-	nsKey := ns.String()
-	if (*c)[nsKey] == nil {
+func (c *BackendConfig) GetValue(section ConfigSection, name string, key string) interface{} {
+	if (*c)[section] == nil {
 		return nil
 	}
-	if (*c)[nsKey][name] == nil {
+	if (*c)[section][name] == nil {
 		return nil
 	}
-	if v, ok := (*c)[nsKey][name][key]; ok {
+	if v, ok := (*c)[section][name][key]; ok {
 		return &v
 	}
 	return nil
@@ -77,23 +64,18 @@ func (c *BackendConfig) GetValue(ns configNameSpace, name string, key string) in
 
 
 // toLower normalizes the backendconfig lowercases the config's keys
 func (c BackendConfig) toLower() {
-	for k, v := range c {
-		var l string
-		if l = strings.ToLower(k); k != l {
-			c[l] = v
-			delete(c, k)
-		}
+	for section, v := range c {
 		for k2, v2 := range v {
-			if l2 := strings.ToLower(k2); k2 != l2 {
-				c[l][l2] = v2
-				delete(c[l], k)
+			if k2_lower := strings.ToLower(k2); k2 != k2_lower {
+				c[section][k2_lower] = v2
+				delete(c[section], k2) // delete the non-lowercased key
 			}
 		}
 	}
 }
 
 
-func (c BackendConfig) lookupGroup(ns string, name string) ConfigGroup {
-	if v, ok := c[ns][name]; ok {
+func (c BackendConfig) lookupGroup(section ConfigSection, name string) ConfigGroup {
+	if v, ok := c[section][name]; ok {
 		return v
 	}
 	return nil
@@ -124,15 +106,50 @@ func (c *BackendConfig) ConfigureDefaults() error {
 	return nil
 }
 
-type configNameSpace int
+// UnmarshalJSON custom handling of the ConfigSection keys (they're enumerated)
+func (c *BackendConfig) UnmarshalJSON(b []byte) error {
+	temp := make(map[string]map[string]ConfigGroup)
+	err := json.Unmarshal(b, &temp)
+	if err != nil {
+		return err
+	}
+	if *c == nil {
+		*c = make(BackendConfig)
+	}
+	for key, val := range temp {
+		// map the key to a ConfigSection type
+		var section ConfigSection
+		if err := json.Unmarshal([]byte("\""+key+"\""), &section); err != nil {
+			return err
+		}
+		if (*c)[section] == nil {
+			(*c)[section] = make(map[string]ConfigGroup)
+		}
+		(*c)[section] = val
+	}
+	return nil
+
+}
+
+// MarshalJSON custom handling of ConfigSection keys (since JSON keys need to be strings)
+func (c *BackendConfig) MarshalJSON() ([]byte, error) {
+	temp := make(map[string]map[string]ConfigGroup)
+	for key, val := range *c {
+		// convert they key to a string
+		temp[key.String()] = val
+	}
+	return json.Marshal(temp)
+}
+
+type ConfigSection int
 
 
 const (
-	ConfigProcessors configNameSpace = iota
+	ConfigProcessors ConfigSection = iota
 	ConfigStreamProcessors
 	ConfigGateways
 )
 
-func (o configNameSpace) String() string {
+func (o ConfigSection) String() string {
 	switch o {
 	case ConfigProcessors:
 		return "processors"
@@ -144,6 +161,30 @@ func (o configNameSpace) String() string {
 	return "unknown"
 	return "unknown"
 }
 }
 
 
+func (o *ConfigSection) UnmarshalJSON(b []byte) error {
+	str := strings.Trim(string(b), `"`)
+	str = strings.ToLower(str)
+	switch {
+	case str == "processors":
+		*o = ConfigProcessors
+	case str == "stream_processors":
+		*o = ConfigStreamProcessors
+	case str == "gateways":
+		*o = ConfigGateways
+	default:
+		return errors.New("incorrect config section [" + str + "], may be processors, stream_processors or gateways")
+	}
+	return nil
+}
+
+func (o *ConfigSection) MarshalJSON() ([]byte, error) {
+	ret := o.String()
+	if ret == "unknown" {
+		return []byte{}, errors.New("unknown config section")
+	}
+	return []byte(ret), nil
+}
+
 // All config structs extend from this
 type BaseConfig interface{}
 
@@ -229,9 +270,9 @@ func (c BackendConfig) Changes(oldConfig BackendConfig) (changed, added, removed
 	changed = make(map[string]bool, 0)
 	added = make(map[string]bool, 0)
 	removed = make(map[string]bool, 0)
-	cp := ConfigProcessors.String()
-	csp := ConfigStreamProcessors.String()
-	cg := ConfigGateways.String()
+	cp := ConfigProcessors
+	csp := ConfigStreamProcessors
+	cg := ConfigGateways
 	changedProcessors := changedConfigGroups(
 		oldConfig[cp], c[cp])
 	changedStreamProcessors := changedConfigGroups(

+ 13 - 14
backends/gateway.go

@@ -38,7 +38,7 @@ type BackendGateway struct {
 
 
 	producer *StreamDecorator
 
-	decoratorLookup map[string]map[string]*StreamDecorator
+	decoratorLookup map[ConfigSection]map[string]*StreamDecorator
 
 
 	workerID int
 
@@ -281,13 +281,13 @@ func (gw *BackendGateway) StreamOn() bool {
 
 
 // newStreamDecorator creates a new StreamDecorator and calls Configure with its corresponding configuration
 // cs - the item of 'list' property, result from newStackStreamProcessorConfig()
-// ns - typically the result of calling ConfigStreamProcessors.String()
-func (gw *BackendGateway) newStreamDecorator(cs stackConfigExpression, ns string) *StreamDecorator {
+// section - which section of the config
+func (gw *BackendGateway) newStreamDecorator(cs stackConfigExpression, section ConfigSection) *StreamDecorator {
 	if makeFunc, ok := Streamers[cs.name]; !ok {
 		return nil
 	} else {
 		d := makeFunc()
-		config := gw.config.lookupGroup(ns, cs.String())
+		config := gw.config.lookupGroup(section, cs.String())
 		if config == nil {
 			config = ConfigGroup{}
 		}
@@ -472,7 +472,7 @@ func (gw *BackendGateway) Reinitialize() error {
 // This function uses the config value save_process or validate_process to figure out which Decorator to use
 func (gw *BackendGateway) newStack(stackConfig string) (Processor, error) {
 	var decorators []Decorator
-	c := newStackProcessorConfig(stackConfig, newAliasMap(gw.config[ConfigProcessors.String()]))
+	c := newStackProcessorConfig(stackConfig, newAliasMap(gw.config[ConfigProcessors]))
 	if len(c.list) == 0 {
 		return NoopProcessor{}, nil
 	}
@@ -492,7 +492,7 @@ func (gw *BackendGateway) newStreamStack(stackConfig string) (streamer, error) {
 	var decorators []*StreamDecorator
 
 	noop := streamer{NoopStreamProcessor{}, decorators}
-	groupName := ConfigStreamProcessors.String()
+	groupName := ConfigStreamProcessors
 	c := newStackStreamProcessorConfig(stackConfig, newAliasMap(gw.config[groupName]))
 	if len(c.list) == 0 {
 		return noop, nil
@@ -522,7 +522,7 @@ func (gw *BackendGateway) loadConfig(cfg BackendConfig) error {
 	if gw.name == "" {
 	if gw.name == "" {
 		gw.name = DefaultGateway
 		gw.name = DefaultGateway
 	}
 	}
-	if _, ok := cfg["gateways"][gw.name]; !ok {
+	if _, ok := cfg[ConfigGateways][gw.name]; !ok {
 		return errors.New("no such gateway configured: " + gw.name)
 		return errors.New("no such gateway configured: " + gw.name)
 	}
 	}
 	bcfg, err := Svc.ExtractConfig(ConfigGateways, gw.name, cfg, configType)
 	bcfg, err := Svc.ExtractConfig(ConfigGateways, gw.name, cfg, configType)
@@ -546,7 +546,7 @@ func (gw *BackendGateway) Initialize(cfg BackendConfig) error {
 		return err
 	}
 	gw.buffers = make(map[int][]byte) // individual buffers are made later
-	gw.decoratorLookup = make(map[string]map[string]*StreamDecorator)
+	gw.decoratorLookup = make(map[ConfigSection]map[string]*StreamDecorator)
 	gw.processors = make([]Processor, 0)
 	gw.validators = make([]ValidatingProcessor, 0)
 	gw.streamers = make([]streamer, 0)
@@ -761,25 +761,24 @@ func (gw *BackendGateway) initProducer() error {
 	if gw.gwConfig.PostProcessProducer == "" {
 		return notValid
 	}
-	ns := ConfigStreamProcessors.String()
-	m := newAliasMap(gw.config[ns])
+	section := ConfigStreamProcessors // which section of the config (stream_processors)
+	m := newAliasMap(gw.config[section])
 	c := newStackStreamProcessorConfig(gw.gwConfig.PostProcessProducer, m)
 	if len(c.list) == 0 {
 		return notValid
 	}
 	// check it there's already an instance of it
-	if gw.decoratorLookup[ns] != nil {
-		if v, ok := gw.decoratorLookup[ns][c.list[0].String()]; ok {
+	if gw.decoratorLookup[section] != nil {
+		if v, ok := gw.decoratorLookup[section][c.list[0].String()]; ok {
 			gw.producer = v
 			return nil
 		}
 	}
-	if d := gw.newStreamDecorator(c.list[0], ns); d != nil {
+	if d := gw.newStreamDecorator(c.list[0], section); d != nil {
 		// use a new instance
 		gw.producer = d
 		return nil
 	} else {
 		return errors.New("please check gateway config [post_process_producer]")
 	}
-	return notValid
 }

+ 4 - 4
backends/gateway_test.go

@@ -22,12 +22,12 @@ func TestStates(t *testing.T) {
 
 
 func TestInitialize(t *testing.T) {
 	c := BackendConfig{
-		"processors": {
+		ConfigProcessors: {
 			"Debugger": {
 			"Debugger": {
 				"log_received_mails": true,
 				"log_received_mails": true,
 			},
 			},
 		},
 		},
-		"gateways": {
+		ConfigGateways: {
 			"default": {
 			"default": {
 				"save_process":      "HeadersParser|Debugger",
 				"save_process":      "HeadersParser|Debugger",
 				"save_workers_size": "1",
 				"save_workers_size": "1",
@@ -61,12 +61,12 @@ func TestInitialize(t *testing.T) {
 
 
 func TestStartProcessStop(t *testing.T) {
 	c := BackendConfig{
-		"processors": {
+		ConfigProcessors: {
 			"Debugger": {
 			"Debugger": {
 				"log_received_mails": true,
 				"log_received_mails": true,
 			},
 			},
 		},
 		},
-		"gateways": {
+		ConfigGateways: {
 			"default": {
 			"default": {
 				"save_process":      "HeadersParser|Debugger",
 				"save_process":      "HeadersParser|Debugger",
 				"save_workers_size": "2",
 				"save_workers_size": "2",

+ 3 - 3
backends/p_redis_test.go

@@ -11,18 +11,18 @@ import (
 
 
 func TestRedisGeneric(t *testing.T) {
 
-	e := mail.NewEnvelope("127.0.0.1", 1, "127.0.0.1:2525")
+	e := mail.NewEnvelope("127.0.0.1", 1, 10)
 	e.RcptTo = append(e.RcptTo, mail.Address{User: "test", Host: "grr.la"})
 
 	l, _ := log.GetLogger("./test_redis.log", "debug")
 	g, err := New("default", BackendConfig{
-		"processors": {
+		ConfigProcessors: {
 			"redis": {
 			"redis": {
 				"redis_interface":      "127.0.0.1:6379",
 				"redis_interface":      "127.0.0.1:6379",
 				"redis_expire_seconds": 7200,
 				"redis_expire_seconds": 7200,
 			},
 			},
 		},
 		},
-		"gateways": {
+		ConfigGateways: {
 			"default": {
 			"default": {
 				"save_process": "Hasher|Redis",
 				"save_process": "Hasher|Redis",
 			},
 			},

+ 3 - 3
chunk/chunk_test.go

@@ -608,7 +608,7 @@ func TestChunkSaverWrite(t *testing.T) {
 
 
 func initTestStream(transform bool) (*StoreMemory, *backends.StreamDecorator, *backends.StreamDecorator, backends.StreamProcessor) {
 	// place the parse result in an envelope
-	e := mail.NewEnvelope("127.0.0.1", 1, "127.0.0.1:25")
+	e := mail.NewEnvelope("127.0.0.1", 1, 234)
 	to, _ := mail.NewAddress("[email protected]")
 	e.RcptTo = append(e.RcptTo, *to)
 	from, _ := mail.NewAddress("[email protected]")
@@ -642,7 +642,7 @@ func initTestStream(transform bool) (*StoreMemory, *backends.StreamDecorator, *b
 
 
 	// configure the buffer cap
 	bc := backends.BackendConfig{
-		"stream_processors": {
+		backends.ConfigStreamProcessors: {
 			"chunksaver": {
 			"chunksaver": {
 				"chunk_size":     8000,
 				"chunk_size":     8000,
 				"storage_engine": "memory",
 				"storage_engine": "memory",
@@ -652,7 +652,7 @@ func initTestStream(transform bool) (*StoreMemory, *backends.StreamDecorator, *b
 	}
 
 	//_ = backends.Svc.Initialize(bc)
-	_ = chunksaver.Configure(bc["stream_processors"]["chunksaver"])
+	_ = chunksaver.Configure(bc[backends.ConfigStreamProcessors]["chunksaver"])
 	_ = mimeanalyzer.Configure(backends.ConfigGroup{})
 	// give it the envelope with the parse results
 	_ = chunksaver.Open(e)

+ 50 - 10
cmd/guerrillad/serve_test.go

@@ -377,6 +377,42 @@ func exponentialBackoff(i int) {
 	time.Sleep(time.Duration(Round(math.Pow(3.0, float64(i))-1.0)*100.0/2.0) * time.Millisecond)
 }
 
+func matchTestlog(startLine int, args ...interface{}) (bool, error) {
+	fd, err := os.Open("../../tests/testlog")
+	if err != nil {
+		return false, err
+	}
+	defer func() {
+		_ = fd.Close()
+	}()
+	for tries := 0; tries < 6; tries++ {
+		if b, err := ioutil.ReadAll(fd); err != nil {
+			return false, err
+		} else {
+			if test.MatchLog(string(b), startLine, args...) {
+				return true, nil
+			}
+		}
+		// close and reopen
+		err = fd.Close()
+		if err != nil {
+			return false, err
+		}
+		fd = nil
+
+		// sleep
+		exponentialBackoff(tries)
+		_ = mainlog.Reopen()
+
+		// re-open
+		fd, err = os.OpenFile("../../tests/testlog", os.O_RDONLY, 0644)
+		if err != nil {
+			return false, err
+		}
+	}
+	return false, nil
+}
+
 var grepNotFound error
 
 // grepTestlog looks for the `match` string in the testlog
@@ -547,7 +583,7 @@ func TestCmdConfigChangeEvents(t *testing.T) {
 	}
 
 	oldconf.BackendConfig = backends.BackendConfig{
-		"processors": {"debugger": {"log_received_mails": true}},
+		backends.ConfigProcessors: {"debugger": {"log_received_mails": true}},
 	}
 	oldconf.BackendConfig.ConfigureDefaults()
 
@@ -750,14 +786,19 @@ func TestServerAddEvent(t *testing.T) {
 
 
 	// shutdown the server
 	d.Shutdown()
-
-	// did backend started as expected?
-	if _, err := grepTestlog("New server added [127.0.0.1:2526]", 0); err != nil {
-		t.Error("Did not add server [127.0.0.1:2526] after sighup")
+	// sever added as as expected?
+	if matched, err := matchTestlog(
+		1, "msg", "new server added",
+		"iface", "127.0.0.1:2526",
+		"event", "server_change:new_server",
+	); !matched {
+		t.Error("Did not add server [127.0.0.1:2526] after sighup", err)
 	}
 
-	if _, err := grepTestlog("Backend shutdown completed", 0); err != nil {
-		t.Error("Server failed to stop")
+	if matched, err := matchTestlog(
+		1, "msg", "Backend shutdown completed",
+	); !matched {
+		t.Error("Server failed to stop", err)
 	}
 
 }
@@ -1539,9 +1580,8 @@ func TestDebugLevelChange(t *testing.T) {
 
 
 	d.Shutdown()
 
-	// did the log level change to info?
-	if _, err := grepTestlog("log level changed to [info]", 0); err != nil {
-		t.Error("log level did not change to [info]")
+	if ok, err := matchTestlog(1, "msg", "log level changed"); !ok {
+		t.Error("log level did not change", err)
 		t.FailNow()
 	}
 

+ 2 - 2
config_test.go

@@ -250,8 +250,8 @@ func TestConfigChangeEvents(t *testing.T) {
 	logger, _ := log.GetLogger(oldconf.LogFile, oldconf.LogLevel)
 
 	oldconf.BackendConfig = backends.BackendConfig{
-		"processors": {"debugger": {"log_received_mails": true}},
-		"gateways": {
+		backends.ConfigProcessors: {"debugger": {"log_received_mails": true}},
+		backends.ConfigGateways: {
 			"default": {
 			"default": {
 				"save_process": "HeadersParser|Header|Hasher|Debugger",
 				"save_process": "HeadersParser|Header|Hasher|Debugger",
 			},
 			},

+ 32 - 17
guerrilla.go

@@ -85,7 +85,7 @@ func (ls *logStore) setMainlog(log log.Logger) {
 // makeConfiguredBackends makes backends from the config
 func (g *guerrilla) makeConfiguredBackends(l log.Logger) ([]backends.Backend, error) {
 	var list []backends.Backend
-	config := g.Config.BackendConfig[backends.ConfigGateways.String()]
+	config := g.Config.BackendConfig[backends.ConfigGateways]
 	if len(config) == 0 {
 		return list, errors.New("no backends configured")
 	}
@@ -274,8 +274,9 @@ func (g *guerrilla) subscribeEvents() {
 	events[EventConfigAllowedHosts] = daemonEvent(func(c *AppConfig) {
 		g.mapServers(func(server *server) {
 			server.setAllowedHosts(c.AllowedHosts)
+			g.mainlog().Fields("serverID", server.serverID, "event", EventConfigAllowedHosts).
+				Info("allowed_hosts config changed, a new list was set")
 		})
-		g.mainlog().Infof("allowed_hosts config changed, a new list was set")
 	})
 
 	// the main log file changed
@@ -288,9 +289,11 @@ func (g *guerrilla) subscribeEvents() {
 				// it will change server's logger when the next client gets accepted
 				server.mainlogStore.Store(l)
 			})
-			g.mainlog().Infof("main log for new clients changed to [%s]", c.LogFile)
+			g.mainlog().Fields("file", c.LogFile).
+				Info("main log for new clients changed")
 		} else {
-			g.mainlog().WithError(err).Errorf("main logging change failed [%s]", c.LogFile)
+			g.mainlog().Fields("error", err, "file", c.LogFile).
+				Error("main logging change failed")
 		}
 
 	})
@@ -299,10 +302,11 @@ func (g *guerrilla) subscribeEvents() {
 	events[EventConfigLogReopen] = daemonEvent(func(c *AppConfig) {
 		err := g.mainlog().Reopen()
 		if err != nil {
-			g.mainlog().WithError(err).Errorf("main log file [%s] failed to re-open", c.LogFile)
+			g.mainlog().Fields("error", err, "file", c.LogFile).
+				Error("main log file failed to re-open")
 			return
 		}
-		g.mainlog().Infof("re-opened main log file [%s]", c.LogFile)
+		g.mainlog().Fields("file", c.LogFile).Info("re-opened main log file")
 	})
 
 	// when log level changes, apply to mainlog and server logs
@@ -313,7 +317,7 @@ func (g *guerrilla) subscribeEvents() {
 			g.mapServers(func(server *server) {
 				server.logStore.Store(l)
 			})
-			g.mainlog().Infof("log level changed to [%s]", c.LogLevel)
+			g.mainlog().Fields("level", c.LogLevel).Info("log level changed")
 		}
 	})
 
@@ -325,27 +329,31 @@ func (g *guerrilla) subscribeEvents() {
 	// server config was updated
 	events[EventConfigServerConfig] = serverEvent(func(sc *ServerConfig) {
 		g.setServerConfig(sc)
-		g.mainlog().Infof("server %s config change event, a new config has been saved", sc.ListenInterface)
+		g.mainlog().Fields("iface", sc.ListenInterface).
+			Info("server config change event, a new config has been saved")
 	})
 
 	// add a new server to the config & start
 	events[EventConfigServerNew] = serverEvent(func(sc *ServerConfig) {
-		g.mainlog().Debugf("event fired [%s] %s", EventConfigServerNew, sc.ListenInterface)
+		g.mainlog().Fields("iface", sc.ListenInterface, "event", EventConfigServerNew).Debug("event fired")
 		if _, err := g.findServer(sc.ListenInterface); err != nil {
+			values := []interface{}{"iface", sc.ListenInterface, "event", EventConfigServerNew}
 			// not found, lets add it
 			if err := g.makeServers(); err != nil {
-				g.mainlog().WithError(err).Errorf("cannot add server [%s]", sc.ListenInterface)
+				g.mainlog().Fields(append(values, "error", err)...).
+					Error("cannot add server")
 				return
 			}
-			g.mainlog().Infof("New server added [%s]", sc.ListenInterface)
+			g.mainlog().Fields(values...).Info("new server added")
 			if g.state == daemonStateStarted {
 				err := g.Start()
 				if err != nil {
-					g.mainlog().WithError(err).Info("Event server_change:new_server returned errors when starting")
+					g.mainlog().Fields(append(values, "error", err)...).
+						Error("Event server_change:new_server returned errors when starting")
 				}
 			}
 		} else {
-			g.mainlog().Debugf("new event, but server already fund")
+			g.mainlog().Fields("event", EventConfigServerNew).Debug("new event, but server already fund")
 		}
 	})
 
@@ -353,7 +361,8 @@ func (g *guerrilla) subscribeEvents() {
 	events[EventConfigServerStart] = serverEvent(func(sc *ServerConfig) {
 		if server, err := g.findServer(sc.ListenInterface); err == nil {
 			if server.state == ServerStateStopped || server.state == ServerStateNew {
-				g.mainlog().Infof("Starting server [%s]", server.listenInterface)
+				g.mainlog().Fields("iface", server.listenInterface, "serverID", server.serverID).
+					Info("Starting server")
 				err := g.Start()
 				if err != nil {
 					g.mainlog().WithError(err).Info("Event server_change:start_server returned errors when starting")
@@ -431,7 +440,12 @@ func (g *guerrilla) subscribeEvents() {
 	events[EventConfigServerLogReopen] = serverEvent(func(sc *ServerConfig) {
 		if server, err := g.findServer(sc.ListenInterface); err == nil {
 			if err = server.log().Reopen(); err != nil {
-				g.mainlog().WithError(err).Errorf("server [%s] log file [%s] failed to re-open", sc.ListenInterface, sc.LogFile)
+				g.mainlog().Fields(
+					"error", err,
+					"file", sc.LogFile,
+					"iface", sc.ListenInterface,
+					"serverID", server.serverID).
+					Error("server log file failed to re-open")
 				return
 			}
 			g.mainlog().Infof("Server [%s] re-opened log file [%s]", sc.ListenInterface, sc.LogFile)
@@ -600,7 +614,8 @@ func (g *guerrilla) Start() error {
 		}
 		startWG.Add(1)
 		go func(s *server) {
-			g.mainlog().Fields("iface", s.listenInterface).Info("starting server")
+			g.mainlog().Fields("iface", s.listenInterface, "serverID", s.serverID).
+				Info("starting server")
 			if err := s.Start(&startWG); err != nil {
 				errs <- err
 			}
@@ -628,7 +643,7 @@ func (g *guerrilla) Shutdown() {
 	g.mapServers(func(s *server) {
 		if s.state == ServerStateRunning {
 			s.Shutdown()
-			g.mainlog().Fields("iface", s.listenInterface).Info("shutdown completed")
+			g.mainlog().Fields("iface", s.listenInterface, "serverID", s.serverID).Info("shutdown completed")
 		}
 	})
 

+ 2 - 3
server.go

@@ -249,7 +249,8 @@ func (s *server) Start(startWG *sync.WaitGroup) error {
 	startWG.Done() // start successful, don't wait for me
 
 	for {
-		s.log().Fields("serverID", s.serverID, "nextSeq", clientID+1).Debug("waiting for a new client")
+		s.log().Fields("serverID", s.serverID, "nextSeq", clientID+1, "iface", s.listenInterface).
+			Debug("waiting for a new client")
 		conn, err := listener.Accept()
 		clientID++
 		if err != nil {
@@ -276,12 +277,10 @@ func (s *server) Start(startWG *sync.WaitGroup) error {
 				s.log().Fields("error", borrowErr, "serverID", s.serverID).Error("couldn't borrow a new client")
 				s.log().Fields("error", borrowErr, "serverID", s.serverID).Error("couldn't borrow a new client")
 				// we could not get a client, so close the connection.
 				// we could not get a client, so close the connection.
 				_ = conn.Close()
 				_ = conn.Close()
-
 			}
 			// intentionally placed Borrow in args so that it's called in the
 			// same main goroutine.
 		}(s.clientPool.Borrow(conn, clientID, s.log(), s.envelopePool, s.serverID))
-
 	}
 }
 

+ 2 - 2
server_test.go

@@ -53,10 +53,10 @@ func getMockServerConn(sc *ServerConfig, t *testing.T) (*mocks.Conn, *server) {
 	}
 
 	bcfg := backends.BackendConfig{
-		backends.ConfigProcessors.String(): {
+		backends.ConfigProcessors: {
 			"debugger": {"log_received_mails": true},
 			"debugger": {"log_received_mails": true},
 		},
 		},
-		backends.ConfigGateways.String(): {
+		backends.ConfigGateways: {
 			backends.DefaultGateway: {"save_workers_size": 1},
 		},
 	}

+ 53 - 13
tests/guerrilla_test.go

@@ -186,6 +186,39 @@ func cleanTestArtifacts(t *testing.T) {
 
 
 }
 
+func TestMatchConfig(t *testing.T) {
+	str := `
+time="2020-07-20T14:14:17+09:00" level=info msg="pid_file written" file=tests/go-guerrilla.pid pid=15247
+time="2020-07-20T14:14:17+09:00" level=debug msg="making servers"
+time="2020-07-20T14:14:17+09:00" level=info msg="processing worker started" gateway=default id=3
+time="2020-07-20T14:14:17+09:00" level=info msg="processing worker started" gateway=default id=2
+time="2020-07-20T14:14:17+09:00" level=info msg="starting server" iface="127.0.0.1:2526" serverID=0
+time="2020-07-20T14:14:17+09:00" level=info msg="processing worker started" gateway=default id=1
+time="2020-07-20T14:14:17+09:00" level=info msg="processing worker started" gateway=temp id=2
+time="2020-07-20T14:14:17+09:00" level=info msg="processing worker started" gateway=default id=4
+time="2020-07-20T14:14:17+09:00" level=info msg="processing worker started" gateway=temp id=3
+time="2020-07-20T14:14:17+09:00" level=info msg="processing worker started" gateway=temp id=4
+time="2020-07-20T14:14:17+09:00" level=info msg="processing worker started" gateway=temp id=1
+time="2020-07-20T14:14:17+09:00" level=info msg="listening on TCP" iface="127.0.0.1:2526" serverID=0
+time="2020-07-20T14:14:17+09:00" level=debug msg="waiting for a new client" nextSeq=1 serverID=0
+
+
+`
+	defer cleanTestArtifacts(t)
+	if !MatchLog(str, 1, "msg", "making servers") {
+		t.Error("making servers not matched")
+	}
+
+	if MatchLog(str, 10, "msg", "making servers") {
+		t.Error("not expecting making servers matched")
+	}
+
+	if !MatchLog(str, 1, "msg", "listening on TCP", "serverID", 0) {
+		t.Error("2 not pairs matched")
+	}
+
+}
+
 // Testing start and stop of server
 func TestStart(t *testing.T) {
 	if initErr != nil {
@@ -201,41 +234,48 @@ func TestStart(t *testing.T) {
 	app.Shutdown()
 	if read, err := ioutil.ReadFile("./testlog"); err == nil {
 		logOutput := string(read)
-		if i := strings.Index(logOutput, "msg=\"listening on TCP\" iface=\"127.0.0.1:4654\""); i < 0 {
+		if !MatchLog(logOutput, 1, "msg", "listening on TCP", "iface", "127.0.0.1:2526") {
 			t.Error("Server did not listen on 127.0.0.1:4654")
 			t.Error("Server did not listen on 127.0.0.1:4654")
 		}
 		}
-		if i := strings.Index(logOutput, "msg=\"listening on TCP\" iface=\"127.0.0.1:2526\""); i < 0 {
+
+		if !MatchLog(logOutput, 1, "msg", "listening on TCP", "iface", "127.0.0.1:2526") {
 			t.Error("Server did not listen on 127.0.0.1:2526")
 			t.Error("Server did not listen on 127.0.0.1:2526")
 		}
 		}
-		// msg="waiting for a new client" iface="127.0.0.1:4654"
-		if i := strings.Index(logOutput, "msg=\"waiting for a new client\" iface=\"127.0.0.1:4654\""); i < 0 {
+
+		if !MatchLog(logOutput, 1, "msg", "waiting for a new client", "iface", "127.0.0.1:4654") {
 			t.Error("Server did not wait on 127.0.0.1:4654")
 			t.Error("Server did not wait on 127.0.0.1:4654")
 		}
 		}
-		if i := strings.Index(logOutput, "msg=\"waiting for a new client\" iface=\"127.0.0.1:2526\""); i < 0 {
+
+		if !MatchLog(logOutput, 1, "msg", "waiting for a new client", "iface", "127.0.0.1:2526") {
 			t.Error("Server did not wait on 127.0.0.1:2526")
 			t.Error("Server did not wait on 127.0.0.1:2526")
 		}
 		}
-		if i := strings.Index(logOutput, "msg=\"server has stopped accepting new clients\" iface=\"127.0.0.1:4654\""); i < 0 {
+
+		if !MatchLog(logOutput, 1, "msg", "server has stopped accepting new clients", "iface", "127.0.0.1:4654") {
 			t.Error("Server did not stop on 127.0.0.1:4654")
 			t.Error("Server did not stop on 127.0.0.1:4654")
 		}
 		}
-		if i := strings.Index(logOutput, "msg=\"server has stopped accepting new clients\" iface=\"127.0.0.1:2526\""); i < 0 {
+		if !MatchLog(logOutput, 1, "msg", "server has stopped accepting new clients", "iface", "127.0.0.1:2526") {
 			t.Error("Server did not stop on 127.0.0.1:2526")
 			t.Error("Server did not stop on 127.0.0.1:2526")
 		}
 		}
-		if i := strings.Index(logOutput, "msg=\"shutdown completed\" iface=\"127.0.0.1:4654\""); i < 0 {
+
+		if !MatchLog(logOutput, 1, "msg", "shutdown completed", "iface", "127.0.0.1:4654") {
 			t.Error("Server did not complete shutdown on 127.0.0.1:4654")
 			t.Error("Server did not complete shutdown on 127.0.0.1:4654")
 		}
 		}
-		if i := strings.Index(logOutput, "msg=\"shutdown completed\" iface=\"127.0.0.1:2526\""); i < 0 {
+
+		if !MatchLog(logOutput, 1, "msg", "shutdown completed", "iface", "127.0.0.1:2526") {
 			t.Error("Server did not complete shutdown on 127.0.0.1:2526")
 			t.Error("Server did not complete shutdown on 127.0.0.1:2526")
 		}
 		}
-		if i := strings.Index(logOutput, "msg=\"shutting down pool\" iface=\"127.0.0.1:4654\""); i < 0 {
+
+		if !MatchLog(logOutput, 1, "msg", "shutting down pool", "iface", "127.0.0.1:4654") {
 			t.Error("Server did not shutdown pool on 127.0.0.1:4654")
 			t.Error("Server did not shutdown pool on 127.0.0.1:4654")
 		}
 		}
-		if i := strings.Index(logOutput, "msg=\"shutting down pool\" iface=\"127.0.0.1:2526\""); i < 0 {
+
+		if !MatchLog(logOutput, 1, "msg", "shutting down pool", "iface", "127.0.0.1:2526") {
 			t.Error("Server did not shutdown pool on 127.0.0.1:2526")
 			t.Error("Server did not shutdown pool on 127.0.0.1:2526")
 		}
 		}
-		if i := strings.Index(logOutput, "Backend shutdown completed"); i < 0 {
+
+		if !MatchLog(logOutput, 1, "msg", "Backend shutdown completed") {
 			t.Error("Backend didn't shut down")
 			t.Error("Backend didn't shut down")
 		}
 		}
-
 	}
 
 }

+ 54 - 0
tests/util.go

@@ -0,0 +1,54 @@
+package test
+
+import (
+	"fmt"
+	"strings"
+)
+
+// MatchLog looks for the key/val in the input (a log file)
+func MatchLog(input string, startLine int, args ...interface{}) bool {
+	size := len(args)
+	if size < 2 || size%2 != 0 {
+		panic("args must be even")
+	}
+	lines := strings.Split(input, "\n")
+	if len(lines) < startLine {
+		panic("log too short, lines:" + string(len(lines)))
+	}
+	var lookFor string
+	// for each line
+	found := false
+	for i := startLine - 1; i < len(lines); i++ {
+		// for each pair
+		for j := 0; j < len(args); j++ {
+			if j%2 != 0 {
+				continue
+			}
+			key := args[j]
+			val := args[j+1]
+			switch val.(type) {
+			case string:
+				// quote it
+				lookFor = fmt.Sprintf(`%s="%s"`, key, val)
+				break
+			case fmt.Stringer:
+				// quote it
+				lookFor = fmt.Sprintf(`%s="%s"`, key, val)
+				break
+			default:
+				lookFor = fmt.Sprintf(`%s=%v`, key, val)
+			}
+			if pos := strings.Index(lines[i], lookFor); pos != -1 {
+				found = true
+			} else {
+				found = false
+				// short circuit
+				break
+			}
+		}
+		if found {
+			break
+		}
+	}
+	return found
+}
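
For context, a minimal sketch (not from the commit) of how the structured log format pairs with MatchLog: the reworked log calls emit key=value fields (as in the sample lines in TestMatchConfig above), and MatchLog requires every key/value pair to appear together on one line at or after the given start line. Assuming a *testing.T named t is in scope:

	logLine := `time="2020-07-20T14:14:17+09:00" level=info msg="listening on TCP" iface="127.0.0.1:2526" serverID=0`
	if !MatchLog(logLine, 1, "msg", "listening on TCP", "iface", "127.0.0.1:2526", "serverID", 0) {
		t.Error("expected all structured fields to match on the same line")
	}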