Bläddra i källkod

Fixed tests
- Added in type support to Backend's extractConfig func
- saveStatus contains queued instead of hash, better semantically
- BackendGateway default config if processor_line not present, or save_workers_size not present
- added queued id to envelope, a temporary default id is generated too

flashmob 8 år sedan
förälder
incheckning
32c1e9e9ad

+ 5 - 2
backends/backend.go

@@ -43,8 +43,8 @@ type BackendConfig map[string]interface{}
 type baseConfig interface{}
 type baseConfig interface{}
 
 
 type saveStatus struct {
 type saveStatus struct {
-	err  error
-	hash string
+	err      error
+	queuedID string
 }
 }
 
 
 // BackendResult represents a response to an SMTP client after receiving DATA.
 // BackendResult represents a response to an SMTP client after receiving DATA.
@@ -188,8 +188,11 @@ func (b *BackendService) extractConfig(configData BackendConfig, configType base
 			field_name = typeOfT.Field(i).Name
 			field_name = typeOfT.Field(i).Name
 		}
 		}
 		if f.Type().Name() == "int" {
 		if f.Type().Name() == "int" {
+			// in json, there is no int, only floats...
 			if intVal, converted := configData[field_name].(float64); converted {
 			if intVal, converted := configData[field_name].(float64); converted {
 				s.Field(i).SetInt(int64(intVal))
 				s.Field(i).SetInt(int64(intVal))
+			} else if intVal, converted := configData[field_name].(int); converted {
+				s.Field(i).SetInt(int64(intVal))
 			} else {
 			} else {
 				return configType, convertError("property missing/invalid: '" + field_name + "' of expected type: " + f.Type().Name())
 				return configType, convertError("property missing/invalid: '" + field_name + "' of expected type: " + f.Type().Name())
 			}
 			}

+ 8 - 3
backends/gateway.go

@@ -87,7 +87,7 @@ func (gw *BackendGateway) Process(e *envelope.Envelope) BackendResult {
 		if status.err != nil {
 		if status.err != nil {
 			return NewBackendResult(response.Canned.FailBackendTransaction + status.err.Error())
 			return NewBackendResult(response.Canned.FailBackendTransaction + status.err.Error())
 		}
 		}
-		return NewBackendResult(response.Canned.SuccessMessageQueued + status.hash)
+		return NewBackendResult(response.Canned.SuccessMessageQueued + status.queuedID)
 
 
 	case <-time.After(time.Second * 30):
 	case <-time.After(time.Second * 30):
 		Log().Infof("Backend has timed out")
 		Log().Infof("Backend has timed out")
@@ -112,8 +112,6 @@ func (gw *BackendGateway) Shutdown() error {
 
 
 // Reinitialize starts up a backend gateway that was shutdown before
 // Reinitialize starts up a backend gateway that was shutdown before
 func (gw *BackendGateway) Reinitialize() error {
 func (gw *BackendGateway) Reinitialize() error {
-	gw.Lock()
-	defer gw.Unlock()
 	if gw.State != BackendStateShuttered {
 	if gw.State != BackendStateShuttered {
 		return errors.New("backend must be in BackendStateshuttered state to Reinitialize")
 		return errors.New("backend must be in BackendStateshuttered state to Reinitialize")
 	}
 	}
@@ -121,6 +119,7 @@ func (gw *BackendGateway) Reinitialize() error {
 	if err != nil {
 	if err != nil {
 		return fmt.Errorf("error while initializing the backend: %s", err)
 		return fmt.Errorf("error while initializing the backend: %s", err)
 	}
 	}
+
 	gw.State = BackendStateRunning
 	gw.State = BackendStateRunning
 	return err
 	return err
 }
 }
@@ -149,6 +148,12 @@ func (gw *BackendGateway) newProcessorLine() Processor {
 // loadConfig loads the config for the GatewayConfig
 // loadConfig loads the config for the GatewayConfig
 func (gw *BackendGateway) loadConfig(cfg BackendConfig) error {
 func (gw *BackendGateway) loadConfig(cfg BackendConfig) error {
 	configType := baseConfig(&GatewayConfig{})
 	configType := baseConfig(&GatewayConfig{})
+	if _, ok := cfg["process_line"]; !ok {
+		cfg["process_line"] = "Debugger"
+	}
+	if _, ok := cfg["save_workers_size"]; !ok {
+		cfg["save_workers_size"] = 1
+	}
 	bcfg, err := Service.extractConfig(cfg, configType)
 	bcfg, err := Service.extractConfig(cfg, configType)
 	if err != nil {
 	if err != nil {
 		return err
 		return err

+ 2 - 1
backends/p_mysql.go

@@ -30,7 +30,7 @@ import (
 //               : e.MailFrom
 //               : e.MailFrom
 //               : e.Subject - generated by by ParseHeader() processor
 //               : e.Subject - generated by by ParseHeader() processor
 // ----------------------------------------------------------------------------------
 // ----------------------------------------------------------------------------------
-// Output        : No output, just saves to mysql
+// Output        : Sets e.QueuedId with the first item fromHashes[0]
 // ----------------------------------------------------------------------------------
 // ----------------------------------------------------------------------------------
 func init() {
 func init() {
 	Processors["mysql"] = func() Decorator {
 	Processors["mysql"] = func() Decorator {
@@ -164,6 +164,7 @@ func MySql() Decorator {
 			hash := ""
 			hash := ""
 			if len(e.Hashes) > 0 {
 			if len(e.Hashes) > 0 {
 				hash = e.Hashes[0]
 				hash = e.Hashes[0]
+				e.QueuedId = e.Hashes[0]
 			}
 			}
 
 
 			var co *compressor
 			var co *compressor

+ 2 - 1
backends/p_redis.go

@@ -23,7 +23,7 @@ import (
 //               : e.DeliveryHeader generated by Header() processor
 //               : e.DeliveryHeader generated by Header() processor
 //               :
 //               :
 // ----------------------------------------------------------------------------------
 // ----------------------------------------------------------------------------------
-// Output        : No output, just saves to redis
+// Output        : Sets e.QueuedId with the first item fromHashes[0]
 // ----------------------------------------------------------------------------------
 // ----------------------------------------------------------------------------------
 func init() {
 func init() {
 
 
@@ -89,6 +89,7 @@ func Redis() Decorator {
 			hash := ""
 			hash := ""
 
 
 			if len(e.Hashes) > 0 {
 			if len(e.Hashes) > 0 {
+				e.QueuedId = e.Hashes[0]
 				hash = e.Hashes[0]
 				hash = e.Hashes[0]
 
 
 				var stringer fmt.Stringer
 				var stringer fmt.Stringer

+ 1 - 1
backends/worker.go

@@ -31,7 +31,7 @@ func (w *Worker) saveMailWorker(saveMailChan chan *savePayload, p Processor, wor
 		result, _ := p.Process(payload.mail)
 		result, _ := p.Process(payload.mail)
 		// if all good
 		// if all good
 		if result.Code() < 300 {
 		if result.Code() < 300 {
-			payload.savedNotify <- &saveStatus{nil, payload.mail.Hashes[0]}
+			payload.savedNotify <- &saveStatus{nil, payload.mail.QueuedId}
 		} else {
 		} else {
 			payload.savedNotify <- &saveStatus{errors.New(result.String()), ""}
 			payload.savedNotify <- &saveStatus{errors.New(result.String()), ""}
 		}
 		}

+ 1 - 1
client.go

@@ -54,7 +54,7 @@ type client struct {
 func NewClient(conn net.Conn, clientID uint64, logger log.Logger) *client {
 func NewClient(conn net.Conn, clientID uint64, logger log.Logger) *client {
 	c := &client{
 	c := &client{
 		conn:        conn,
 		conn:        conn,
-		Envelope:    envelope.NewEnvelope(getRemoteAddr(conn)),
+		Envelope:    envelope.NewEnvelope(getRemoteAddr(conn), clientID),
 		ConnectedAt: time.Now(),
 		ConnectedAt: time.Now(),
 		bufin:       newSMTPBufferedReader(conn),
 		bufin:       newSMTPBufferedReader(conn),
 		bufout:      bufio.NewWriter(conn),
 		bufout:      bufio.NewWriter(conn),

+ 4 - 4
cmd/guerrillad/serve_test.go

@@ -253,9 +253,9 @@ func TestCmdConfigChangeEvents(t *testing.T) {
 	newerconf.load([]byte(configJsonC))
 	newerconf.load([]byte(configJsonC))
 
 
 	expectedEvents := map[guerrilla.Event]bool{
 	expectedEvents := map[guerrilla.Event]bool{
-		guerrilla.EvConfigBackendConfig: false,
-		guerrilla.EvConfigBackendName:   false,
-		guerrilla.EvConfigEvServerNew:   false,
+		guerrilla.EventConfigBackendConfig: false,
+		guerrilla.EventConfigBackendName:   false,
+		guerrilla.EventConfigEvServerNew:   false,
 	}
 	}
 	mainlog, _ = log.GetLogger("off")
 	mainlog, _ = log.GetLogger("off")
 
 
@@ -690,7 +690,7 @@ func TestAllowedHostsEvent(t *testing.T) {
 		//fmt.Println(logOutput)
 		//fmt.Println(logOutput)
 		if i := strings.Index(logOutput, "allowed_hosts config changed, a new list was set"); i < 0 {
 		if i := strings.Index(logOutput, "allowed_hosts config changed, a new list was set"); i < 0 {
 			t.Errorf("did not change allowed_hosts, most likely because Bus.Subscribe(\"%s\" didnt fire",
 			t.Errorf("did not change allowed_hosts, most likely because Bus.Subscribe(\"%s\" didnt fire",
-				guerrilla.EvConfigAllowedHosts)
+				guerrilla.EventConfigAllowedHosts)
 		}
 		}
 	}
 	}
 	// cleanup
 	// cleanup

+ 12 - 12
config_test.go

@@ -211,19 +211,19 @@ func TestConfigChangeEvents(t *testing.T) {
 	newconf.LogLevel = "off"
 	newconf.LogLevel = "off"
 	newconf.LogFile = "off"
 	newconf.LogFile = "off"
 	expectedEvents := map[Event]bool{
 	expectedEvents := map[Event]bool{
-		EvConfigPidFile:         false,
-		EvConfigLogFile:         false,
-		EvConfigLogLevel:        false,
-		EvConfigAllowedHosts:    false,
-		EvConfigEvServerNew:     false, // 127.0.0.1:4654 will be added
-		EvConfigServerRemove:    false, // 127.0.0.1:9999 server removed
-		EvConfigServerStop:      false, // 127.0.0.1:3333: server (disabled)
-		EvConfigServerLogFile:   false, // 127.0.0.1:2526
-		EvConfigServerLogReopen: false, // 127.0.0.1:2527
-		EvConfigServerTimeout:   false, // 127.0.0.1:2526 timeout
+		EventConfigPidFile:         false,
+		EventConfigLogFile:         false,
+		EventConfigLogLevel:        false,
+		EventConfigAllowedHosts:    false,
+		EventConfigEvServerNew:     false, // 127.0.0.1:4654 will be added
+		EventConfigServerRemove:    false, // 127.0.0.1:9999 server removed
+		EventConfigServerStop:      false, // 127.0.0.1:3333: server (disabled)
+		EventConfigServerLogFile:   false, // 127.0.0.1:2526
+		EventConfigServerLogReopen: false, // 127.0.0.1:2527
+		EventConfigServerTimeout:   false, // 127.0.0.1:2526 timeout
 		//"server_change:tls_config":    false, // 127.0.0.1:2526
 		//"server_change:tls_config":    false, // 127.0.0.1:2526
-		EvConfigServerMaxClients: false, // 127.0.0.1:2526
-		EvConfigServerTLSConfig:  false, // 127.0.0.1:2527 timestamp changed on certificates
+		EventConfigServerMaxClients: false, // 127.0.0.1:2526
+		EventConfigServerTLSConfig:  false, // 127.0.0.1:2527 timestamp changed on certificates
 	}
 	}
 	toUnsubscribe := map[Event]func(c *AppConfig){}
 	toUnsubscribe := map[Event]func(c *AppConfig){}
 	toUnsubscribeSrv := map[Event]func(c *ServerConfig){}
 	toUnsubscribeSrv := map[Event]func(c *ServerConfig){}

+ 11 - 2
envelope/envelope.go

@@ -3,6 +3,7 @@ package envelope
 import (
 import (
 	"bufio"
 	"bufio"
 	"bytes"
 	"bytes"
+	"crypto/md5"
 	"encoding/base64"
 	"encoding/base64"
 	"errors"
 	"errors"
 	"fmt"
 	"fmt"
@@ -12,6 +13,7 @@ import (
 	"net/textproto"
 	"net/textproto"
 	"regexp"
 	"regexp"
 	"strings"
 	"strings"
+	"time"
 )
 )
 
 
 // EmailAddress encodes an email address of the form `<user@host>`
 // EmailAddress encodes an email address of the form `<user@host>`
@@ -50,14 +52,21 @@ type Envelope struct {
 	Info map[string]interface{}
 	Info map[string]interface{}
 	// Hashes of each email on the rcpt
 	// Hashes of each email on the rcpt
 	Hashes []string
 	Hashes []string
-	//
+	// additional delivery header that may be added
 	DeliveryHeader string
 	DeliveryHeader string
+	// Email(s) will be queued with this id
+	QueuedId string
+	// ClientID is a unique id given to the client on connect
+	ClientID uint64
 }
 }
 
 
-func NewEnvelope(remoteAddr string) *Envelope {
+func NewEnvelope(remoteAddr string, clientID uint64) *Envelope {
+
 	return &Envelope{
 	return &Envelope{
 		RemoteAddress: remoteAddr,
 		RemoteAddress: remoteAddr,
 		Info:          make(map[string]interface{}),
 		Info:          make(map[string]interface{}),
+		QueuedId:      fmt.Sprintf("%x", md5.Sum([]byte(string(time.Now().Unix())+string(clientID)))),
+		ClientID:      clientID,
 	}
 	}
 }
 }
 
 

+ 4 - 1
server_test.go

@@ -41,7 +41,10 @@ func getMockServerConn(sc *ServerConfig, t *testing.T) (*mocks.Conn, *server) {
 	if logOpenError != nil {
 	if logOpenError != nil {
 		mainlog.WithError(logOpenError).Errorf("Failed creating a logger for mock conn [%s]", sc.ListenInterface)
 		mainlog.WithError(logOpenError).Errorf("Failed creating a logger for mock conn [%s]", sc.ListenInterface)
 	}
 	}
-	backend, err := backends.New("dummy", backends.BackendConfig{"log_received_mails": true}, mainlog)
+	backend, err := backends.New(
+		"dummy",
+		backends.BackendConfig{"log_received_mails": true, "save_workers_size": 1},
+		mainlog)
 	if err != nil {
 	if err != nil {
 		t.Error("new dummy backend failed because:", err)
 		t.Error("new dummy backend failed because:", err)
 	}
 	}

+ 2 - 2
tests/guerrilla_test.go

@@ -188,7 +188,6 @@ func TestGreeting(t *testing.T) {
 		t.FailNow()
 		t.FailNow()
 	}
 	}
 	if startErrors := app.Start(); startErrors == nil {
 	if startErrors := app.Start(); startErrors == nil {
-
 		// 1. plaintext connection
 		// 1. plaintext connection
 		conn, err := net.Dial("tcp", config.Servers[0].ListenInterface)
 		conn, err := net.Dial("tcp", config.Servers[0].ListenInterface)
 		if err != nil {
 		if err != nil {
@@ -236,6 +235,7 @@ func TestGreeting(t *testing.T) {
 		conn.Close()
 		conn.Close()
 
 
 	} else {
 	} else {
+		fmt.Println("Nope", startErrors)
 		if startErrors := app.Start(); startErrors != nil {
 		if startErrors := app.Start(); startErrors != nil {
 			t.Error(startErrors)
 			t.Error(startErrors)
 			t.FailNow()
 			t.FailNow()
@@ -1096,7 +1096,7 @@ func TestDataCommand(t *testing.T) {
 				bufin,
 				bufin,
 				email+"\r\n.\r\n")
 				email+"\r\n.\r\n")
 			//expected := "500 Line too long"
 			//expected := "500 Line too long"
-			expected := "250 2.0.0 OK : queued as s0m3l337Ha5hva1u3LOL"
+			expected := "250 2.0.0 OK : queued as "
 			if strings.Index(response, expected) != 0 {
 			if strings.Index(response, expected) != 0 {
 				t.Error("Server did not respond with", expected, ", it said:"+response, err)
 				t.Error("Server did not respond with", expected, ", it said:"+response, err)
 			}
 			}