Browse Source

debugging in progress 4

flashmob 5 years ago
parent
commit
589af8f4af
9 changed files with 64 additions and 52 deletions
  1. 1 1
      api.go
  2. 0 2
      backends/backend.go
  3. 30 3
      backends/config.go
  4. 14 13
      cmd/guerrillad/serve_test.go
  5. 1 25
      config.go
  6. 5 3
      event.go
  7. 11 1
      guerrilla.go
  8. 1 4
      server.go
  9. 1 0
      tests/guerrilla_test.go

+ 1 - 1
api.go

@@ -214,7 +214,7 @@ func (d *Daemon) configureDefaults() error {
 	if d.Backends == nil {
 	if d.Backends == nil {
 		d.Backends = make([]backends.Backend, 0)
 		d.Backends = make([]backends.Backend, 0)
 		// the config will be used to make backends
 		// the config will be used to make backends
-		err = d.Config.setBackendDefaults()
+		err = d.Config.BackendConfig.ConfigureDefaults()
 		if err != nil {
 		if err != nil {
 			return err
 			return err
 		}
 		}

+ 0 - 2
backends/backend.go

@@ -244,7 +244,6 @@ func (s *service) AddProcessor(name string, p ProcessorConstructor) {
 	c = func() Decorator {
 	c = func() Decorator {
 		return p()
 		return p()
 	}
 	}
-	// add to our processors list
 	processors[strings.ToLower(name)] = c
 	processors[strings.ToLower(name)] = c
 }
 }
 
 
@@ -254,7 +253,6 @@ func (s *service) AddStreamProcessor(name string, p StreamProcessorConstructor)
 	c = func() *StreamDecorator {
 	c = func() *StreamDecorator {
 		return p()
 		return p()
 	}
 	}
-	// add to our processors list
 	Streamers[strings.ToLower(name)] = c
 	Streamers[strings.ToLower(name)] = c
 }
 }
 
 

+ 30 - 3
backends/config.go

@@ -3,6 +3,7 @@ package backends
 import (
 import (
 	"errors"
 	"errors"
 	"fmt"
 	"fmt"
+	"os"
 	"reflect"
 	"reflect"
 	"strings"
 	"strings"
 )
 )
@@ -51,6 +52,30 @@ func (c *BackendConfig) GetValue(ns configNameSpace, name string, key string) in
 	return nil
 	return nil
 }
 }
 
 
