Browse Source

- AddProcessor function to give more abstraction into the processor creation
- Made some symbols public, ie. Service.ExtractConfig, BaseConfig
- Envelope: added a NewRreader() function that returns a reader for the processed email
- Moved resetting the envelope from client.go to envelope itself.
- Added a Reseed() function that seeds the envelope with data at the start of a connection

flashmob 8 years ago
parent
commit
6c1c48bc9a

+ 13 - 2
backends/backend.go

@@ -40,7 +40,7 @@ type Backend interface {
 type BackendConfig map[string]interface{}
 type BackendConfig map[string]interface{}
 
 
 // All config structs extend from this
 // All config structs extend from this
-type baseConfig interface{}
+type BaseConfig interface{}
 
 
 type saveStatus struct {
 type saveStatus struct {
 	err      error
 	err      error
@@ -160,13 +160,24 @@ func (b *BackendService) Shutdown() {
 	b.Shutdowners = make([]ProcessorShutdowner, 0)
 	b.Shutdowners = make([]ProcessorShutdowner, 0)
 }
 }
 
 
+// AddProcessor adds a new processor, which becomes available to the backend_config.process_line option
+func (b *BackendService) AddProcessor(name string, p ProcessorConstructor) {
+	// wrap in a constructor since we want to defer calling it
+	var c ProcessorConstructor
+	c = func() Decorator {
+		return p()
+	}
+	// add to our processors list
+	Processors[strings.ToLower(name)] = c
+}
+
 // extractConfig loads the backend config. It has already been unmarshalled
 // extractConfig loads the backend config. It has already been unmarshalled
 // configData contains data from the main config file's "backend_config" value
 // configData contains data from the main config file's "backend_config" value
 // configType is a Processor's specific config value.
 // configType is a Processor's specific config value.
 // The reason why using reflection is because we'll get a nice error message if the field is missing
 // The reason why using reflection is because we'll get a nice error message if the field is missing
 // the alternative solution would be to json.Marshal() and json.Unmarshal() however that will not give us any
 // the alternative solution would be to json.Marshal() and json.Unmarshal() however that will not give us any
 // error messages
 // error messages
-func (b *BackendService) extractConfig(configData BackendConfig, configType baseConfig) (interface{}, error) {
+func (b *BackendService) ExtractConfig(configData BackendConfig, configType BaseConfig) (interface{}, error) {
 	// Use reflection so that we can provide a nice error message
 	// Use reflection so that we can provide a nice error message
 	s := reflect.ValueOf(configType).Elem() // so that we can set the values
 	s := reflect.ValueOf(configType).Elem() // so that we can set the values
 	m := reflect.ValueOf(configType).Elem()
 	m := reflect.ValueOf(configType).Elem()

+ 2 - 2
backends/gateway.go

@@ -147,14 +147,14 @@ func (gw *BackendGateway) newProcessorLine() Processor {
 
 
 // loadConfig loads the config for the GatewayConfig
 // loadConfig loads the config for the GatewayConfig
 func (gw *BackendGateway) loadConfig(cfg BackendConfig) error {
 func (gw *BackendGateway) loadConfig(cfg BackendConfig) error {
-	configType := baseConfig(&GatewayConfig{})
+	configType := BaseConfig(&GatewayConfig{})
 	if _, ok := cfg["process_line"]; !ok {
 	if _, ok := cfg["process_line"]; !ok {
 		cfg["process_line"] = "Debugger"
 		cfg["process_line"] = "Debugger"
 	}
 	}
 	if _, ok := cfg["save_workers_size"]; !ok {
 	if _, ok := cfg["save_workers_size"]; !ok {
 		cfg["save_workers_size"] = 1
 		cfg["save_workers_size"] = 1
 	}
 	}
-	bcfg, err := Service.extractConfig(cfg, configType)
+	bcfg, err := Service.ExtractConfig(cfg, configType)
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}

+ 2 - 2
backends/guerrilla_db_redis.go

@@ -84,8 +84,8 @@ func convertError(name string) error {
 // Now we need to convert each type and copy into the guerrillaDBAndRedisConfig struct
 // Now we need to convert each type and copy into the guerrillaDBAndRedisConfig struct
 
 
 func (g *GuerrillaDBAndRedisBackend) loadConfig(backendConfig BackendConfig) (err error) {
 func (g *GuerrillaDBAndRedisBackend) loadConfig(backendConfig BackendConfig) (err error) {
-	configType := baseConfig(&guerrillaDBAndRedisConfig{})
-	bcfg, err := Service.extractConfig(backendConfig, configType)
+	configType := BaseConfig(&guerrillaDBAndRedisConfig{})
+	bcfg, err := Service.ExtractConfig(backendConfig, configType)
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}

+ 2 - 2
backends/p_debugger.go

@@ -28,8 +28,8 @@ type debuggerConfig struct {
 func Debugger() Decorator {
 func Debugger() Decorator {
 	var config *debuggerConfig
 	var config *debuggerConfig
 	initFunc := Initialize(func(backendConfig BackendConfig) error {
 	initFunc := Initialize(func(backendConfig BackendConfig) error {
-		configType := baseConfig(&debuggerConfig{})
-		bcfg, err := Service.extractConfig(backendConfig, configType)
+		configType := BaseConfig(&debuggerConfig{})
+		bcfg, err := Service.ExtractConfig(backendConfig, configType)
 		if err != nil {
 		if err != nil {
 			return err
 			return err
 		}
 		}

+ 2 - 2
backends/p_header.go

@@ -37,8 +37,8 @@ func Header() Decorator {
 	var config *HeaderConfig
 	var config *HeaderConfig
 
 
 	Service.AddInitializer(Initialize(func(backendConfig BackendConfig) error {
 	Service.AddInitializer(Initialize(func(backendConfig BackendConfig) error {
-		configType := baseConfig(&HeaderConfig{})
-		bcfg, err := Service.extractConfig(backendConfig, configType)
+		configType := BaseConfig(&HeaderConfig{})
+		bcfg, err := Service.ExtractConfig(backendConfig, configType)
 		if err != nil {
 		if err != nil {
 			return err
 			return err
 		}
 		}

+ 4 - 4
backends/p_mysql.go

@@ -96,7 +96,7 @@ func (g *MysqlProcessor) prepareInsertQuery(rows int, db *sql.DB) *sql.Stmt {
 	}
 	}
 	stmt, sqlErr := db.Prepare(sqlstr)
 	stmt, sqlErr := db.Prepare(sqlstr)
 	if sqlErr != nil {
 	if sqlErr != nil {
-		Log().WithError(sqlErr).Fatalf("failed while db.Prepare(INSERT...)")
+		Log().WithError(sqlErr).Panic("failed while db.Prepare(INSERT...)")
 	}
 	}
 	// cache it
 	// cache it
 	g.cache[rows-1] = stmt
 	g.cache[rows-1] = stmt
@@ -134,8 +134,8 @@ func MySql() Decorator {
 	mp := &MysqlProcessor{}
 	mp := &MysqlProcessor{}
 
 
 	Service.AddInitializer(Initialize(func(backendConfig BackendConfig) error {
 	Service.AddInitializer(Initialize(func(backendConfig BackendConfig) error {
-		configType := baseConfig(&MysqlProcessorConfig{})
-		bcfg, err := Service.extractConfig(backendConfig, configType)
+		configType := BaseConfig(&MysqlProcessorConfig{})
+		bcfg, err := Service.ExtractConfig(backendConfig, configType)
 		if err != nil {
 		if err != nil {
 			return err
 			return err
 		}
 		}
@@ -143,7 +143,7 @@ func MySql() Decorator {
 		mp.config = config
 		mp.config = config
 		db, err = mp.connect(config)
 		db, err = mp.connect(config)
 		if err != nil {
 		if err != nil {
-			Log().Fatalf("cannot open mysql: %s", err)
+			Log().Error("cannot open mysql: %s", err)
 			return err
 			return err
 		}
 		}
 		return nil
 		return nil

+ 2 - 2
backends/p_redis.go

@@ -62,8 +62,8 @@ func Redis() Decorator {
 	redisClient := &RedisProcessor{}
 	redisClient := &RedisProcessor{}
 	// read the config into RedisProcessorConfig
 	// read the config into RedisProcessorConfig
 	Service.AddInitializer(Initialize(func(backendConfig BackendConfig) error {
 	Service.AddInitializer(Initialize(func(backendConfig BackendConfig) error {
-		configType := baseConfig(&RedisProcessorConfig{})
-		bcfg, err := Service.extractConfig(backendConfig, configType)
+		configType := BaseConfig(&RedisProcessorConfig{})
+		bcfg, err := Service.ExtractConfig(backendConfig, configType)
 		if err != nil {
 		if err != nil {
 			return err
 			return err
 		}
 		}

+ 2 - 11
client.go

@@ -111,11 +111,7 @@ func (c *client) sendResponse(r ...interface{}) {
 // -End of DATA command
 // -End of DATA command
 // TLS handhsake
 // TLS handhsake
 func (c *client) resetTransaction() {
 func (c *client) resetTransaction() {
-	c.MailFrom = envelope.EmailAddress{}
-	c.RcptTo = []envelope.EmailAddress{}
-	c.Data.Reset()
-	c.Subject = ""
-	c.Header = nil
+	c.Envelope.ResetTransaction()
 }
 }
 
 
 // isInTransaction returns true if the connection is inside a transaction.
 // isInTransaction returns true if the connection is inside a transaction.
@@ -162,18 +158,13 @@ func (c *client) init(conn net.Conn, clientID uint64) {
 	// reset our reader & writer
 	// reset our reader & writer
 	c.bufout.Reset(conn)
 	c.bufout.Reset(conn)
 	c.bufin.Reset(conn)
 	c.bufin.Reset(conn)
-	// reset the data buffer, keep it allocated
-	c.Data.Reset()
 	// reset session data
 	// reset session data
 	c.state = 0
 	c.state = 0
 	c.KilledAt = time.Time{}
 	c.KilledAt = time.Time{}
 	c.ConnectedAt = time.Now()
 	c.ConnectedAt = time.Now()
 	c.ID = clientID
 	c.ID = clientID
-	c.TLS = false
 	c.errors = 0
 	c.errors = 0
-	c.Helo = ""
-	c.Header = nil
-	c.RemoteAddress = getRemoteAddr(conn)
+	c.Envelope.Reseed(getRemoteAddr(conn), clientID)
 
 
 }
 }
 
 

+ 6 - 3
cmd/guerrillad/serve.go

@@ -94,11 +94,14 @@ func subscribeBackendEvent(event guerrilla.Event, backend backends.Backend, app
 			logger.WithError(err).Warn("Backend failed to shutdown")
 			logger.WithError(err).Warn("Backend failed to shutdown")
 			return
 			return
 		}
 		}
-		backend, err = backends.New(cmdConfig.BackendName, cmdConfig.BackendConfig, logger)
-		if err != nil {
-			logger.WithError(err).Fatalf("Error while loading the backend %q",
+		newBackend, newErr := backends.New(cmdConfig.BackendName, cmdConfig.BackendConfig, logger)
+		if newErr != nil {
+			// this will continue using old backend
+			logger.WithError(newErr).Error("Error while loading the backend %q",
 				cmdConfig.BackendName)
 				cmdConfig.BackendName)
 		} else {
 		} else {
+			// swap to the bew backend (assuming old backend was shutdown so it can be safely swapped)
+			backend = newBackend
 			logger.Info("Backend started:", cmdConfig.BackendName)
 			logger.Info("Backend started:", cmdConfig.BackendName)
 		}
 		}
 	})
 	})

+ 46 - 11
envelope/envelope.go

@@ -9,6 +9,7 @@ import (
 	"fmt"
 	"fmt"
 	"github.com/sloonz/go-qprintable"
 	"github.com/sloonz/go-qprintable"
 	"gopkg.in/iconv.v1"
 	"gopkg.in/iconv.v1"
+	"io"
 	"io/ioutil"
 	"io/ioutil"
 	"net/textproto"
 	"net/textproto"
 	"regexp"
 	"regexp"
@@ -16,6 +17,8 @@ import (
 	"time"
 	"time"
 )
 )
 
 
+const maxHeaderChunk = iota + (1<<10)*3 // 3KB
+
 // EmailAddress encodes an email address of the form `<user@host>`
 // EmailAddress encodes an email address of the form `<user@host>`
 type EmailAddress struct {
 type EmailAddress struct {
 	User string
 	User string
@@ -56,8 +59,6 @@ type Envelope struct {
 	DeliveryHeader string
 	DeliveryHeader string
 	// Email(s) will be queued with this id
 	// Email(s) will be queued with this id
 	QueuedId string
 	QueuedId string
-	// ClientID is a unique id given to the client on connect
-	ClientID uint64
 }
 }
 
 
 func NewEnvelope(remoteAddr string, clientID uint64) *Envelope {
 func NewEnvelope(remoteAddr string, clientID uint64) *Envelope {
@@ -65,11 +66,14 @@ func NewEnvelope(remoteAddr string, clientID uint64) *Envelope {
 	return &Envelope{
 	return &Envelope{
 		RemoteAddress: remoteAddr,
 		RemoteAddress: remoteAddr,
 		Info:          make(map[string]interface{}),
 		Info:          make(map[string]interface{}),
-		QueuedId:      fmt.Sprintf("%x", md5.Sum([]byte(string(time.Now().Unix())+string(clientID)))),
-		ClientID:      clientID,
+		QueuedId:      queuedID(clientID),
 	}
 	}
 }
 }
 
 
+func queuedID(clientID uint64) string {
+	return fmt.Sprintf("%x", md5.Sum([]byte(string(time.Now().Unix())+string(clientID))))
+}
+
 // ParseHeaders parses the headers into Header field of the Envelope struct.
 // ParseHeaders parses the headers into Header field of the Envelope struct.
 // Data buffer must be full before calling.
 // Data buffer must be full before calling.
 // It assumes that at most 30kb of email data can be a header
 // It assumes that at most 30kb of email data can be a header
@@ -79,16 +83,16 @@ func (e *Envelope) ParseHeaders() error {
 	if e.Header != nil {
 	if e.Header != nil {
 		return errors.New("Headers already parsed")
 		return errors.New("Headers already parsed")
 	}
 	}
-	b2 := bytes.NewBuffer(e.Data.Bytes())
+	buf := bytes.NewBuffer(e.Data.Bytes())
 	// find where the header ends, assuming that over 30 kb would be max
 	// find where the header ends, assuming that over 30 kb would be max
-	max := 1024 * 30
-	if b2.Len() < max {
-		max = b2.Len()
+	max := maxHeaderChunk
+	if buf.Len() < max {
+		max = buf.Len()
 	}
 	}
 	// read in the chunk which we'll scan for the header
 	// read in the chunk which we'll scan for the header
 	chunk := make([]byte, max)
 	chunk := make([]byte, max)
-	b2.Read(chunk)
-	headerEnd := strings.Index(string(chunk), "\n\n") // the first two new-lines is the end of header
+	buf.Read(chunk)
+	headerEnd := strings.Index(string(chunk), "\n\n") // the first two new-lines chars are the End Of Header
 	if headerEnd > -1 {
 	if headerEnd > -1 {
 		header := chunk[0:headerEnd]
 		header := chunk[0:headerEnd]
 		headerReader := textproto.NewReader(bufio.NewReader(bytes.NewBuffer(header)))
 		headerReader := textproto.NewReader(bufio.NewReader(bytes.NewBuffer(header)))
@@ -105,15 +109,46 @@ func (e *Envelope) ParseHeaders() error {
 	return err
 	return err
 }
 }
 
 
-// String converts the email to string. Typically, you would want to use the compressor processor for more efficiency
+// Returns a new reader for reading the email contents, including the delivery headers
+func (e *Envelope) NewReader() io.Reader {
+	return io.MultiReader(
+		strings.NewReader(e.DeliveryHeader),
+		bytes.NewReader(e.Data.Bytes()),
+	)
+}
+
+// String converts the email to string.
+// Typically, you would want to use the compressor guerrilla.Processor for more efficiency, or use NewReader
 func (e *Envelope) String() string {
 func (e *Envelope) String() string {
 	return e.DeliveryHeader + e.Data.String()
 	return e.DeliveryHeader + e.Data.String()
 }
 }
 
 
+// ResetTransaction is called when the transaction is reset (but save connection)
+func (e *Envelope) ResetTransaction() {
+	e.MailFrom = EmailAddress{}
+	e.RcptTo = []EmailAddress{}
+	// reset the data buffer, keep it allocated
+	e.Data.Reset()
+}
+
+// Seed is called when used with a new connection, once it's accepted
+func (e *Envelope) Reseed(remoteAddr string, clientID uint64) {
+	e.Subject = ""
+	e.RemoteAddress = remoteAddr
+	e.Helo = ""
+	e.Header = nil
+	e.TLS = false
+	e.Hashes = make([]string, 0)
+	e.DeliveryHeader = ""
+	e.Info = make(map[string]interface{})
+	e.QueuedId = queuedID(clientID)
+}
+
 var mimeRegex, _ = regexp.Compile(`=\?(.+?)\?([QBqp])\?(.+?)\?=`)
 var mimeRegex, _ = regexp.Compile(`=\?(.+?)\?([QBqp])\?(.+?)\?=`)
 
 
 // Decode strings in Mime header format
 // Decode strings in Mime header format
 // eg. =?ISO-2022-JP?B?GyRCIVo9dztSOWJAOCVBJWMbKEI=?=
 // eg. =?ISO-2022-JP?B?GyRCIVo9dztSOWJAOCVBJWMbKEI=?=
+// This function uses GNU iconv under the hood, for more charset support than in Go's library
 func MimeHeaderDecode(str string) string {
 func MimeHeaderDecode(str string) string {
 
 
 	matched := mimeRegex.FindAllStringSubmatch(str, -1)
 	matched := mimeRegex.FindAllStringSubmatch(str, -1)

+ 1 - 0
server.go

@@ -476,6 +476,7 @@ func (server *server) handleClient(client *client) {
 					client.kill()
 					client.kill()
 				}
 				}
 				server.log.WithError(err).Warn("Error reading data")
 				server.log.WithError(err).Warn("Error reading data")
+				client.resetTransaction()
 				break
 				break
 			}
 			}