Browse Source

more progress with decorators:
- Compressor decorator
- Header decorator
- Redis decorator

Other:
Moved events to its own package so that they can be used by backend without getting into circular dep.
Guerrilla embeds events directly instead of wrapping the function

Todo:
, cleanup / remove legacy code, finalise interfaces, change processor initialize to use events. Use a channel for backend errors.

flashmob 8 years ago
parent
commit
00dfc08445

+ 3 - 0
backends/abstract.go

@@ -4,6 +4,7 @@ import (
 	"errors"
 	"fmt"
 	"github.com/flashmob/go-guerrilla/envelope"
+	"github.com/flashmob/go-guerrilla/ev"
 	"reflect"
 	"runtime/debug"
 	"strings"
@@ -74,6 +75,8 @@ func (b *AbstractBackend) Initialize(config BackendConfig) error {
 	for i := range b.initializers {
 		b.initializers[i]()
 	}
+	//Service.Publish(ev.BackendProcConfigLoad, config)
+	Service.Publish(ev.BackendProcInitialize, config)
 	return nil
 
 	// TODO delete

+ 11 - 0
backends/backend.go

@@ -3,6 +3,7 @@ package backends
 import (
 	"fmt"
 	"github.com/flashmob/go-guerrilla/envelope"
+	"github.com/flashmob/go-guerrilla/ev"
 	"github.com/flashmob/go-guerrilla/log"
 	"strconv"
 	"strings"
@@ -10,6 +11,12 @@ import (
 
 var mainlog log.Logger
 
+var Service BackendService
+
+func init() {
+	Service = BackendService{}
+}
+
 // Backends process received mail. Depending on the implementation, they can store mail in the database,
 // write to a file, check for spam, re-transmit to another server, etc.
 // Must return an SMTP message (i.e. "250 OK") and a boolean indicating
@@ -99,3 +106,7 @@ func (br backendResult) Code() int {
 func NewBackendResult(message string) BackendResult {
 	return backendResult(message)
 }
+
+type BackendService struct {
+	ev.EventHandler
+}

+ 8 - 6
backends/dummy.go

@@ -2,12 +2,14 @@ package backends
 
 func init() {
 	backends["dummy"] = &AbstractBackend{}
-	cb := &DecoratorCallbacks{}
-	guerrillaDBcb := &DecoratorCallbacks{}
-	backends["dummy"].SetProcessors(GuerrillaDB(guerrillaDBcb), Hasher(), Debugger(cb), HeadersParser())
-	backends["dummy"].AddConfigLoader(cb.loader)
-	backends["dummy"].AddConfigLoader(guerrillaDBcb.loader)
-	backends["dummy"].AddInitializer(guerrillaDBcb.initialize)
+	debuggercb := &DecoratorCallbacks{}
+	headerCB := &DecoratorCallbacks{}
+	redisCB := &DecoratorCallbacks{}
+	backends["dummy"].SetProcessors(
+		MySql(), Redis(redisCB), Compressor(), Header(headerCB), Hasher(), Debugger(debuggercb), HeadersParser())
+	backends["dummy"].AddConfigLoader(debuggercb.loader)
+	backends["dummy"].AddConfigLoader(headerCB.loader)
+	backends["dummy"].AddConfigLoader(redisCB.loader)
 }
 
 // custom configuration we will parse from the json

+ 81 - 0
backends/p_compressor.go

@@ -0,0 +1,81 @@
+package backends
+
+import (
+	"bytes"
+	"compress/zlib"
+	"github.com/flashmob/go-guerrilla/envelope"
+	"io"
+	"sync"
+)
+
+// compressedData struct will be compressed using zlib when printed via fmt
+type compressor struct {
+	extraHeaders []byte
+	data         *bytes.Buffer
+	pool         *sync.Pool
+}
+
+// newCompressedData returns a new CompressedData
+func newCompressor() *compressor {
+	var p = sync.Pool{
+		New: func() interface{} {
+			var b bytes.Buffer
+			return &b
+		},
+	}
+	return &compressor{
+		pool: &p,
+	}
+}
+
+// Set the extraheaders and buffer of data to compress
+func (c *compressor) set(b []byte, d *bytes.Buffer) {
+	c.extraHeaders = b
+	c.data = d
+}
+
+// implement Stringer interface
+func (c *compressor) String() string {
+	if c.data == nil {
+		return ""
+	}
+	//borrow a buffer form the pool
+	b := c.pool.Get().(*bytes.Buffer)
+	// put back in the pool
+	defer func() {
+		b.Reset()
+		c.pool.Put(b)
+	}()
+
+	var r *bytes.Reader
+	w, _ := zlib.NewWriterLevel(b, zlib.BestSpeed)
+	r = bytes.NewReader(c.extraHeaders)
+	io.Copy(w, r)
+	io.Copy(w, c.data)
+	w.Close()
+	return b.String()
+}
+
+// clear it, without clearing the pool
+func (c *compressor) clear() {
+	c.extraHeaders = []byte{}
+	c.data = nil
+}
+
+// The hasher decorator computes a hash of the email for each recipient
+// It appends the hashes to envelope's Hashes slice.
+func Compressor() Decorator {
+	return func(c Processor) Processor {
+		return ProcessorFunc(func(e *envelope.Envelope) (BackendResult, error) {
+
+			compressor := newCompressor()
+			compressor.set([]byte(e.DeliveryHeader), &e.Data)
+			if e.Meta == nil {
+				e.Meta = make(map[string]interface{})
+			}
+			e.Meta["zlib-compressor"] = compressor
+
+			return c.Process(e)
+		})
+	}
+}

+ 0 - 0
backends/debugger.go → backends/p_debugger.go


+ 0 - 0
backends/hasher.go → backends/p_hasher.go


+ 49 - 0
backends/p_header.go

@@ -0,0 +1,49 @@
+package backends
+
+import (
+	"github.com/flashmob/go-guerrilla/envelope"
+	"strings"
+	"time"
+)
+
+type HeaderConfig struct {
+	PrimaryHost string `json:"primary_mail_host"`
+}
+
+// Generate the MTA delivery header
+// Sets e.DeliveryHeader with the result
+func Header(dc *DecoratorCallbacks) Decorator {
+
+	var config *HeaderConfig
+	dc.loader = func(backendConfig BackendConfig) error {
+		configType := baseConfig(&HeaderConfig{})
+		bcfg, err := ab.extractConfig(backendConfig, configType)
+		if err != nil {
+			return err
+		}
+		config = bcfg.(*HeaderConfig)
+
+		return nil
+	}
+
+	return func(c Processor) Processor {
+		return ProcessorFunc(func(e *envelope.Envelope) (BackendResult, error) {
+			to := strings.TrimSpace(e.RcptTo[0].User) + "@" + config.PrimaryHost
+			hash := "unknown"
+			if len(e.Hashes) > 0 {
+				hash = e.Hashes[0]
+			}
+			var addHead string
+			addHead += "Delivered-To: " + to + "\r\n"
+			addHead += "Received: from " + e.Helo + " (" + e.Helo + "  [" + e.RemoteAddress + "])\r\n"
+			if len(e.RcptTo) > 0 {
+				addHead += "	by " + e.RcptTo[0].Host + " with SMTP id " + hash + "@" + e.RcptTo[0].Host + ";\r\n"
+			}
+			addHead += "	" + time.Now().Format(time.RFC1123Z) + "\r\n"
+			// save the result
+			e.DeliveryHeader = addHead
+			// next processor
+			return c.Process(e)
+		})
+	}
+}

+ 0 - 0
backends/headers_parser.go → backends/p_headers_parser.go


+ 37 - 36
backends/guerrilla_db.go → backends/p_mysql.go

@@ -8,10 +8,11 @@ import (
 	"github.com/flashmob/go-guerrilla/envelope"
 	"github.com/go-sql-driver/mysql"
 
+	"github.com/flashmob/go-guerrilla/ev"
 	"runtime/debug"
 )
 
-type guerrillaDBConfig struct {
+type MysqlProcessorConfig struct {
 	NumberOfWorkers    int    `json:"save_workers_size"`
 	MysqlTable         string `json:"mail_table"`
 	MysqlDB            string `json:"mysql_db"`
@@ -23,13 +24,13 @@ type guerrillaDBConfig struct {
 	PrimaryHost        string `json:"primary_mail_host"`
 }
 
-type guerrillaDBDecorator struct {
+type MysqlProcessorDecorator struct {
 	cache  stmtCache
-	config *guerrillaDBConfig
+	config *MysqlProcessorConfig
 }
 
 // prepares the sql query with the number of rows that can be batched with it
-func (g *guerrillaDBDecorator) prepareInsertQuery(rows int, db *sql.DB) *sql.Stmt {
+func (g *MysqlProcessorDecorator) prepareInsertQuery(rows int, db *sql.DB) *sql.Stmt {
 	if rows == 0 {
 		panic("rows argument cannot be 0")
 	}
@@ -57,7 +58,7 @@ func (g *guerrillaDBDecorator) prepareInsertQuery(rows int, db *sql.DB) *sql.Stm
 	return stmt
 }
 
-func (g *guerrillaDBDecorator) doQuery(c int, db *sql.DB, insertStmt *sql.Stmt, vals *[]interface{}) {
+func (g *MysqlProcessorDecorator) doQuery(c int, db *sql.DB, insertStmt *sql.Stmt, vals *[]interface{}) {
 	var execErr error
 	defer func() {
 		if r := recover(); r != nil {
@@ -81,25 +82,14 @@ func (g *guerrillaDBDecorator) doQuery(c int, db *sql.DB, insertStmt *sql.Stmt,
 	}
 }
 
-func GuerrillaDB(dc *DecoratorCallbacks) Decorator {
+func MySql() Decorator {
 
-	decorator := guerrillaDBDecorator{}
+	decorator := MysqlProcessorDecorator{}
 
-	var config *guerrillaDBConfig
-	dc.loader = func(backendConfig BackendConfig) error {
-		configType := baseConfig(&guerrillaDBConfig{})
-		bcfg, err := ab.extractConfig(backendConfig, configType)
-		if err != nil {
-			return err
-		}
-		config = bcfg.(*guerrillaDBConfig)
-		decorator.config = config
-		return nil
-	}
+	var config *MysqlProcessorConfig
 
 	var vals []interface{}
 	var db *sql.DB
-	var err error
 
 	mysqlConnect := func() (*sql.DB, error) {
 		conf := mysql.Config{
@@ -121,14 +111,21 @@ func GuerrillaDB(dc *DecoratorCallbacks) Decorator {
 		}
 
 	}
+	Service.Subscribe(ev.BackendProcInitialize, func(backendConfig BackendConfig) {
+
+		configType := baseConfig(&MysqlProcessorConfig{})
+		// TODO deal with error (supressed) push them on a channel? eg, Service.Errors <- service.ErrConfigLoad
+		bcfg, _ := ab.extractConfig(backendConfig, configType)
+		config = bcfg.(*MysqlProcessorConfig)
+		decorator.config = config
 
-	dc.initialize = func() error {
+		// todo backendErrors chan error
+		var err error
 		db, err = mysqlConnect()
 		if err != nil {
 			mainlog.Fatalf("cannot open mysql: %s", err)
 		}
-		return err
-	}
+	})
 
 	return func(c Processor) Processor {
 		return ProcessorFunc(func(e *envelope.Envelope) (BackendResult, error) {
@@ -139,16 +136,17 @@ func GuerrillaDB(dc *DecoratorCallbacks) Decorator {
 				hash = e.Hashes[0]
 			}
 
-			var compressor *compressedData
+			var co *compressor
 			// a compressor was set
-			if c, ok := e.Meta["gzip"]; ok {
-				body = "gzip"
-				compressor = c.(*compressedData)
-			}
-
-			// was saved in redis
-			if _, ok := e.Meta["redis"]; ok {
-				body = "redis"
+			if e.Meta != nil {
+				if c, ok := e.Meta["zlib-compressor"]; ok {
+					body = "gzip"
+					co = c.(*compressor)
+				}
+				// was saved in redis
+				if _, ok := e.Meta["redis"]; ok {
+					body = "redis"
+				}
 			}
 
 			// build the values for the query
@@ -158,12 +156,15 @@ func GuerrillaDB(dc *DecoratorCallbacks) Decorator {
 				trimToLimit(e.MailFrom.String(), 255),
 				trimToLimit(e.Subject, 255),
 				body)
-			if compressor != nil {
-				// use a compressor
-				vals = append(vals,
-					compressor.String())
+			if body == "redis" {
+				// data already saved in redis
+				vals = append(vals, "")
+			} else if co != nil {
+				// use a compressor (automatically adds e.DeliveryHeader)
+				vals = append(vals, co.String())
+				//co.clear()
 			} else {
-				vals = append(vals, e.Data.String())
+				vals = append(vals, e.DeliveryHeader+e.Data.String())
 			}
 
 			vals = append(vals,

+ 78 - 0
backends/p_redis.go

@@ -0,0 +1,78 @@
+package backends
+
+import (
+	"github.com/flashmob/go-guerrilla/envelope"
+
+	"github.com/flashmob/go-guerrilla/response"
+)
+
+type RedisProcessorConfig struct {
+	RedisExpireSeconds int    `json:"redis_expire_seconds"`
+	RedisInterface     string `json:"redis_interface"`
+}
+
+// The redis decorator stores the email data in redis
+
+func Redis(dc *DecoratorCallbacks) Decorator {
+
+	var config *RedisProcessorConfig
+	redisClient := &redisClient{}
+	dc.loader = func(backendConfig BackendConfig) error {
+		configType := baseConfig(&RedisProcessorConfig{})
+		bcfg, err := ab.extractConfig(backendConfig, configType)
+		if err != nil {
+			return err
+		}
+		config = bcfg.(*RedisProcessorConfig)
+
+		return nil
+	}
+	var redisErr error
+
+	return func(c Processor) Processor {
+		return ProcessorFunc(func(e *envelope.Envelope) (BackendResult, error) {
+			hash := ""
+			if len(e.Hashes) > 0 {
+				hash = e.Hashes[0]
+
+				var cData *compressor
+				// a compressor was set
+				if e.Meta != nil {
+					if c, ok := e.Meta["zlib-compressor"]; ok {
+						cData = c.(*compressor)
+					}
+				} else {
+					e.Meta = make(map[string]interface{})
+				}
+
+				redisErr = redisClient.redisConnection(config.RedisInterface)
+				if redisErr == nil {
+					if cData != nil {
+						// send data is using the compressor
+						_, doErr := redisClient.conn.Do("SETEX", hash, config.RedisExpireSeconds, cData)
+						if doErr != nil {
+							redisErr = doErr
+						}
+					} else {
+						// not using compressor
+						_, doErr := redisClient.conn.Do("SETEX", hash, config.RedisExpireSeconds, e.Data.String())
+						if doErr != nil {
+							redisErr = doErr
+						}
+					}
+				}
+				if redisErr != nil {
+					mainlog.WithError(redisErr).Warn("Error while talking to redis")
+					result := NewBackendResult(response.Canned.FailBackendTransaction)
+					return result, redisErr
+				} else {
+					e.Meta["redis"] = "redis" // the backend system will know to look in redis for the message data
+				}
+			} else {
+				mainlog.Error("Redis needs a Hash() process before it")
+			}
+
+			return c.Process(e)
+		})
+	}
+}

+ 7 - 6
cmd/guerrillad/serve.go

@@ -6,6 +6,7 @@ import (
 	"fmt"
 	"github.com/flashmob/go-guerrilla"
 	"github.com/flashmob/go-guerrilla/backends"
+	"github.com/flashmob/go-guerrilla/ev"
 	"github.com/flashmob/go-guerrilla/log"
 	"github.com/spf13/cobra"
 	"io/ioutil"
@@ -85,7 +86,7 @@ func sigHandler(app guerrilla.Guerrilla) {
 	}
 }
 
-func subscribeBackendEvent(event guerrilla.Event, backend backends.Backend, app guerrilla.Guerrilla) {
+func subscribeBackendEvent(event ev.Event, backend backends.Backend, app guerrilla.Guerrilla) {
 
 	app.Subscribe(event, func(cmdConfig *CmdConfig) {
 		logger, _ := log.GetLogger(cmdConfig.LogFile)
@@ -144,12 +145,12 @@ func serve(cmd *cobra.Command, args []string) {
 	if err != nil {
 		mainlog.WithError(err).Error("Error(s) when starting server(s)")
 	}
-	subscribeBackendEvent(guerrilla.EvConfigBackendConfig, backend, app)
-	subscribeBackendEvent(guerrilla.EvConfigBackendName, backend, app)
+	subscribeBackendEvent(ev.ConfigBackendConfig, backend, app)
+	subscribeBackendEvent(ev.ConfigBackendName, backend, app)
 	// Write out our PID
 	writePid(cmdConfig.PidFile)
 	// ...and write out our pid whenever the file name changes in the config
-	app.Subscribe(guerrilla.EvConfigPidFile, func(ac *guerrilla.AppConfig) {
+	app.Subscribe(ev.ConfigPidFile, func(ac *guerrilla.AppConfig) {
 		writePid(ac.PidFile)
 	})
 	// change the logger from stdrerr to one from config
@@ -184,10 +185,10 @@ func (c *CmdConfig) load(jsonBytes []byte) error {
 func (c *CmdConfig) emitChangeEvents(oldConfig *CmdConfig, app guerrilla.Guerrilla) {
 	// has backend changed?
 	if !reflect.DeepEqual((*c).BackendConfig, (*oldConfig).BackendConfig) {
-		app.Publish(guerrilla.EvConfigBackendConfig, c)
+		app.Publish(ev.ConfigBackendConfig, c)
 	}
 	if c.BackendName != oldConfig.BackendName {
-		app.Publish(guerrilla.EvConfigBackendName, c)
+		app.Publish(ev.ConfigBackendName, c)
 	}
 	// call other emitChangeEvents
 	c.AppConfig.EmitChangeEvents(&oldConfig.AppConfig, app)

+ 19 - 84
config.go

@@ -5,6 +5,7 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
+	"github.com/flashmob/go-guerrilla/ev"
 	"os"
 	"reflect"
 	"strings"
@@ -37,72 +38,6 @@ type ServerConfig struct {
 	_publicKeyFile_mtime  int
 }
 
-type Event int
-
-const (
-	// when a new config was loaded
-	EvConfigNewConfig Event = iota
-	// when allowed_hosts changed
-	EvConfigAllowedHosts
-	// when pid_file changed
-	EvConfigPidFile
-	// when log_file changed
-	EvConfigLogFile
-	// when it's time to reload the main log file
-	EvConfigLogReopen
-	// when log level changed
-	EvConfigLogLevel
-	// when the backend changed
-	EvConfigBackendName
-	// when the backend's config changed
-	EvConfigBackendConfig
-	// when a new server was added
-	EvConfigEvServerNew
-	// when an existing server was removed
-	EvConfigServerRemove
-	// when a new server config was detected (general event)
-	EvConfigServerConfig
-	// when a server was enabled
-	EvConfigServerStart
-	// when a server was disabled
-	EvConfigServerStop
-	// when a server's log file changed
-	EvConfigServerLogFile
-	// when it's time to reload the server's log
-	EvConfigServerLogReopen
-	// when a server's timeout changed
-	EvConfigServerTimeout
-	// when a server's max clients changed
-	EvConfigServerMaxClients
-	// when a server's TLS config changed
-	EvConfigServerTLSConfig
-)
-
-var configEvents = [...]string{
-	"config_change:new_config",
-	"config_change:allowed_hosts",
-	"config_change:pid_file",
-	"config_change:log_file",
-	"config_change:reopen_log_file",
-	"config_change:log_level",
-	"config_change:backend_config",
-	"config_change:backend_name",
-	"server_change:new_server",
-	"server_change:remove_server",
-	"server_change:update_config",
-	"server_change:start_server",
-	"server_change:stop_server",
-	"server_change:new_log_file",
-	"server_change:reopen_log_file",
-	"server_change:timeout",
-	"server_change:max_clients",
-	"server_change:tls_config",
-}
-
-func (e Event) String() string {
-	return configEvents[e]
-}
-
 // Unmarshalls json data into AppConfig struct and any other initialization of the struct
 // also does validation, returns error if validation failed or something went wrong
 func (c *AppConfig) Load(jsonBytes []byte) error {
@@ -132,26 +67,26 @@ func (c *AppConfig) Load(jsonBytes []byte) error {
 func (c *AppConfig) EmitChangeEvents(oldConfig *AppConfig, app Guerrilla) {
 	// has config changed, general check
 	if !reflect.DeepEqual(oldConfig, c) {
-		app.Publish(EvConfigNewConfig, c)
+		app.Publish(ev.ConfigNewConfig, c)
 	}
 	// has 'allowed hosts' changed?
 	if !reflect.DeepEqual(oldConfig.AllowedHosts, c.AllowedHosts) {
-		app.Publish(EvConfigAllowedHosts, c)
+		app.Publish(ev.ConfigAllowedHosts, c)
 	}
 	// has pid file changed?
 	if strings.Compare(oldConfig.PidFile, c.PidFile) != 0 {
-		app.Publish(EvConfigPidFile, c)
+		app.Publish(ev.ConfigPidFile, c)
 	}
 	// has mainlog log changed?
 	if strings.Compare(oldConfig.LogFile, c.LogFile) != 0 {
-		app.Publish(EvConfigLogFile, c)
+		app.Publish(ev.ConfigLogFile, c)
 	} else {
 		// since config file has not changed, we reload it
-		app.Publish(EvConfigLogReopen, c)
+		app.Publish(ev.ConfigLogReopen, c)
 	}
 	// has log level changed?
 	if strings.Compare(oldConfig.LogLevel, c.LogLevel) != 0 {
-		app.Publish(EvConfigLogLevel, c)
+		app.Publish(ev.ConfigLogLevel, c)
 	}
 	// server config changes
 	oldServers := oldConfig.getServers()
@@ -164,21 +99,21 @@ func (c *AppConfig) EmitChangeEvents(oldConfig *AppConfig, app Guerrilla) {
 			newServer.emitChangeEvents(oldServer, app)
 		} else {
 			// start new server
-			app.Publish(EvConfigEvServerNew, newServer)
+			app.Publish(ev.ConfigEvServerNew, newServer)
 		}
 
 	}
 	// remove any servers that don't exist anymore
 	for _, oldserver := range oldServers {
-		app.Publish(EvConfigServerRemove, oldserver)
+		app.Publish(ev.ConfigServerRemove, oldserver)
 	}
 }
 
 // EmitLogReopen emits log reopen events using existing config
 func (c *AppConfig) EmitLogReopenEvents(app Guerrilla) {
-	app.Publish(EvConfigLogReopen, c)
+	app.Publish(ev.ConfigLogReopen, c)
 	for _, sc := range c.getServers() {
-		app.Publish(EvConfigServerLogReopen, sc)
+		app.Publish(ev.ConfigServerLogReopen, sc)
 	}
 }
 
@@ -201,33 +136,33 @@ func (sc *ServerConfig) emitChangeEvents(oldServer *ServerConfig, app Guerrilla)
 	)
 	if len(changes) > 0 {
 		// something changed in the server config
-		app.Publish(EvConfigServerConfig, sc)
+		app.Publish(ev.ConfigServerConfig, sc)
 	}
 
 	// enable or disable?
 	if _, ok := changes["IsEnabled"]; ok {
 		if sc.IsEnabled {
-			app.Publish(EvConfigServerStart, sc)
+			app.Publish(ev.ConfigServerStart, sc)
 		} else {
-			app.Publish(EvConfigServerStop, sc)
+			app.Publish(ev.ConfigServerStop, sc)
 		}
 		// do not emit any more events when IsEnabled changed
 		return
 	}
 	// log file change?
 	if _, ok := changes["LogFile"]; ok {
-		app.Publish(EvConfigServerLogFile, sc)
+		app.Publish(ev.ConfigServerLogFile, sc)
 	} else {
 		// since config file has not changed, we reload it
-		app.Publish(EvConfigServerLogReopen, sc)
+		app.Publish(ev.ConfigServerLogReopen, sc)
 	}
 	// timeout changed
 	if _, ok := changes["Timeout"]; ok {
-		app.Publish(EvConfigServerTimeout, sc)
+		app.Publish(ev.ConfigServerTimeout, sc)
 	}
 	// max_clients changed
 	if _, ok := changes["MaxClients"]; ok {
-		app.Publish(EvConfigServerMaxClients, sc)
+		app.Publish(ev.ConfigServerMaxClients, sc)
 	}
 
 	// tls changed
@@ -246,7 +181,7 @@ func (sc *ServerConfig) emitChangeEvents(oldServer *ServerConfig, app Guerrilla)
 		}
 		return false
 	}(); ok {
-		app.Publish(EvConfigServerTLSConfig, sc)
+		app.Publish(ev.ConfigServerTLSConfig, sc)
 	}
 }
 

+ 2 - 0
envelope/envelope.go

@@ -50,6 +50,8 @@ type Envelope struct {
 	Meta map[string]interface{}
 	// Hashes of each email on the rcpt
 	Hashes []string
+	//
+	DeliveryHeader string
 }
 
 // ParseHeaders parses the headers into Header field of the Envelope struct.

+ 100 - 0
ev/event.go

@@ -0,0 +1,100 @@
+package ev
+
+import (
+	evbus "github.com/asaskevich/EventBus"
+)
+
+type Event int
+
+const (
+	// when a new config was loaded
+	ConfigNewConfig Event = iota
+	// when allowed_hosts changed
+	ConfigAllowedHosts
+	// when pid_file changed
+	ConfigPidFile
+	// when log_file changed
+	ConfigLogFile
+	// when it's time to reload the main log file
+	ConfigLogReopen
+	// when log level changed
+	ConfigLogLevel
+	// when the backend changed
+	ConfigBackendName
+	// when the backend's config changed
+	ConfigBackendConfig
+	// when a new server was added
+	ConfigEvServerNew
+	// when an existing server was removed
+	ConfigServerRemove
+	// when a new server config was detected (general event)
+	ConfigServerConfig
+	// when a server was enabled
+	ConfigServerStart
+	// when a server was disabled
+	ConfigServerStop
+	// when a server's log file changed
+	ConfigServerLogFile
+	// when it's time to reload the server's log
+	ConfigServerLogReopen
+	// when a server's timeout changed
+	ConfigServerTimeout
+	// when a server's max clients changed
+	ConfigServerMaxClients
+	// when a server's TLS config changed
+	ConfigServerTLSConfig
+
+	// Load a backend processor's config todo: dont need it?
+	BackendProcConfigLoad
+	// initialize a backend processor
+	BackendProcInitialize
+	// shutdown a backend processor
+	BackendProcShutdown
+)
+
+var eventList = [...]string{
+	"config_change:new_config",
+	"config_change:allowed_hosts",
+	"config_change:pid_file",
+	"config_change:log_file",
+	"config_change:reopen_log_file",
+	"config_change:log_level",
+	"config_change:backend_config",
+	"config_change:backend_name",
+	"server_change:new_server",
+	"server_change:remove_server",
+	"server_change:update_config",
+	"server_change:start_server",
+	"server_change:stop_server",
+	"server_change:new_log_file",
+	"server_change:reopen_log_file",
+	"server_change:timeout",
+	"server_change:max_clients",
+	"server_change:tls_config",
+	"backend:proc_config_load",
+	"backend:proc_init",
+	"backend:proc_shutdown",
+}
+
+func (e Event) String() string {
+	return eventList[e]
+}
+
+type EventHandler struct {
+	*evbus.EventBus
+}
+
+func (h *EventHandler) Subscribe(topic Event, fn interface{}) error {
+	if h.EventBus == nil {
+		h.EventBus = evbus.New()
+	}
+	return h.EventBus.Subscribe(topic.String(), fn)
+}
+
+func (h *EventHandler) Publish(topic Event, args ...interface{}) {
+	h.EventBus.Publish(topic.String(), args...)
+}
+
+func (h *EventHandler) Unsubscribe(topic Event, handler interface{}) error {
+	return h.EventBus.Unsubscribe(topic.String(), handler)
+}

+ 20 - 33
guerrilla.go

@@ -2,8 +2,8 @@ package guerrilla
 
 import (
 	"errors"
-	evbus "github.com/asaskevich/EventBus"
 	"github.com/flashmob/go-guerrilla/backends"
+	"github.com/flashmob/go-guerrilla/ev"
 	"github.com/flashmob/go-guerrilla/log"
 	"sync"
 	"sync/atomic"
@@ -36,9 +36,9 @@ func (e Errors) Error() string {
 type Guerrilla interface {
 	Start() error
 	Shutdown()
-	Subscribe(topic Event, fn interface{}) error
-	Publish(topic Event, args ...interface{})
-	Unsubscribe(topic Event, handler interface{}) error
+	Subscribe(topic ev.Event, fn interface{}) error
+	Publish(topic ev.Event, args ...interface{})
+	Unsubscribe(topic ev.Event, handler interface{}) error
 	SetLogger(log.Logger)
 }
 
@@ -49,7 +49,7 @@ type guerrilla struct {
 	// guard controls access to g.servers
 	guard sync.Mutex
 	state int8
-	bus   *evbus.EventBus
+	ev.EventHandler
 	logStore
 }
 
@@ -77,7 +77,6 @@ func New(ac *AppConfig, b backends.Backend, l log.Logger) (Guerrilla, error) {
 		Config:  *ac, // take a local copy
 		servers: make(map[string]*server, len(ac.Servers)),
 		backend: b,
-		bus:     evbus.New(),
 	}
 	g.storeMainlog(l)
 
@@ -174,12 +173,12 @@ func (g *guerrilla) mapServers(callback func(*server)) map[string]*server {
 func (g *guerrilla) subscribeEvents() {
 
 	// main config changed
-	g.Subscribe(EvConfigNewConfig, func(c *AppConfig) {
+	g.Subscribe(ev.ConfigNewConfig, func(c *AppConfig) {
 		g.setConfig(c)
 	})
 
 	// allowed_hosts changed, set for all servers
-	g.Subscribe(EvConfigAllowedHosts, func(c *AppConfig) {
+	g.Subscribe(ev.ConfigAllowedHosts, func(c *AppConfig) {
 		g.mapServers(func(server *server) {
 			server.setAllowedHosts(c.AllowedHosts)
 		})
@@ -187,7 +186,7 @@ func (g *guerrilla) subscribeEvents() {
 	})
 
 	// the main log file changed
-	g.Subscribe(EvConfigLogFile, func(c *AppConfig) {
+	g.Subscribe(ev.ConfigLogFile, func(c *AppConfig) {
 		var err error
 		var l log.Logger
 		if l, err = log.GetLogger(c.LogFile); err == nil {
@@ -204,13 +203,13 @@ func (g *guerrilla) subscribeEvents() {
 	})
 
 	// re-open the main log file (file not changed)
-	g.Subscribe(EvConfigLogReopen, func(c *AppConfig) {
+	g.Subscribe(ev.ConfigLogReopen, func(c *AppConfig) {
 		g.mainlog().Reopen()
 		g.mainlog().Infof("re-opened main log file [%s]", c.LogFile)
 	})
 
 	// when log level changes, apply to mainlog and server logs
-	g.Subscribe(EvConfigLogLevel, func(c *AppConfig) {
+	g.Subscribe(ev.ConfigLogLevel, func(c *AppConfig) {
 		g.mainlog().SetLevel(c.LogLevel)
 		g.mapServers(func(server *server) {
 			server.log.SetLevel(c.LogLevel)
@@ -219,12 +218,12 @@ func (g *guerrilla) subscribeEvents() {
 	})
 
 	// server config was updated
-	g.Subscribe(EvConfigServerConfig, func(sc *ServerConfig) {
+	g.Subscribe(ev.ConfigServerConfig, func(sc *ServerConfig) {
 		g.setServerConfig(sc)
 	})
 
 	// add a new server to the config & start
-	g.Subscribe(EvConfigEvServerNew, func(sc *ServerConfig) {
+	g.Subscribe(ev.ConfigEvServerNew, func(sc *ServerConfig) {
 		if _, err := g.findServer(sc.ListenInterface); err != nil {
 			// not found, lets add it
 			if err := g.makeServers(); err != nil {
@@ -241,7 +240,7 @@ func (g *guerrilla) subscribeEvents() {
 		}
 	})
 	// start a server that already exists in the config and has been enabled
-	g.Subscribe(EvConfigServerStart, func(sc *ServerConfig) {
+	g.Subscribe(ev.ConfigServerStart, func(sc *ServerConfig) {
 		if server, err := g.findServer(sc.ListenInterface); err == nil {
 			if server.state == ServerStateStopped || server.state == ServerStateNew {
 				g.mainlog().Infof("Starting server [%s]", server.listenInterface)
@@ -253,7 +252,7 @@ func (g *guerrilla) subscribeEvents() {
 		}
 	})
 	// stop running a server
-	g.Subscribe(EvConfigServerStop, func(sc *ServerConfig) {
+	g.Subscribe(ev.ConfigServerStop, func(sc *ServerConfig) {
 		if server, err := g.findServer(sc.ListenInterface); err == nil {
 			if server.state == ServerStateRunning {
 				server.Shutdown()
@@ -262,7 +261,7 @@ func (g *guerrilla) subscribeEvents() {
 		}
 	})
 	// server was removed from config
-	g.Subscribe(EvConfigServerRemove, func(sc *ServerConfig) {
+	g.Subscribe(ev.ConfigServerRemove, func(sc *ServerConfig) {
 		if server, err := g.findServer(sc.ListenInterface); err == nil {
 			server.Shutdown()
 			g.removeServer(sc.ListenInterface)
@@ -271,7 +270,7 @@ func (g *guerrilla) subscribeEvents() {
 	})
 
 	// TLS changes
-	g.Subscribe(EvConfigServerTLSConfig, func(sc *ServerConfig) {
+	g.Subscribe(ev.ConfigServerTLSConfig, func(sc *ServerConfig) {
 		if server, err := g.findServer(sc.ListenInterface); err == nil {
 			if err := server.configureSSL(); err == nil {
 				g.mainlog().Infof("Server [%s] new TLS configuration loaded", sc.ListenInterface)
@@ -281,19 +280,19 @@ func (g *guerrilla) subscribeEvents() {
 		}
 	})
 	// when server's timeout change.
-	g.Subscribe(EvConfigServerTimeout, func(sc *ServerConfig) {
+	g.Subscribe(ev.ConfigServerTimeout, func(sc *ServerConfig) {
 		g.mapServers(func(server *server) {
 			server.setTimeout(sc.Timeout)
 		})
 	})
 	// when server's max clients change.
-	g.Subscribe(EvConfigServerMaxClients, func(sc *ServerConfig) {
+	g.Subscribe(ev.ConfigServerMaxClients, func(sc *ServerConfig) {
 		g.mapServers(func(server *server) {
 			// TODO resize the pool somehow
 		})
 	})
 	// when a server's log file changes
-	g.Subscribe(EvConfigServerLogFile, func(sc *ServerConfig) {
+	g.Subscribe(ev.ConfigServerLogFile, func(sc *ServerConfig) {
 		if server, err := g.findServer(sc.ListenInterface); err == nil {
 			var err error
 			var l log.Logger
@@ -315,7 +314,7 @@ func (g *guerrilla) subscribeEvents() {
 		}
 	})
 	// when the daemon caught a sighup, event for individual server
-	g.Subscribe(EvConfigServerLogReopen, func(sc *ServerConfig) {
+	g.Subscribe(ev.ConfigServerLogReopen, func(sc *ServerConfig) {
 		if server, err := g.findServer(sc.ListenInterface); err == nil {
 			server.log.Reopen()
 			g.mainlog().Infof("Server [%s] re-opened log file [%s]", sc.ListenInterface, sc.LogFile)
@@ -398,18 +397,6 @@ func (g *guerrilla) Shutdown() {
 	}
 }
 
-func (g *guerrilla) Subscribe(topic Event, fn interface{}) error {
-	return g.bus.Subscribe(topic.String(), fn)
-}
-
-func (g *guerrilla) Publish(topic Event, args ...interface{}) {
-	g.bus.Publish(topic.String(), args...)
-}
-
-func (g *guerrilla) Unsubscribe(topic Event, handler interface{}) error {
-	return g.bus.Unsubscribe(topic.String(), handler)
-}
-
 func (g *guerrilla) SetLogger(l log.Logger) {
 	g.storeMainlog(l)
 }