Browse Source

ported guerrilla_db_redis to new backend system

flashmob 8 years ago
parent
commit
9c09c5743d

+ 4 - 0
backends/backend.go

@@ -130,6 +130,10 @@ func (e Errors) Error() string {
 func New(backendName string, backendConfig BackendConfig, l log.Logger) (Backend, error) {
 	Svc.StoreMainlog(l)
 	if backend, found := backends[backendName]; found {
+		err := backend.Initialize(backendConfig)
+		if err != nil {
+			return nil, fmt.Errorf("error while initializing the backend: %s", err)
+		}
 		b = backend
 	} else {
 		gateway := &BackendGateway{config: backendConfig}

+ 36 - 0
backends/dummy.go

@@ -1 +1,37 @@
 package backends
+
+func init() {
+	// decorator pattern: AbstractBackend wraps DummyBackend through its extend field
+	backends["dummy"] = &AbstractBackend{
+		extend: &DummyBackend{},
+	}
+}
+
+// dummyConfig is the custom configuration we will parse from the json;
+// see guerrillaDBAndRedisConfig struct for a more complete example
+type dummyConfig struct {
+	LogReceivedMails bool `json:"log_received_mails"`
+}
+
+// DummyBackend puts all the pieces we need together
+type DummyBackend struct {
+	config dummyConfig
+	// embed functions from AbstractBackend so that DummyBackend satisfies the Backend interface
+	AbstractBackend
+}
+
+// loadConfig extracts backendConfig into a dummyConfig and stores it in b.config.
+// Backends should implement this method with their own custom config type instead of dummyConfig
+func (b *DummyBackend) loadConfig(backendConfig BackendConfig) (err error) {
+	// Load the backend config for the backend. It has already been unmarshalled
+	// from the "backend_config" section of the main config file.
+	// Now we need to convert each type and copy into the dummyConfig struct
+	configType := baseConfig(&dummyConfig{})
+	bcfg, err := b.extractConfig(backendConfig, configType)
+	if err != nil {
+		return err
+	}
+	m := bcfg.(*dummyConfig)
+	b.config = *m
+	return nil
+}

+ 1 - 1
backends/p_compressor.go

@@ -91,7 +91,7 @@ func (c *compressor) clear() {
 
 func Compressor() Decorator {
 	return func(c Processor) Processor {
-		return ProcessorFunc(func(e *envelope.Envelope, task SelectTask) (Result, error) {
+		return ProcessWith(func(e *envelope.Envelope, task SelectTask) (Result, error) {
 			if task == TaskSaveMail {
 				compressor := newCompressor()
 				compressor.set([]byte(e.DeliveryHeader), &e.Data)

+ 1 - 1
backends/p_debugger.go

@@ -38,7 +38,7 @@ func Debugger() Decorator {
 	})
 	Svc.AddInitializer(initFunc)
 	return func(c Processor) Processor {
-		return ProcessorFunc(func(e *envelope.Envelope, task SelectTask) (Result, error) {
+		return ProcessWith(func(e *envelope.Envelope, task SelectTask) (Result, error) {
 			if task == TaskSaveMail {
 				if config.LogReceivedMails {
 					Log().Infof("Mail from: %s / to: %v", e.MailFrom.String(), e.RcptTo)

+ 117 - 138
backends/guerrilla_db_redis.go → backends/p_guerrilla_db_redis.go

@@ -1,59 +1,45 @@
 package backends
 
-// This backend is presented here as an example only, please modify it to your needs.
-// The backend stores the email data in Redis.
-// Other meta-information is stored in MySQL to be joined later.
-// A lot of email gets discarded without viewing on Guerrilla Mail,
-// so it's much faster to put in Redis, where other programs can
-// process it later, without touching the disk.
-//
-// Some features:
-// - It batches the SQL inserts into a single query and inserts either after a time threshold or if the batch is full
-// - If the mysql driver crashes, it's able to recover, log the incident and resume again.
-// - It also does a clean shutdown - it tries to save everything before returning
-//
-// Short history:
-// Started with issuing an insert query for each single email and another query to update the tally
-// Then applied the following optimizations:
-// - Moved tally updates to another background process which does the tallying in a single query
-// - Changed the MySQL queries to insert in batch
-// - Made a Compressor that recycles buffers using sync.Pool
-// The result was around 400% speed improvement. If you know of any more improvements, please share!
-// - Added the recovery mechanism,
-
 import (
-	"fmt"
-
-	"time"
-
-	"github.com/garyburd/redigo/redis"
-
 	"bytes"
 	"compress/zlib"
 	"database/sql"
-	_ "github.com/go-sql-driver/mysql"
-
+	"fmt"
+	"github.com/flashmob/go-guerrilla/envelope"
+	"github.com/garyburd/redigo/redis"
 	"github.com/go-sql-driver/mysql"
 	"io"
 	"runtime/debug"
 	"strings"
 	"sync"
+	"time"
 )
 
+// ----------------------------------------------------------------------------------
+// Processor Name: guerrilla-redis-db
+// ----------------------------------------------------------------------------------
+// Description   : Saves the body to redis, meta data to mysql. Example
+// ----------------------------------------------------------------------------------
+// Config Options: ...
+// --------------:-------------------------------------------------------------------
+// Input         : envelope
+// ----------------------------------------------------------------------------------
+// Output        :
+// ----------------------------------------------------------------------------------
+func init() {
+	processors["guerrilla-redis-db"] = func() Decorator {
+		return GuerrillaDbReddis()
+	}
+}
+
 // how many rows to batch at a time
 const GuerrillaDBAndRedisBatchMax = 2
 
 // tick on every...
 const GuerrillaDBAndRedisBatchTimeout = time.Second * 3
 
-func init() {
-	backends["guerrilla-db-redis"] = &AbstractBackend{
-		extend: &GuerrillaDBAndRedisBackend{}}
-}
-
 type GuerrillaDBAndRedisBackend struct {
-	AbstractBackend
-	config    guerrillaDBAndRedisConfig
+	config    *guerrillaDBAndRedisConfig
 	batcherWg sync.WaitGroup
 	// cache prepared queries
 	cache stmtCache
@@ -83,12 +69,12 @@ func convertError(name string) error {
 // Now we need to convert each type and copy into the guerrillaDBAndRedisConfig struct
 func (g *GuerrillaDBAndRedisBackend) loadConfig(backendConfig BackendConfig) (err error) {
 	configType := baseConfig(&guerrillaDBAndRedisConfig{})
-	bcfg, err := g.extractConfig(backendConfig, configType)
+	bcfg, err := Svc.ExtractConfig(backendConfig, configType)
 	if err != nil {
 		return err
 	}
 	m := bcfg.(*guerrillaDBAndRedisConfig)
-	g.config = *m
+	g.config = m
 	return nil
 }
 
@@ -285,8 +271,6 @@ func trimToLimit(str string, limit int) string {
 	return ret
 }
 
-var workerId = 0
-
 func (g *GuerrillaDBAndRedisBackend) mysqlConnect() (*sql.DB, error) {
 	conf := mysql.Config{
 		User:         g.config.MysqlUser,
@@ -307,20 +291,47 @@ func (g *GuerrillaDBAndRedisBackend) mysqlConnect() (*sql.DB, error) {
 
 }
 
-func (g *GuerrillaDBAndRedisBackend) saveMailWorker(saveMailChan chan *savePayload) {
-	var to, body string
+func (c *redisClient) redisConnection(redisInterface string) (err error) {
+	if c.isConnected == false {
+		c.conn, err = redis.Dial("tcp", redisInterface)
+		if err != nil {
+			// dial failed: isConnected stays false, so the next call dials again
+			return err
+		}
+		c.isConnected = true
+	}
+	return nil
+}
 
-	var redisErr error
+var workerId = 0
 
-	workerId++
+// GuerrillaDbReddis is a specialized processor for Guerrilla mail. It is here as an example.
+// It's an example of a 'monolithic' processor.
+func GuerrillaDbReddis() Decorator {
 
+	g := GuerrillaDBAndRedisBackend{}
 	redisClient := &redisClient{}
+
 	var db *sql.DB
-	var err error
-	db, err = g.mysqlConnect()
-	if err != nil {
-		mainlog.Fatalf("cannot open mysql: %s", err)
-	}
+	var to, body string
+
+	var redisErr error
+
+	Svc.AddInitializer(Initialize(func(backendConfig BackendConfig) error {
+		configType := BaseConfig(&guerrillaDBAndRedisConfig{})
+		bcfg, err := Svc.ExtractConfig(backendConfig, configType)
+		if err != nil {
+			return err
+		}
+		g.config = bcfg.(*guerrillaDBAndRedisConfig)
+		db, err = g.mysqlConnect()
+		if err != nil {
+			mainlog.Fatalf("cannot open mysql: %s", err)
+		}
+		return nil
+	}))
+
+	workerId++
 
 	// start the query SQL batching where we will send data via the feeder channel
 	feeder := make(chan []interface{}, 1)
@@ -353,96 +364,64 @@ func (g *GuerrillaDBAndRedisBackend) saveMailWorker(saveMailChan chan *savePaylo
 	}()
 	var vals []interface{}
 	data := newCompressedData()
-	//  receives values from the channel repeatedly until it is closed.
-
-	for {
-		payload := <-saveMailChan
-		if payload == nil {
-			mainlog.Debug("No more saveMailChan payload")
-			return
-		}
-		mainlog.Debug("Got mail from chan", payload.mail.RemoteAddress)
-		to = trimToLimit(strings.TrimSpace(payload.recipient.User)+"@"+g.config.PrimaryHost, 255)
-		payload.mail.Helo = trimToLimit(payload.mail.Helo, 255)
-		payload.recipient.Host = trimToLimit(payload.recipient.Host, 255)
-		ts := fmt.Sprintf("%d", time.Now().UnixNano())
-		payload.mail.ParseHeaders()
-		hash := MD5Hex(
-			to,
-			payload.mail.MailFrom.String(),
-			payload.mail.Subject,
-			ts)
-		// Add extra headers
-		var addHead string
-		addHead += "Delivered-To: " + to + "\r\n"
-		addHead += "Received: from " + payload.mail.Helo + " (" + payload.mail.Helo + "  [" + payload.mail.RemoteAddress + "])\r\n"
-		addHead += "	by " + payload.recipient.Host + " with SMTP id " + hash + "@" + payload.recipient.Host + ";\r\n"
-		addHead += "	" + time.Now().Format(time.RFC1123Z) + "\r\n"
-
-		// data will be compressed when printed, with addHead added to beginning
-
-		data.set([]byte(addHead), &payload.mail.Data)
-		body = "gzencode"
-
-		// data will be written to redis - it implements the Stringer interface, redigo uses fmt to
-		// print the data to redis.
-
-		redisErr = redisClient.redisConnection(g.config.RedisInterface)
-		if redisErr == nil {
-			_, doErr := redisClient.conn.Do("SETEX", hash, g.config.RedisExpireSeconds, data)
-			if doErr == nil {
-				body = "redis" // the backend system will know to look in redis for the message data
-				data.clear()   // blank
-			}
-		} else {
-			mainlog.WithError(redisErr).Warn("Error while connecting redis")
-		}
-
-		vals = []interface{}{} // clear the vals
-		vals = append(vals,
-			trimToLimit(to, 255),
-			trimToLimit(payload.mail.MailFrom.String(), 255),
-			trimToLimit(payload.mail.Subject, 255),
-			body,
-			data.String(),
-			hash,
-			trimToLimit(to, 255),
-			payload.mail.RemoteAddress,
-			trimToLimit(payload.mail.MailFrom.String(), 255),
-			payload.mail.TLS)
-		feeder <- vals
-		payload.savedNotify <- &saveStatus{nil, hash}
-
-	}
-}
-
-func (c *redisClient) redisConnection(redisInterface string) (err error) {
-	if c.isConnected == false {
-		c.conn, err = redis.Dial("tcp", redisInterface)
-		if err != nil {
-			// handle error
-			return err
-		}
-		c.isConnected = true
-	}
-	return nil
-}
 
-// test database connection settings
-func (g *GuerrillaDBAndRedisBackend) testSettings() (err error) {
-
-	var db *sql.DB
-
-	if db, err = g.mysqlConnect(); err != nil {
-		err = fmt.Errorf("MySql cannot connect, check your settings: %s", err)
-	} else {
-		db.Close()
-	}
+	return func(c Processor) Processor {
+		return ProcessWith(func(e *envelope.Envelope, task SelectTask) (Result, error) {
+			if task == TaskSaveMail {
+				mainlog.Debug("Got mail from chan", e.RemoteAddress)
+				to = trimToLimit(strings.TrimSpace(e.RcptTo[0].User)+"@"+g.config.PrimaryHost, 255)
+				e.Helo = trimToLimit(e.Helo, 255)
+				e.RcptTo[0].Host = trimToLimit(e.RcptTo[0].Host, 255)
+				ts := fmt.Sprintf("%d", time.Now().UnixNano())
+				e.ParseHeaders()
+				hash := MD5Hex(
+					to,
+					e.MailFrom.String(),
+					e.Subject,
+					ts)
+				// Add extra headers
+				var addHead string
+				addHead += "Delivered-To: " + to + "\r\n"
+				addHead += "Received: from " + e.Helo + " (" + e.Helo + "  [" + e.RemoteAddress + "])\r\n"
+				addHead += "	by " + e.RcptTo[0].Host + " with SMTP id " + hash + "@" + e.RcptTo[0].Host + ";\r\n"
+				addHead += "	" + time.Now().Format(time.RFC1123Z) + "\r\n"
+
+				// data will be compressed when printed, with addHead added to beginning
+
+				data.set([]byte(addHead), &e.Data)
+				body = "gzencode"
+
+				// data will be written to redis - it implements the Stringer interface, redigo uses fmt to
+				// print the data to redis.
+
+				redisErr = redisClient.redisConnection(g.config.RedisInterface)
+				if redisErr == nil {
+					_, doErr := redisClient.conn.Do("SETEX", hash, g.config.RedisExpireSeconds, data)
+					if doErr == nil {
+						body = "redis" // the backend system will know to look in redis for the message data
+						data.clear()   // blank
+					}
+				} else {
+					mainlog.WithError(redisErr).Warn("Error while connecting redis")
+				}
 
-	redisClient := &redisClient{}
-	if redisErr := redisClient.redisConnection(g.config.RedisInterface); redisErr != nil {
-		err = fmt.Errorf("Redis cannot connect, check your settings: %s", redisErr)
+				vals = []interface{}{} // clear the vals
+				vals = append(vals,
+					trimToLimit(to, 255),
+					trimToLimit(e.MailFrom.String(), 255),
+					trimToLimit(e.Subject, 255),
+					body,
+					data.String(),
+					hash,
+					trimToLimit(to, 255),
+					e.RemoteAddress,
+					trimToLimit(e.MailFrom.String(), 255),
+					e.TLS)
+				return c.Process(e, task)
+
+			} else {
+				return c.Process(e, task)
+			}
+		})
 	}
-
-	return
 }

+ 0 - 0
backends/guerrilla_db_redis_test.go → backends/p_guerrilla_db_redis_test.go


+ 1 - 1
backends/p_hasher.go

@@ -32,7 +32,7 @@ func init() {
 // It appends the hashes to envelope's Hashes slice.
 func Hasher() Decorator {
 	return func(c Processor) Processor {
-		return ProcessorFunc(func(e *envelope.Envelope, task SelectTask) (Result, error) {
+		return ProcessWith(func(e *envelope.Envelope, task SelectTask) (Result, error) {
 
 			if task == TaskSaveMail {
 				// base hash, use subject from and timestamp-nano

+ 1 - 1
backends/p_header.go

@@ -47,7 +47,7 @@ func Header() Decorator {
 	}))
 
 	return func(c Processor) Processor {
-		return ProcessorFunc(func(e *envelope.Envelope, task SelectTask) (Result, error) {
+		return ProcessWith(func(e *envelope.Envelope, task SelectTask) (Result, error) {
 			if task == TaskSaveMail {
 				to := strings.TrimSpace(e.RcptTo[0].User) + "@" + config.PrimaryHost
 				hash := "unknown"

+ 1 - 1
backends/p_headers_parser.go

@@ -23,7 +23,7 @@ func init() {
 
 func HeadersParser() Decorator {
 	return func(c Processor) Processor {
-		return ProcessorFunc(func(e *envelope.Envelope, task SelectTask) (Result, error) {
+		return ProcessWith(func(e *envelope.Envelope, task SelectTask) (Result, error) {
 			if task == TaskSaveMail {
 				e.ParseHeaders()
 				// next processor

+ 1 - 1
backends/p_mysql.go

@@ -159,7 +159,7 @@ func MySql() Decorator {
 	}))
 
 	return func(c Processor) Processor {
-		return ProcessorFunc(func(e *envelope.Envelope, task SelectTask) (Result, error) {
+		return ProcessWith(func(e *envelope.Envelope, task SelectTask) (Result, error) {
 
 			if task == TaskSaveMail {
 				var to, body string

+ 1 - 1
backends/p_redis.go

@@ -85,7 +85,7 @@ func Redis() Decorator {
 	var redisErr error
 
 	return func(c Processor) Processor {
-		return ProcessorFunc(func(e *envelope.Envelope, task SelectTask) (Result, error) {
+		return ProcessWith(func(e *envelope.Envelope, task SelectTask) (Result, error) {
 
 			if task == TaskSaveMail {
 				hash := ""

+ 2 - 2
backends/processor.go

@@ -29,10 +29,10 @@ type Processor interface {
 }
 
 // Signature of Processor
-type ProcessorFunc func(*envelope.Envelope, SelectTask) (Result, error)
+type ProcessWith func(*envelope.Envelope, SelectTask) (Result, error)
 
 // Process makes ProcessWith satisfy the Processor interface
-func (f ProcessorFunc) Process(e *envelope.Envelope, task SelectTask) (Result, error) {
+func (f ProcessWith) Process(e *envelope.Envelope, task SelectTask) (Result, error) {
 	// delegate to the anonymous function
 	return f(e, task)
 }

+ 31 - 12
backends/proxy.go

@@ -4,11 +4,11 @@ import (
 	"errors"
 	"fmt"
 	"github.com/flashmob/go-guerrilla/envelope"
+	"github.com/flashmob/go-guerrilla/log"
 	"sync"
 )
 
-// The ProxyBackend makes it possible to use the old backend system
-// which is not using processors
+// Deprecated: ProxyBackend makes it possible to use the old backend system
 type ProxyBackend struct {
 	config       proxyConfig
 	extend       proxy
@@ -18,6 +18,7 @@ type ProxyBackend struct {
 	wg sync.WaitGroup
 }
 
+// Deprecated: Use workerMsg instead
 type savePayload struct {
 	mail        *envelope.Envelope
 	from        *envelope.EmailAddress
@@ -25,23 +26,42 @@ type savePayload struct {
 	savedNotify chan *saveStatus
 }
 
+// Deprecated: Use notifyMsg instead
 type saveStatus struct {
 	err  error
 	hash string
 }
 
-type proxy interface {
+// Deprecated: Kept for compatibility, use BackendGateway instead
+// AbstractBackend embeds ProxyBackend for back-compatibility.
+type AbstractBackend struct {
+	extend proxy
+	ProxyBackend
+}
 
-	//	Backend
-	/*
-		saveMailWorker
-		numberOfWorkersGetter
-		settingsTester
-		configLoader
-	*/
+// extractConfig is kept for compatibility only; it forwards to Svc.ExtractConfig
+func (ac *AbstractBackend) extractConfig(configData BackendConfig, configType BaseConfig) (interface{}, error) {
+	return Svc.ExtractConfig(configData, configType)
+}
 
+// Initialize copies the extend field down to the embedded ProxyBackend (when set), then initializes it
+func (ac *AbstractBackend) Initialize(config BackendConfig) error {
+	if ac.extend != nil {
+		ac.ProxyBackend.extend = ac.extend
+	}
+	return ac.ProxyBackend.Initialize(config)
 }
 
+// Deprecated: baseConfig is defined from BaseConfig for compatibility, use BaseConfig instead
+type baseConfig BaseConfig
+
+// Deprecated: Use Log() instead to get a hold of a logger
+var mainlog log.Logger
+
+// Deprecated: proxy may implement backend interface or any of the interfaces below
+type proxy interface{}
+
+//
 type saveMailWorker interface {
 	// start save mail worker(s)
 	saveMailWorker(chan *savePayload)
@@ -83,9 +103,8 @@ func (pb *ProxyBackend) loadConfig(backendConfig BackendConfig) (err error) {
 }
 
 func (pb *ProxyBackend) initialize(config BackendConfig) error {
-
 	if cl, ok := pb.extend.(configLoader); ok {
-		return cl.loadConfig(config)
+		cl.loadConfig(config)
 	}
 	err := pb.loadConfig(config)
 	if err != nil {