Browse Source

debugging in progress 6

flashmob 5 years ago
parent
commit
137128d1d5

+ 0 - 16
backends/backend_test.go

@@ -1,16 +0,0 @@
-package backends
-
-import (
-	"encoding/json"
-	"fmt"
-	"testing"
-)
-
-func TestSetProcessorValue(t *testing.T) {
-
-	var test BackendConfig
-	test = make(map[string]map[string]interface{}, 0)
-	test.SetValue("processors", "ABC", "key", "value")
-	out, _ := json.MarshalIndent(test, "", "   ")
-	fmt.Println(string(out))
-}

+ 1 - 1
backends/config.go

@@ -151,7 +151,7 @@ func (c BackendConfig) Changes(oldConfig BackendConfig) (changed, added, removed
 	added = make(map[string]bool, 0)
 	added = make(map[string]bool, 0)
 	removed = make(map[string]bool, 0)
 	removed = make(map[string]bool, 0)
 	cp := ConfigProcessors.String()
 	cp := ConfigProcessors.String()
-	csp := ConfigProcessors.String()
+	csp := ConfigStreamProcessors.String()
 	cg := ConfigGateways.String()
 	cg := ConfigGateways.String()
 	changedProcessors := changedConfigGroups(
 	changedProcessors := changedConfigGroups(
 		oldConfig[cp], c[cp])
 		oldConfig[cp], c[cp])

+ 22 - 6
backends/gateway_test.go

@@ -22,9 +22,17 @@ func TestStates(t *testing.T) {
 
 
 func TestInitialize(t *testing.T) {
 func TestInitialize(t *testing.T) {
 	c := BackendConfig{
 	c := BackendConfig{
-		"save_process":       "HeadersParser|Debugger",
-		"log_received_mails": true,
-		"save_workers_size":  "1",
+		"processors": {
+			"Debugger": {
+				"log_received_mails": true,
+			},
+		},
+		"gateways": {
+			"default": {
+				"save_process":      "HeadersParser|Debugger",
+				"save_workers_size": "1",
+			},
+		},
 	}
 	}
 
 
 	gateway := &BackendGateway{}
 	gateway := &BackendGateway{}
@@ -53,9 +61,17 @@ func TestInitialize(t *testing.T) {
 
 
 func TestStartProcessStop(t *testing.T) {
 func TestStartProcessStop(t *testing.T) {
 	c := BackendConfig{
 	c := BackendConfig{
-		"save_process":       "HeadersParser|Debugger",
-		"log_received_mails": true,
-		"save_workers_size":  2,
+		"processors": {
+			"Debugger": {
+				"log_received_mails": true,
+			},
+		},
+		"gateways": {
+			"default": {
+				"save_process":      "HeadersParser|Debugger",
+				"save_workers_size": "2",
+			},
+		},
 	}
 	}
 
 
 	gateway := &BackendGateway{}
 	gateway := &BackendGateway{}

+ 12 - 4
backends/p_redis_test.go

@@ -15,10 +15,18 @@ func TestRedisGeneric(t *testing.T) {
 	e.RcptTo = append(e.RcptTo, mail.Address{User: "test", Host: "grr.la"})
 	e.RcptTo = append(e.RcptTo, mail.Address{User: "test", Host: "grr.la"})
 
 
 	l, _ := log.GetLogger("./test_redis.log", "debug")
 	l, _ := log.GetLogger("./test_redis.log", "debug")
-	g, err := New(BackendConfig{
-		"save_process":         "Hasher|Redis",
-		"redis_interface":      "127.0.0.1:6379",
-		"redis_expire_seconds": 7200,
+	g, err := New("default", BackendConfig{
+		"processors": {
+			"redis": {
+				"redis_interface":      "127.0.0.1:6379",
+				"redis_expire_seconds": 7200,
+			},
+		},
+		"gateways": {
+			"default": {
+				"save_process": "Hasher|Redis",
+			},
+		},
 	}, l)
 	}, l)
 	if err != nil {
 	if err != nil {
 		t.Error(err)
 		t.Error(err)

+ 1 - 1
backends/s_transformer.go

@@ -224,7 +224,7 @@ func Transformer() *StreamDecorator {
 	var conf *TransformerConfig
 	var conf *TransformerConfig
 
 
 	Svc.AddInitializer(InitializeWith(func(backendConfig BackendConfig) error {
 	Svc.AddInitializer(InitializeWith(func(backendConfig BackendConfig) error {
-		configType := BaseConfig(&HeaderConfig{})
+		configType := BaseConfig(&TransformerConfig{})
 		bcfg, err := Svc.ExtractConfig(
 		bcfg, err := Svc.ExtractConfig(
 			ConfigStreamProcessors, "transformer", backendConfig, configType)
 			ConfigStreamProcessors, "transformer", backendConfig, configType)
 		if err != nil {
 		if err != nil {

+ 10 - 4
chunk/chunk_test.go

@@ -641,10 +641,16 @@ func initTestStream(transform bool) (*StoreMemory, *backends.StreamDecorator, *b
 	}
 	}
 
 
 	// configure the buffer cap
 	// configure the buffer cap
-	bc := backends.BackendConfig{}
-	bc["chunksaver_chunk_size"] = 8000
-	bc["chunksaver_storage_engine"] = "memory"
-	bc["chunksaver_compress_level"] = 0
+	bc := backends.BackendConfig{
+		"stream_processors": {
+			"chunksaver": {
+				"chunksaver_chunk_size":     8000,
+				"chunksaver_storage_engine": "memory",
+				"chunksaver_compress_level": 0,
+			},
+		},
+	}
+
 	_ = backends.Svc.Initialize(bc)
 	_ = backends.Svc.Initialize(bc)
 
 
 	// give it the envelope with the parse results
 	// give it the envelope with the parse results

+ 28 - 24
cmd/guerrillad/serve_test.go

@@ -29,7 +29,7 @@ var configJsonA = `
 {
 {
     "log_file" : "../../tests/testlog",
     "log_file" : "../../tests/testlog",
     "log_level" : "debug",
     "log_level" : "debug",
-    "pid_file" : "./pidfile.pid",
+    "pid_file" : "pidfile.pid",
     "allowed_hosts": [
     "allowed_hosts": [
       "guerrillamail.com",
       "guerrillamail.com",
       "guerrillamailblock.com",
       "guerrillamailblock.com",
@@ -336,8 +336,8 @@ var configJsonE = `
 const testPauseDuration = time.Millisecond * 1010
 const testPauseDuration = time.Millisecond * 1010
 
 
 // reload config
 // reload config
-func sigHup() {
-	if data, err := ioutil.ReadFile("pidfile.pid"); err == nil {
+func sigHup(pidfile string) {
+	if data, err := ioutil.ReadFile(pidfile); err == nil {
 		mainlog.Infof("pid read is %s", data)
 		mainlog.Infof("pid read is %s", data)
 		ecmd := exec.Command("kill", "-HUP", string(data))
 		ecmd := exec.Command("kill", "-HUP", string(data))
 		_, err = ecmd.Output()
 		_, err = ecmd.Output()
@@ -351,8 +351,8 @@ func sigHup() {
 }
 }
 
 
 // shutdown after calling serve()
 // shutdown after calling serve()
-func sigKill() {
-	if data, err := ioutil.ReadFile("pidfile.pid"); err == nil {
+func sigKill(pidfile string) {
+	if data, err := ioutil.ReadFile(pidfile); err == nil {
 		mainlog.Infof("pid read is %s", data)
 		mainlog.Infof("pid read is %s", data)
 		ecmd := exec.Command("kill", string(data))
 		ecmd := exec.Command("kill", string(data))
 		_, err = ecmd.Output()
 		_, err = ecmd.Output()
@@ -627,13 +627,18 @@ func TestServe(t *testing.T) {
 
 
 	cmd := &cobra.Command{}
 	cmd := &cobra.Command{}
 	configPath = "configJsonA.json"
 	configPath = "configJsonA.json"
-
 	go func() {
 	go func() {
 		serve(cmd, []string{})
 		serve(cmd, []string{})
 	}()
 	}()
 	if _, err := grepTestlog("istening on TCP 127.0.0.1:3536", 0); err != nil {
 	if _, err := grepTestlog("istening on TCP 127.0.0.1:3536", 0); err != nil {
 		t.Error("server not started")
 		t.Error("server not started")
 	}
 	}
+
+	// wait for the pidfle to be written out
+	if _, err := grepTestlog("pid_file", 0); err != nil {
+		t.Error("pid_file not written")
+	}
+
 	data, err := ioutil.ReadFile("pidfile.pid")
 	data, err := ioutil.ReadFile("pidfile.pid")
 	if err != nil {
 	if err != nil {
 		t.Error("error reading pidfile.pid", err)
 		t.Error("error reading pidfile.pid", err)
@@ -656,21 +661,20 @@ func TestServe(t *testing.T) {
 	// Would not work on windows as kill is not available.
 	// Would not work on windows as kill is not available.
 	// TODO: Implement an alternative test for windows.
 	// TODO: Implement an alternative test for windows.
 	if runtime.GOOS != "windows" {
 	if runtime.GOOS != "windows" {
-		sigHup()
+		sigHup("pidfile.pid")
 		// did the pidfile change as expected?
 		// did the pidfile change as expected?
 		if _, err := grepTestlog("Configuration was reloaded", 0); err != nil {
 		if _, err := grepTestlog("Configuration was reloaded", 0); err != nil {
-			t.Error("server did not catch sighp")
+			t.Error("server did not catch sighup")
 		}
 		}
 	}
 	}
+	if _, err := grepTestlog("gateway with new config started", 0); err != nil {
+		t.Error("Dummy backend not restarted")
+	}
+
 	// send kill signal and wait for exit
 	// send kill signal and wait for exit
 	d.Shutdown()
 	d.Shutdown()
-
 	// did backend started as expected?
 	// did backend started as expected?
 
 
-	if _, err := grepTestlog("new backend started", 0); err != nil {
-		t.Error("Dummy backend not restarted")
-	}
-
 	// wait for shutdown
 	// wait for shutdown
 	if _, err := grepTestlog("Backend shutdown completed", 0); err != nil {
 	if _, err := grepTestlog("Backend shutdown completed", 0); err != nil {
 		t.Error("server didn't stop")
 		t.Error("server didn't stop")
@@ -727,7 +731,7 @@ func TestServerAddEvent(t *testing.T) {
 		}
 		}
 	}
 	}
 	// send a sighup signal to the server
 	// send a sighup signal to the server
-	sigHup()
+	sigHup("pidfile.pid")
 	if _, err := grepTestlog("[127.0.0.1:2526] Waiting for a new client", 0); err != nil {
 	if _, err := grepTestlog("[127.0.0.1:2526] Waiting for a new client", 0); err != nil {
 		t.Error("new server didn't start")
 		t.Error("new server didn't start")
 	}
 	}
@@ -806,7 +810,7 @@ func TestServerStartEvent(t *testing.T) {
 		t.Error(err)
 		t.Error(err)
 	}
 	}
 	// send a sighup signal to the server
 	// send a sighup signal to the server
-	sigHup()
+	sigHup("pidfile.pid")
 
 
 	// see if the new server started?
 	// see if the new server started?
 	if _, err := grepTestlog("Listening on TCP 127.0.0.1:2228", 0); err != nil {
 	if _, err := grepTestlog("Listening on TCP 127.0.0.1:2228", 0); err != nil {
@@ -887,7 +891,7 @@ func TestServerStopEvent(t *testing.T) {
 		t.Error(err)
 		t.Error(err)
 	}
 	}
 	// send a sighup signal to the server
 	// send a sighup signal to the server
-	sigHup()
+	sigHup("pidfile.pid")
 	// detect config change
 	// detect config change
 	if _, err := grepTestlog("Listening on TCP 127.0.0.1:2228", 0); err != nil {
 	if _, err := grepTestlog("Listening on TCP 127.0.0.1:2228", 0); err != nil {
 		t.Error("new server didn't start")
 		t.Error("new server didn't start")
@@ -920,7 +924,7 @@ func TestServerStopEvent(t *testing.T) {
 		t.Error(err)
 		t.Error(err)
 	}
 	}
 	// send a sighup signal to the server
 	// send a sighup signal to the server
-	sigHup()
+	sigHup("pidfile.pid")
 	// detect config change
 	// detect config change
 	if _, err := grepTestlog("Server [127.0.0.1:2228] has stopped accepting new clients", 27); err != nil {
 	if _, err := grepTestlog("Server [127.0.0.1:2228] has stopped accepting new clients", 27); err != nil {
 		t.Error("127.0.0.1:2228 did not stop")
 		t.Error("127.0.0.1:2228 did not stop")
@@ -1037,7 +1041,7 @@ func TestAllowedHostsEvent(t *testing.T) {
 		t.Error(err)
 		t.Error(err)
 	}
 	}
 	// send a sighup signal to the server to reload config
 	// send a sighup signal to the server to reload config
-	sigHup()
+	sigHup("pidfile.pid")
 
 
 	if _, err := grepTestlog("allowed_hosts config changed", 0); err != nil {
 	if _, err := grepTestlog("allowed_hosts config changed", 0); err != nil {
 		t.Error("allowed_hosts config not changed")
 		t.Error("allowed_hosts config not changed")
@@ -1175,7 +1179,7 @@ func TestTLSConfigEvent(t *testing.T) {
 		t.Error("Did not create cert ", err)
 		t.Error("Did not create cert ", err)
 	}
 	}
 
 
-	sigHup()
+	sigHup("pidfile.pid")
 
 
 	// wait for config to reload
 	// wait for config to reload
 	if _, err := grepTestlog("Server [127.0.0.1:4655] re-opened", 0); err != nil {
 	if _, err := grepTestlog("Server [127.0.0.1:4655] re-opened", 0); err != nil {
@@ -1245,7 +1249,7 @@ func TestBadTLSStart(t *testing.T) {
 		// it should exit by now because the TLS config is incorrect
 		// it should exit by now because the TLS config is incorrect
 		time.Sleep(testPauseDuration)
 		time.Sleep(testPauseDuration)
 
 
-		sigKill()
+		sigKill("pidfile.pid")
 		serveWG.Wait()
 		serveWG.Wait()
 
 
 		return
 		return
@@ -1334,7 +1338,7 @@ func TestBadTLSReload(t *testing.T) {
 		t.Error(err)
 		t.Error(err)
 	}
 	}
 	// send a sighup signal to the server to reload config
 	// send a sighup signal to the server to reload config
-	sigHup()
+	sigHup("pidfile.pid")
 	// did the config reload reload event fire? There should be config read error
 	// did the config reload reload event fire? There should be config read error
 	if _, err := grepTestlog("could not read config file", 0); err != nil {
 	if _, err := grepTestlog("could not read config file", 0); err != nil {
 		t.Error("was expecting an error reading config")
 		t.Error("was expecting an error reading config")
@@ -1411,7 +1415,7 @@ func TestSetTimeoutEvent(t *testing.T) {
 	}
 	}
 
 
 	// send a sighup signal to the server to reload config
 	// send a sighup signal to the server to reload config
-	sigHup()
+	sigHup("pidfile.pid")
 
 
 	// did config update?
 	// did config update?
 	if _, err := grepTestlog("a new config has been saved", 0); err != nil {
 	if _, err := grepTestlog("a new config has been saved", 0); err != nil {
@@ -1513,7 +1517,7 @@ func TestDebugLevelChange(t *testing.T) {
 		t.Error(err)
 		t.Error(err)
 	}
 	}
 	// send a sighup signal to the server to reload config
 	// send a sighup signal to the server to reload config
-	sigHup()
+	sigHup("pidfile.pid")
 	// did the config reload?
 	// did the config reload?
 	if _, err := grepTestlog("Configuration was reloaded", 0); err != nil {
 	if _, err := grepTestlog("Configuration was reloaded", 0); err != nil {
 		t.Error("config did not reload")
 		t.Error("config did not reload")
@@ -1545,7 +1549,7 @@ func TestDebugLevelChange(t *testing.T) {
 
 
 // When reloading with a bad backend config, it should revert to old backend config
 // When reloading with a bad backend config, it should revert to old backend config
 // using the API way
 // using the API way
-func TestBadBackendReload2(t *testing.T) {
+func TestBadBackendReload(t *testing.T) {
 
 
 	defer cleanTestArtifacts(t)
 	defer cleanTestArtifacts(t)
 	var err error
 	var err error

+ 28 - 18
guerrilla.go

@@ -449,20 +449,10 @@ func (g *guerrilla) subscribeEvents() {
 		g.storeBackend(b)
 		g.storeBackend(b)
 	})
 	})
 
 
-	events[EventConfigBackendConfigChanged] = backendEvent(func(appConfig *AppConfig, name string) {
-		logger, _ := log.GetLogger(appConfig.LogFile, appConfig.LogLevel)
-		// shutdown the backend first.
-		var err error
-
-		if err = g.backend(name).Shutdown(); err != nil {
+	revertIfError := func(err error, name string, logger log.Logger, g *guerrilla) {
+		if err != nil {
 			fields := logrus.Fields{"error": err, "gateway": name}
 			fields := logrus.Fields{"error": err, "gateway": name}
-			logger.WithFields(fields).Warn("gateway failed to shutdown")
-			return
-		}
-		// init a new backend, Revert to old backend config if it fails
-		if newBackend, newErr := backends.New(name, appConfig.BackendConfig, logger); newErr != nil {
-			fields := logrus.Fields{"error": newErr, "gateway": name}
-			logger.WithFields(fields).Error("cannot change gateway config")
+			logger.WithFields(fields).Error("cannot change gateway config, reverting to old config")
 			err = g.backend(name).Reinitialize()
 			err = g.backend(name).Reinitialize()
 			if err != nil {
 			if err != nil {
 				fields = logrus.Fields{"error": err, "gateway": name}
 				fields = logrus.Fields{"error": err, "gateway": name}
@@ -472,18 +462,36 @@ func (g *guerrilla) subscribeEvents() {
 			err = g.backend(name).Start()
 			err = g.backend(name).Start()
 			if err != nil {
 			if err != nil {
 				fields = logrus.Fields{"error": err, "gateway": name}
 				fields = logrus.Fields{"error": err, "gateway": name}
-				logger.WithFields(fields).Fatal("failed to start gateway with old config")
+				logger.WithFields(fields).Error("failed to start gateway with old config")
 				return
 				return
 			}
 			}
 			logger.WithField("gateway", name).Info("reverted to old gateway config")
 			logger.WithField("gateway", name).Info("reverted to old gateway config")
+		}
+	}
+
+	events[EventConfigBackendConfigChanged] = backendEvent(func(appConfig *AppConfig, name string) {
+		logger, _ := log.GetLogger(appConfig.LogFile, appConfig.LogLevel)
+		var err error
+		// shutdown the backend first.
+		if err = g.backend(name).Shutdown(); err != nil {
+			fields := logrus.Fields{"error": err, "gateway": name}
+			logger.WithFields(fields).Error("gateway failed to shutdown")
+			return // we can't do anything then
+		}
+		if newBackend, newErr := backends.New(name, appConfig.BackendConfig, logger); newErr != nil {
+			err = newErr
+			revertIfError(newErr, name, logger, g) // revert to old backend
+			return
 		} else {
 		} else {
-			// revert to the bew backend (assuming old gateway was shutdown so it can be safely swapped)
-			if err := newBackend.Start(); err != nil {
+			if err = newBackend.Start(); err != nil {
 				fields := logrus.Fields{"error": err, "gateway": name}
 				fields := logrus.Fields{"error": err, "gateway": name}
 				logger.WithFields(fields).Error("gateway could not start")
 				logger.WithFields(fields).Error("gateway could not start")
+				revertIfError(err, name, logger, g) // revert to old backend
+				return
+			} else {
+				logger.WithField("gateway", name).Info("gateway with new config started")
+				g.storeBackend(newBackend)
 			}
 			}
-			logger.WithField("gateway", name).Info("no gateway changed")
-			g.storeBackend(newBackend)
 		}
 		}
 	})
 	})
 
 
@@ -512,6 +520,8 @@ func (g *guerrilla) subscribeEvents() {
 		logger, _ := log.GetLogger(appConfig.LogFile, appConfig.LogLevel)
 		logger, _ := log.GetLogger(appConfig.LogFile, appConfig.LogLevel)
 		// shutdown the backend first.
 		// shutdown the backend first.
 		var err error
 		var err error
+		// revert
+		defer revertIfError(err, name, logger, g)
 		if err = g.backend(name).Shutdown(); err != nil {
 		if err = g.backend(name).Shutdown(); err != nil {
 			logger.WithFields(logrus.Fields{"error": err, "gateway": name}).Warn("gateway failed to shutdown")
 			logger.WithFields(logrus.Fields{"error": err, "gateway": name}).Warn("gateway failed to shutdown")
 			return
 			return