Prechádzať zdrojové kódy

separate process stack for validating recipients - it means it use a different processing stack for validating. Better efficiency and ability to control

flashmob 8 rokov pred
rodič
commit
29db21a16f

+ 22 - 13
api_test.go

@@ -100,7 +100,7 @@ func TestSMTPCustomBackend(t *testing.T) {
 	cfg.Servers = append(cfg.Servers, sc)
 	bcfg := backends.BackendConfig{
 		"save_workers_size":  3,
-		"process_stack":      "HeadersParser|Header|Hasher|Debugger",
+		"save_process":       "HeadersParser|Header|Hasher|Debugger",
 		"log_received_mails": true,
 		"primary_mail_host":  "example.com",
 	}
@@ -126,7 +126,7 @@ func TestSMTPLoadFile(t *testing.T) {
     "backend_config" :
         {
             "log_received_mails" : true,
-            "process_stack": "HeadersParser|Header|Hasher|Debugger",
+            "save_process": "HeadersParser|Header|Hasher|Debugger",
             "save_workers_size":  3
         },
     "servers" : [
@@ -154,7 +154,7 @@ func TestSMTPLoadFile(t *testing.T) {
     "backend_config" :
         {
             "log_received_mails" : true,
-            "process_stack": "HeadersParser|Header|Hasher|Debugger",
+            "save_process": "HeadersParser|Header|Hasher|Debugger",
             "save_workers_size":  3
         },
     "servers" : [
@@ -366,9 +366,12 @@ var funkyLogger = func() backends.Decorator {
 func TestSetAddProcessor(t *testing.T) {
 	os.Truncate("tests/testlog", 0)
 	cfg := &AppConfig{
-		LogFile:       "tests/testlog",
-		AllowedHosts:  []string{"grr.la"},
-		BackendConfig: backends.BackendConfig{"process_stack": "HeadersParser|Debugger|FunkyLogger"},
+		LogFile:      "tests/testlog",
+		AllowedHosts: []string{"grr.la"},
+		BackendConfig: backends.BackendConfig{
+			"save_process":     "HeadersParser|Debugger|FunkyLogger",
+			"validate_process": "FunkyLogger",
+		},
 	}
 	d := Daemon{Config: cfg}
 	d.AddProcessor("FunkyLogger", funkyLogger)
@@ -442,9 +445,12 @@ func TestReloadConfig(t *testing.T) {
 	d.Start()
 
 	cfg := &AppConfig{
-		LogFile:       "tests/testlog",
-		AllowedHosts:  []string{"grr.la"},
-		BackendConfig: backends.BackendConfig{"process_stack": "HeadersParser|Debugger|FunkyLogger"},
+		LogFile:      "tests/testlog",
+		AllowedHosts: []string{"grr.la"},
+		BackendConfig: backends.BackendConfig{
+			"save_process":     "HeadersParser|Debugger|FunkyLogger",
+			"validate_process": "FunkyLogger",
+		},
 	}
 	// Look mom, reloading the config without shutting down!
 	d.ReloadConfig(cfg)
@@ -461,10 +467,13 @@ func TestPubSubAPI(t *testing.T) {
 
 	// new config
 	cfg := &AppConfig{
-		PidFile:       "tests/pidfilex.pid",
-		LogFile:       "tests/testlog",
-		AllowedHosts:  []string{"grr.la"},
-		BackendConfig: backends.BackendConfig{"process_stack": "HeadersParser|Debugger|FunkyLogger"},
+		PidFile:      "tests/pidfilex.pid",
+		LogFile:      "tests/testlog",
+		AllowedHosts: []string{"grr.la"},
+		BackendConfig: backends.BackendConfig{
+			"save_process":     "HeadersParser|Debugger|FunkyLogger",
+			"validate_process": "FunkyLogger",
+		},
 	}
 
 	var i = 0

+ 2 - 1
backends/backend.go

@@ -210,7 +210,8 @@ func (s *service) shutdown() Errors {
 	return errors
 }
 
-// AddProcessor adds a new processor, which becomes available to the backend_config.process_stack option
+// AddProcessor adds a new processor, which becomes available to the backend_config.save_process option
+// and also the backend_config.validate_process option
 // Use to add your own custom processor when using backends as a package, or after importing an external
 // processor.
 func (s *service) AddProcessor(name string, p ProcessorConstructor) {

+ 34 - 13
backends/gateway.go

@@ -27,7 +27,8 @@ type BackendGateway struct {
 	// waits for backend workers to start/stop
 	wg           sync.WaitGroup
 	workStoppers []chan bool
-	chains       []Processor
+	processors   []Processor
+	validators   []Processor
 
 	// controls access to state
 	sync.Mutex
@@ -39,8 +40,10 @@ type BackendGateway struct {
 type GatewayConfig struct {
 	// WorkersSize controls how many concurrent workers to start. Defaults to 1
 	WorkersSize int `json:"save_workers_size,omitempty"`
-	// ProcessorStack controls which processors to chain in a stack.
-	ProcessorStack string `json:"process_stack,omitempty"`
+	// SaveProcess controls which processors to chain in a stack for saving email tasks
+	SaveProcess string `json:"save_process,omitempty"`
+	// ValidateProcess is like ProcessorStack, but for recipient validation tasks
+	ValidateProcess string `json:"validate_process,omitempty"`
 	// TimeoutSave is the number of seconds before timeout when saving an email
 	TimeoutSave int `json:"gw_save_timeout,omitempty"`
 	// TimeoutValidateRcpt is how many seconds before timeout when validating a recipient
@@ -191,10 +194,10 @@ func (gw *BackendGateway) Reinitialize() error {
 // newChain creates a new Processor by chaining multiple Processors in a call stack
 // Decorators are functions of Decorator type, source files prefixed with p_*
 // Each decorator does a specific task during the processing stage.
-// This function uses the config value process_stack to figure out which Decorator to use
-func (gw *BackendGateway) newChain() (Processor, error) {
+// This function uses the config value save_process or validate_process to figure out which Decorator to use
+func (gw *BackendGateway) newStack(stackConfig string) (Processor, error) {
 	var decorators []Decorator
-	cfg := strings.ToLower(strings.TrimSpace(gw.gwConfig.ProcessorStack))
+	cfg := strings.ToLower(strings.TrimSpace(stackConfig))
 	if len(cfg) == 0 {
 		cfg = strings.ToLower(defaultProcessor)
 	}
@@ -241,14 +244,22 @@ func (gw *BackendGateway) Initialize(cfg BackendConfig) error {
 			gw.State = BackendStateError
 			return errors.New("Must have at least 1 worker")
 		}
-		gw.chains = make([]Processor, 0)
+		gw.processors = make([]Processor, 0)
+		gw.validators = make([]Processor, 0)
 		for i := 0; i < workersSize; i++ {
-			p, err := gw.newChain()
+			p, err := gw.newStack(gw.gwConfig.SaveProcess)
 			if err != nil {
 				gw.State = BackendStateError
 				return err
 			}
-			gw.chains = append(gw.chains, p)
+			gw.processors = append(gw.processors, p)
+
+			v, err := gw.newStack(gw.gwConfig.ValidateProcess)
+			if err != nil {
+				gw.State = BackendStateError
+				return err
+			}
+			gw.validators = append(gw.validators, v)
 		}
 		// initialize processors
 		if err := Svc.initialize(cfg); err != nil {
@@ -282,7 +293,12 @@ func (gw *BackendGateway) Start() error {
 			stop := make(chan bool)
 			go func(workerId int, stop chan bool) {
 				// blocks here until the worker exits
-				gw.workDispatcher(gw.conveyor, gw.chains[workerId], workerId+1, stop)
+				gw.workDispatcher(
+					gw.conveyor,
+					gw.processors[workerId],
+					gw.validators[workerId],
+					workerId+1,
+					stop)
 				gw.wg.Done()
 			}(i, stop)
 			gw.workStoppers = append(gw.workStoppers, stop)
@@ -319,7 +335,12 @@ func (gw *BackendGateway) validateRcptTimeout() time.Duration {
 	return time.Duration(gw.gwConfig.TimeoutValidateRcpt)
 }
 
-func (gw *BackendGateway) workDispatcher(workIn chan *workerMsg, p Processor, workerId int, stop chan bool) {
+func (gw *BackendGateway) workDispatcher(
+	workIn chan *workerMsg,
+	save Processor,
+	validate Processor,
+	workerId int,
+	stop chan bool) {
 
 	defer func() {
 		if r := recover(); r != nil {
@@ -345,7 +366,7 @@ func (gw *BackendGateway) workDispatcher(workIn chan *workerMsg, p Processor, wo
 			if msg.task == TaskSaveMail {
 				// process the email here
 				// TODO we should check the err
-				result, _ := p.Process(msg.e, TaskSaveMail)
+				result, _ := save.Process(msg.e, TaskSaveMail)
 				if result.Code() < 300 {
 					// if all good, let the gateway know that it was queued
 					msg.notifyMe <- &notifyMsg{nil, msg.e.QueuedId}
@@ -354,7 +375,7 @@ func (gw *BackendGateway) workDispatcher(workIn chan *workerMsg, p Processor, wo
 					msg.notifyMe <- &notifyMsg{err: errors.New(result.String())}
 				}
 			} else if msg.task == TaskValidateRcpt {
-				_, err := p.Process(msg.e, TaskValidateRcpt)
+				_, err := validate.Process(msg.e, TaskValidateRcpt)
 				if err != nil {
 					// validation failed
 					msg.notifyMe <- &notifyMsg{err: err}

+ 5 - 5
backends/gateway_test.go

@@ -19,7 +19,7 @@ func TestStates(t *testing.T) {
 
 func TestInitialize(t *testing.T) {
 	c := BackendConfig{
-		"process_stack":      "HeadersParser|Debugger",
+		"save_process":       "HeadersParser|Debugger",
 		"log_received_mails": true,
 		"save_workers_size":  "1",
 	}
@@ -30,10 +30,10 @@ func TestInitialize(t *testing.T) {
 		t.Error("Gateway did not init because:", err)
 		t.Fail()
 	}
-	if gateway.chains == nil {
+	if gateway.processors == nil {
 		t.Error("gateway.chains should not be nil")
-	} else if len(gateway.chains) != 1 {
-		t.Error("len(gateway.chains) should be 1, but got", len(gateway.chains))
+	} else if len(gateway.processors) != 1 {
+		t.Error("len(gateway.chains) should be 1, but got", len(gateway.processors))
 	}
 
 	if gateway.conveyor == nil {
@@ -50,7 +50,7 @@ func TestInitialize(t *testing.T) {
 
 func TestStartProcessStop(t *testing.T) {
 	c := BackendConfig{
-		"process_stack":      "HeadersParser|Debugger",
+		"save_process":       "HeadersParser|Debugger",
 		"log_received_mails": true,
 		"save_workers_size":  2,
 	}

+ 5 - 5
cmd/guerrillad/serve_test.go

@@ -34,7 +34,7 @@ var configJsonA = `
     ],
     "backend_config": {
     	"save_workers_size" : 1,
-    	"process_stack": "HeadersParser|Debugger",
+    	"save_process": "HeadersParser|Debugger",
         "log_received_mails": true
     },
     "servers" : [
@@ -83,7 +83,7 @@ var configJsonB = `
     ],
     "backend_config": {
     	"save_workers_size" : 1,
-    	"process_stack": "HeadersParser|Debugger",
+    	"save_process": "HeadersParser|Debugger",
         "log_received_mails": false
     },
     "servers" : [
@@ -130,7 +130,7 @@ var configJsonC = `
             "save_workers_size" : 3,
             "primary_mail_host":"sharklasers.com",
             "save_workers_size" : 1,
-	    "process_stack": "HeadersParser|Debugger",
+	    "save_process": "HeadersParser|Debugger",
 	    "log_received_mails": true
         },
     "servers" : [
@@ -179,7 +179,7 @@ var configJsonD = `
     ],
     "backend_config": {
         "save_workers_size" : 1,
-    	"process_stack": "HeadersParser|Debugger",
+    	"save_process": "HeadersParser|Debugger",
         "log_received_mails": false
     },
     "servers" : [
@@ -229,7 +229,7 @@ var configJsonE = `
     "backend_config" :
         {
             "process_stack_old": "HeadersParser|Debugger|Hasher|Header|Compressor|Redis|MySql",
-            "process_stack": "GuerrillaRedisDB",
+            "save_process": "GuerrillaRedisDB",
             "log_received_mails" : true,
             "mysql_db":"gmail_mail",
             "mysql_host":"127.0.0.1:3306",

+ 3 - 3
config.go

@@ -242,12 +242,12 @@ func (c *AppConfig) setBackendDefaults() error {
 		c.BackendConfig = backends.BackendConfig{
 			"log_received_mails": true,
 			"save_workers_size":  1,
-			"process_stack":      "HeadersParser|Header|Debugger",
+			"save_process":       "HeadersParser|Header|Debugger",
 			"primary_mail_host":  h,
 		}
 	} else {
-		if _, ok := c.BackendConfig["process_stack"]; !ok {
-			c.BackendConfig["process_stack"] = "HeadersParser|Header|Debugger"
+		if _, ok := c.BackendConfig["save_process"]; !ok {
+			c.BackendConfig["save_process"] = "HeadersParser|Header|Debugger"
 		}
 		if _, ok := c.BackendConfig["primary_mail_host"]; !ok {
 			h, err := os.Hostname()

+ 1 - 1
goguerrilla.conf.sample

@@ -12,7 +12,7 @@
     "backend_config": {
         "log_received_mails": true,
         "save_workers_size": 1,
-        "process_stack" : "HeadersParser|Header|Debugger",
+        "save_process" : "HeadersParser|Header|Debugger",
         "primary_mail_host" : "mail.example.com"
     },
     "servers" : [