+// ConfigureDefaults sets default values for the backend config,
+// if no backend config was added before starting, then use a default config
+// otherwise, see what required values were missed in the config and add any missing with defaults
+func (c *BackendConfig) ConfigureDefaults() error {
+	// set the defaults if no value has been configured
+	if c.GetValue(ConfigGateways, "default", "save_workers_size") == nil {
+		c.SetValue(ConfigGateways, "default", "save_workers_size", 1)
+	}
+	if c.GetValue(ConfigGateways, "default", "save_process") == nil {
+		c.SetValue(ConfigGateways, "default", "save_process", "HeadersParser|Header|Debugger")
+	}
+	if c.GetValue(ConfigProcessors, "default", "primary_mail_host") == nil {
+		h, err := os.Hostname()
+		if err != nil {
+			return err
+		}
+		c.SetValue(ConfigProcessors, "Header", "primary_mail_host", h)
+	}
+	if c.GetValue(ConfigProcessors, "default", "log_received_mails") == nil {
+		c.SetValue(ConfigProcessors, "Debugger", "log_received_mails", true)
+	}
+	return nil
+}
+
 type configNameSpace int
 type configNameSpace int
 
 
 const (
 const (
@@ -126,13 +151,15 @@ func (c BackendConfig) Changes(oldConfig BackendConfig) (changed, added, removed
 	added = make(map[string]bool, 0)
 	added = make(map[string]bool, 0)
 	removed = make(map[string]bool, 0)
 	removed = make(map[string]bool, 0)
 
 
-	changedProcessors := changedConfigGroups(oldConfig[string(ConfigProcessors)], c[string(ConfigProcessors)])
-	changedStreamProcessors := changedConfigGroups(oldConfig[string(ConfigProcessors)], c[string(ConfigProcessors)])
+	changedProcessors := changedConfigGroups(
+		oldConfig[ConfigProcessors.String()], c[ConfigProcessors.String()])
+	changedStreamProcessors := changedConfigGroups(
+		oldConfig[ConfigProcessors.String()], c[ConfigProcessors.String()])
 	configType := BaseConfig(&GatewayConfig{})
 	configType := BaseConfig(&GatewayConfig{})
 
 
 	// go through all the gateway configs,
 	// go through all the gateway configs,
 	// make a list of all the ones that have processors whose config had changed
 	// make a list of all the ones that have processors whose config had changed
-	for key, _ := range c[string(ConfigGateways)] {
+	for key, _ := range c[ConfigGateways.String()] {
 		e, _ := Svc.ExtractConfig(ConfigGateways, key, c, configType)
 		e, _ := Svc.ExtractConfig(ConfigGateways, key, c, configType)
 		bcfg := e.(*GatewayConfig)
 		bcfg := e.(*GatewayConfig)
 		config := NewStackConfig(bcfg.SaveProcess)
 		config := NewStackConfig(bcfg.SaveProcess)

+ 14 - 13
cmd/guerrillad/serve_test.go

@@ -152,7 +152,7 @@ var configJsonC = `
             "sql" : {
             "sql" : {
  				"sql_dsn": "root:ok@tcp(127.0.0.1:3306)/gmail_mail?readTimeout=10s&writeTimeout=10s",
  				"sql_dsn": "root:ok@tcp(127.0.0.1:3306)/gmail_mail?readTimeout=10s&writeTimeout=10s",
             	"mail_table":"new_mail",
             	"mail_table":"new_mail",
-				"primary_mail_host":"sharklasers.com",
+				"primary_mail_host":"sharklasers.com"
 			}
 			}
 		},
 		},
 		"gateways" : {
 		"gateways" : {
@@ -537,8 +537,8 @@ func TestCmdConfigChangeEvents(t *testing.T) {
 	}
 	}
 
 
 	expectedEvents := map[guerrilla.Event]bool{
 	expectedEvents := map[guerrilla.Event]bool{
-		guerrilla.EventConfigBackendConfigChanged: false,
-		guerrilla.EventConfigServerNew:            false,
+		guerrilla.EventConfigBackendConfigChanged: false, // backend_change:backend
+		guerrilla.EventConfigServerNew:            false, // server_change:new_server
 	}
 	}
 	mainlog, err = getTestLog()
 	mainlog, err = getTestLog()
 	if err != nil {
 	if err != nil {
@@ -549,28 +549,34 @@ func TestCmdConfigChangeEvents(t *testing.T) {
 	oldconf.BackendConfig = backends.BackendConfig{
 	oldconf.BackendConfig = backends.BackendConfig{
 		"processors": {"debugger": {"log_received_mails": true}},
 		"processors": {"debugger": {"log_received_mails": true}},
 	}
 	}
+	oldconf.BackendConfig.ConfigureDefaults()
 
 
 	backend, err := backends.New(backends.DefaultGateway, oldconf.BackendConfig, mainlog)
 	backend, err := backends.New(backends.DefaultGateway, oldconf.BackendConfig, mainlog)
 	if err != nil {
 	if err != nil {
 		t.Error("failed to create backend", err)
 		t.Error("failed to create backend", err)
+		return
 	}
 	}
-
 	app, err := guerrilla.New(oldconf, mainlog, backend)
 	app, err := guerrilla.New(oldconf, mainlog, backend)
 	if err != nil {
 	if err != nil {
 		t.Error("Failed to create new app", err)
 		t.Error("Failed to create new app", err)
 	}
 	}
-	toUnsubscribe := map[guerrilla.Event]func(c *guerrilla.AppConfig){}
-	toUnsubscribeS := map[guerrilla.Event]func(c *guerrilla.ServerConfig){}
+	toUnsubscribe := map[guerrilla.Event]interface{}{}
 
 
 	for event := range expectedEvents {
 	for event := range expectedEvents {
 		// Put in anon func since range is overwriting event
 		// Put in anon func since range is overwriting event
 		func(e guerrilla.Event) {
 		func(e guerrilla.Event) {
-			if strings.Index(e.String(), "server_change") == 0 {
+			if strings.Index(e.String(), "backend_change") == 0 {
+				f := func(c *guerrilla.AppConfig, gateway string) {
+					expectedEvents[e] = true
+				}
+				_ = app.Subscribe(e, f)
+				toUnsubscribe[e] = f
+			} else if strings.Index(e.String(), "server_change") == 0 {
 				f := func(c *guerrilla.ServerConfig) {
 				f := func(c *guerrilla.ServerConfig) {
 					expectedEvents[e] = true
 					expectedEvents[e] = true
 				}
 				}
 				_ = app.Subscribe(e, f)
 				_ = app.Subscribe(e, f)
-				toUnsubscribeS[e] = f
+				toUnsubscribe[e] = f
 			} else {
 			} else {
 				f := func(c *guerrilla.AppConfig) {
 				f := func(c *guerrilla.AppConfig) {
 					expectedEvents[e] = true
 					expectedEvents[e] = true
@@ -578,7 +584,6 @@ func TestCmdConfigChangeEvents(t *testing.T) {
 				_ = app.Subscribe(e, f)
 				_ = app.Subscribe(e, f)
 				toUnsubscribe[e] = f
 				toUnsubscribe[e] = f
 			}
 			}
-
 		}(event)
 		}(event)
 	}
 	}
 
 
@@ -589,10 +594,6 @@ func TestCmdConfigChangeEvents(t *testing.T) {
 	for unevent, unfun := range toUnsubscribe {
 	for unevent, unfun := range toUnsubscribe {
 		_ = app.Unsubscribe(unevent, unfun)
 		_ = app.Unsubscribe(unevent, unfun)
 	}
 	}
-	for unevent, unfun := range toUnsubscribeS {
-		_ = app.Unsubscribe(unevent, unfun)
-	}
-
 	for event, val := range expectedEvents {
 	for event, val := range expectedEvents {
 		if val == false {
 		if val == false {
 			t.Error("Did not fire config change event:", event)
 			t.Error("Did not fire config change event:", event)

+ 1 - 25
config.go

@@ -168,7 +168,7 @@ func (c *AppConfig) Load(jsonBytes []byte) error {
 	if err = c.setDefaults(); err != nil {
 	if err = c.setDefaults(); err != nil {
 		return err
 		return err
 	}
 	}
-	if err = c.setBackendDefaults(); err != nil {
+	if err = c.BackendConfig.ConfigureDefaults(); err != nil {
 		return err
 		return err
 	}
 	}
 
 
@@ -340,30 +340,6 @@ func (c *AppConfig) setDefaults() error {
 	return nil
 	return nil
 }
 }
 
 
-// setBackendDefaults sets default values for the backend config,
-// if no backend config was added before starting, then use a default config
-// otherwise, see what required values were missed in the config and add any missing with defaults
-func (c *AppConfig) setBackendDefaults() error {
-	h, err := os.Hostname()
-	if err != nil {
-		return err
-	}
-	// set the defaults if no value has been configured
-	if c.BackendConfig.GetValue(backends.ConfigGateways, "default", "save_workers_size") == nil {
-		c.BackendConfig.SetValue(backends.ConfigGateways, "default", "save_workers_size", 1)
-	}
-	if c.BackendConfig.GetValue(backends.ConfigGateways, "default", "save_process") == nil {
-		c.BackendConfig.SetValue(backends.ConfigGateways, "default", "save_process", "HeadersParser|Header|Debugger")
-	}
-	if c.BackendConfig.GetValue(backends.ConfigProcessors, "default", "primary_mail_host") == nil {
-		c.BackendConfig.SetValue(backends.ConfigProcessors, "Header", "primary_mail_host", h)
-	}
-	if c.BackendConfig.GetValue(backends.ConfigProcessors, "default", "log_received_mails") == nil {
-		c.BackendConfig.SetValue(backends.ConfigProcessors, "Debugger", "log_received_mails", true)
-	}
-	return nil
-}
-
 // Emits any configuration change events on the server.
 // Emits any configuration change events on the server.
 // All events are fired and run synchronously
 // All events are fired and run synchronously
 func (sc *ServerConfig) emitChangeEvents(oldServer *ServerConfig, app Guerrilla) {
 func (sc *ServerConfig) emitChangeEvents(oldServer *ServerConfig, app Guerrilla) {

+ 5 - 3
event.go

@@ -21,7 +21,9 @@ const (
 	EventConfigLogLevel
 	EventConfigLogLevel
 	// when the backend's config changed
 	// when the backend's config changed
 	EventConfigBackendConfigChanged
 	EventConfigBackendConfigChanged
+	// when a gateway was added
 	EventConfigBackendConfigAdded
 	EventConfigBackendConfigAdded
+	// when a gateway was removed
 	EventConfigBackendConfigRemoved
 	EventConfigBackendConfigRemoved
 	// when a new server was added
 	// when a new server was added
 	EventConfigServerNew
 	EventConfigServerNew
@@ -54,9 +56,9 @@ var eventList = [...]string{
 	"config_change:log_file",
 	"config_change:log_file",
 	"config_change:reopen_log_file",
 	"config_change:reopen_log_file",
 	"config_change:log_level",
 	"config_change:log_level",
-	"config_change:backend_config",
-	"config_change:backend_config_added",
-	"config_change:backend_removed",
+	"backend_change:backend",
+	"backend_change:backend_config_added",
+	"backend_change:backend_removed",
 	"server_change:new_server",
 	"server_change:new_server",
 	"server_change:remove_server",
 	"server_change:remove_server",
 	"server_change:update_config",
 	"server_change:update_config",

+ 11 - 1
guerrilla.go

@@ -120,6 +120,9 @@ func New(ac *AppConfig, l log.Logger, b ...backends.Backend) (Guerrilla, error)
 		g.backends = make(BackendContainer)
 		g.backends = make(BackendContainer)
 	}
 	}
 	for i := range b {
 	for i := range b {
+		if b[i] == nil {
+			return g, errors.New("cannot use a nil backend")
+		}
 		g.storeBackend(b[i])
 		g.storeBackend(b[i])
 	}
 	}
 	g.setMainlog(l)
 	g.setMainlog(l)
@@ -538,7 +541,10 @@ func (g *guerrilla) storeBackend(b backends.Backend) {
 	defer g.beGuard.Unlock()
 	defer g.beGuard.Unlock()
 	g.backends[b.Name()] = b
 	g.backends[b.Name()] = b
 	g.mapServers(func(server *server) {
 	g.mapServers(func(server *server) {
-		server.setBackend(b)
+		sc := server.configStore.Load().(ServerConfig)
+		if b.Name() == sc.Gateway {
+			server.setBackend(b)
+		}
 	})
 	})
 }
 }
 
 
@@ -548,6 +554,10 @@ func (g *guerrilla) backend(name string) backends.Backend {
 	if b, ok := g.backends[name]; ok {
 	if b, ok := g.backends[name]; ok {
 		return b
 		return b
 	}
 	}
+	// if not found, return a random one
+	for b := range g.backends {
+		return g.backends[b]
+	}
 	return nil
 	return nil
 }
 }
 
 

+ 1 - 4
server.go

@@ -180,10 +180,7 @@ func (s *server) configureTLS() error {
 
 
 // setBackend sets the backend to use for processing email envelopes
 // setBackend sets the backend to use for processing email envelopes
 func (s *server) setBackend(b backends.Backend) {
 func (s *server) setBackend(b backends.Backend) {
-	sc := s.configStore.Load().(ServerConfig)
-	if b.Name() == sc.Gateway {
-		s.backendStore.Store(b)
-	}
+	s.backendStore.Store(b)
 }
 }
 
 
 // backend gets the backend used to process email envelopes
 // backend gets the backend used to process email envelopes

+ 1 - 0
tests/guerrilla_test.go

@@ -125,6 +125,7 @@ var configJson = `
 `
 `
 
 
 func getBackend(backendConfig backends.BackendConfig, l log.Logger) (backends.Backend, error) {
 func getBackend(backendConfig backends.BackendConfig, l log.Logger) (backends.Backend, error) {
+	_ = backendConfig.ConfigureDefaults()
 	b, err := backends.New(backends.DefaultGateway, backendConfig, l)
 	b, err := backends.New(backends.DefaultGateway, backendConfig, l)
 	if err != nil {
 	if err != nil {
 		fmt.Println("backend init error", err)
 		fmt.Println("backend init error", err)