
more progress with decorators:
- Hasher decorator
- MySQL decorator

also fixes a bug with header parsing

Todo:
- redis decorator
- cleanup / remove legacy code
- finalise interfaces

flashmob 8 years ago
commit b19bac2361

+ 12 - 0
backends/abstract.go

@@ -15,6 +15,7 @@ type AbstractBackend struct {
 	p             Processor
 	configLoaders []ConfigLoaderFunc
 	configTesters []ConfigTesterFunc
+	initializers  []DecoratorinitializeFunc
 }
 
 type abstractConfig struct {
@@ -35,6 +36,12 @@ func (b *AbstractBackend) AddConfigTester(f ConfigTesterFunc) {
 	b.configTesters = append(b.configTesters, f)
 }
 
+type DecoratorinitializeFunc func() error
+
+func (b *AbstractBackend) AddInitializer(f DecoratorinitializeFunc) {
+	b.initializers = append(b.initializers, f)
+}
+
 // Your backend should implement this method and set b.config field with a custom config struct
 // Therefore, your implementation would have your own custom config type instead of dummyConfig
 func (b *AbstractBackend) loadConfig(backendConfig BackendConfig) (err error) {
@@ -64,7 +71,12 @@ func (b *AbstractBackend) Initialize(config BackendConfig) error {
 	for _, loader := range b.configLoaders {
 		loader(config)
 	}
+	for i := range b.initializers {
+		b.initializers[i]()
+	}
 	return nil
+
+	// TODO delete
 	if b.Extend != nil {
 		return b.Extend.loadConfig(config)
 	}
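
Note: the new AddInitializer hook lets a decorator defer one-time setup until the backend's Initialize runs, after the config loaders. Below is a minimal sketch of a decorator using it. The Stamper name and its behaviour are hypothetical; DecoratorCallbacks, AddInitializer, Decorator, Processor, ProcessorFunc and BackendResult are the types from this package, and the file would sit next to the other decorators (importing github.com/flashmob/go-guerrilla/envelope).

    // Sketch only: a hypothetical decorator wiring one-time setup through the
    // new initializer hook. Register it with AddInitializer(cb.initialize),
    // the same way dummy.go below registers the GuerrillaDB callbacks.
    func Stamper(dc *DecoratorCallbacks) Decorator {
    	var ready bool
    	dc.initialize = func() error {
    		ready = true // one-time setup; runs inside AbstractBackend.Initialize
    		return nil
    	}
    	return func(c Processor) Processor {
    		return ProcessorFunc(func(e *envelope.Envelope) (BackendResult, error) {
    			if ready {
    				if e.Meta == nil {
    					e.Meta = make(map[string]interface{})
    				}
    				e.Meta["stamped"] = true
    			}
    			return c.Process(e)
    		})
    	}
    }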

+ 4 - 2
backends/backend.go

@@ -33,6 +33,7 @@ type Worker interface {
 
 	AddConfigLoader(f ConfigLoaderFunc)
 	AddConfigTester(f ConfigTesterFunc)
+	AddInitializer(f DecoratorinitializeFunc)
 
 	Shutdown() error
 	Process(*envelope.Envelope) BackendResult
@@ -42,8 +43,9 @@ type Worker interface {
 }
 
 type DecoratorCallbacks struct {
-	loader ConfigLoaderFunc
-	tester ConfigTesterFunc
+	loader     ConfigLoaderFunc
+	tester     ConfigTesterFunc
+	initialize DecoratorinitializeFunc
 }
 
 type BackendConfig map[string]interface{}

+ 1 - 1
backends/debugger.go

@@ -25,7 +25,7 @@ func Debugger(dc *DecoratorCallbacks) Decorator {
 		return ProcessorFunc(func(e *envelope.Envelope) (BackendResult, error) {
 			if config.LogReceivedMails {
 				mainlog.Infof("Mail from: %s / to: %v", e.MailFrom.String(), e.RcptTo)
-				mainlog.Info("So, Headers are: %s", e.Header)
+				mainlog.Info("So, Headers are:", e.Header)
 			}
 			// continue to the next Processor in the decorator chain
 			return c.Process(e)

+ 4 - 1
backends/dummy.go

@@ -3,8 +3,11 @@ package backends
 func init() {
 	backends["dummy"] = &AbstractBackend{}
 	cb := &DecoratorCallbacks{}
-	backends["dummy"].SetProcessors(Debugger(cb), HeadersParser())
+	guerrillaDBcb := &DecoratorCallbacks{}
+	backends["dummy"].SetProcessors(GuerrillaDB(guerrillaDBcb), Hasher(), Debugger(cb), HeadersParser())
 	backends["dummy"].AddConfigLoader(cb.loader)
+	backends["dummy"].AddConfigLoader(guerrillaDBcb.loader)
+	backends["dummy"].AddInitializer(guerrillaDBcb.initialize)
 }
 
 // custom configuration we will parse from the json
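
Note on ordering: which decorator's per-message work runs first depends on how SetProcessors folds the list above into a single Processor, and that implementation is not part of this diff. A common fold, assuming Decorator is func(Processor) Processor, makes the first listed decorator the outermost wrapper (sketch only, not the actual SetProcessors):

    // Sketch: fold a decorator list so the first listed decorator wraps all
    // the others and therefore runs its pre-Process work first.
    func chain(endpoint Processor, decorators ...Decorator) Processor {
    	p := endpoint
    	for i := len(decorators) - 1; i >= 0; i-- {
    		p = decorators[i](p)
    	}
    	return p
    }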

+ 182 - 0
backends/guerrilla_db.go

@@ -0,0 +1,182 @@
+package backends
+
+import (
+	"database/sql"
+	"strings"
+	"time"
+
+	"github.com/flashmob/go-guerrilla/envelope"
+	"github.com/go-sql-driver/mysql"
+
+	"runtime/debug"
+)
+
+type guerrillaDBConfig struct {
+	NumberOfWorkers    int    `json:"save_workers_size"`
+	MysqlTable         string `json:"mail_table"`
+	MysqlDB            string `json:"mysql_db"`
+	MysqlHost          string `json:"mysql_host"`
+	MysqlPass          string `json:"mysql_pass"`
+	MysqlUser          string `json:"mysql_user"`
+	RedisExpireSeconds int    `json:"redis_expire_seconds"`
+	RedisInterface     string `json:"redis_interface"`
+	PrimaryHost        string `json:"primary_mail_host"`
+}
+
+type guerrillaDBDecorator struct {
+	cache  stmtCache
+	config *guerrillaDBConfig
+}
+
+// prepareInsertQuery builds and caches the INSERT statement for the given number of batched rows
+func (g *guerrillaDBDecorator) prepareInsertQuery(rows int, db *sql.DB) *sql.Stmt {
+	if rows == 0 {
+		panic("rows argument cannot be 0")
+	}
+	if g.cache[rows-1] != nil {
+		return g.cache[rows-1]
+	}
+	sqlstr := "INSERT INTO " + g.config.MysqlTable + " "
+	sqlstr += "(`date`, `to`, `from`, `subject`, `body`, `charset`, `mail`, `spam_score`, `hash`, `content_type`, `recipient`, `has_attach`, `ip_addr`, `return_path`, `is_tls`)"
+	sqlstr += " values "
+	values := "(NOW(), ?, ?, ?, ? , 'UTF-8' , ?, 0, ?, '', ?, 0, ?, ?, ?)"
+	// add more rows
+	comma := ""
+	for i := 0; i < rows; i++ {
+		sqlstr += comma + values
+		if comma == "" {
+			comma = ","
+		}
+	}
+	stmt, sqlErr := db.Prepare(sqlstr)
+	if sqlErr != nil {
+		mainlog.WithError(sqlErr).Fatalf("failed while db.Prepare(INSERT...)")
+	}
+	// cache it
+	g.cache[rows-1] = stmt
+	return stmt
+}
+
+func (g *guerrillaDBDecorator) doQuery(c int, db *sql.DB, insertStmt *sql.Stmt, vals *[]interface{}) {
+	var execErr error
+	defer func() {
+		if r := recover(); r != nil {
+			//logln(1, fmt.Sprintf("Recovered in %v", r))
+			mainlog.Error("Recovered from panic:", r, string(debug.Stack()))
+			sum := 0
+			for _, v := range *vals {
+				if str, ok := v.(string); ok {
+					sum = sum + len(str)
+				}
+			}
+			mainlog.Errorf("panic while inserting query [%v] size:%d, err %v", r, sum, execErr)
+			panic("query failed")
+		}
+	}()
+	// fetch the cached prepared statement for this batch size (preparing it on first use)
+	insertStmt = g.prepareInsertQuery(c, db)
+	_, execErr = insertStmt.Exec(*vals...)
+	if execErr != nil {
+		mainlog.WithError(execErr).Error("There was a problem with the insert")
+	}
+}
+
+func GuerrillaDB(dc *DecoratorCallbacks) Decorator {
+
+	decorator := guerrillaDBDecorator{}
+
+	var config *guerrillaDBConfig
+	dc.loader = func(backendConfig BackendConfig) error {
+		configType := baseConfig(&guerrillaDBConfig{})
+		bcfg, err := ab.extractConfig(backendConfig, configType)
+		if err != nil {
+			return err
+		}
+		config = bcfg.(*guerrillaDBConfig)
+		decorator.config = config
+		return nil
+	}
+
+	var vals []interface{}
+	var db *sql.DB
+	var err error
+
+	mysqlConnect := func() (*sql.DB, error) {
+		conf := mysql.Config{
+			User:         config.MysqlUser,
+			Passwd:       config.MysqlPass,
+			DBName:       config.MysqlDB,
+			Net:          "tcp",
+			Addr:         config.MysqlHost,
+			ReadTimeout:  GuerrillaDBAndRedisBatchTimeout + (time.Second * 10),
+			WriteTimeout: GuerrillaDBAndRedisBatchTimeout + (time.Second * 10),
+			Params:       map[string]string{"collation": "utf8_general_ci"},
+		}
+		if db, err := sql.Open("mysql", conf.FormatDSN()); err != nil {
+			mainlog.Error("cannot open mysql", err)
+			return nil, err
+		} else {
+			mainlog.Info("connected to mysql on tcp ", config.MysqlHost)
+			return db, nil
+		}
+
+	}
+
+	dc.initialize = func() error {
+		db, err = mysqlConnect()
+		if err != nil {
+			mainlog.Fatalf("cannot open mysql: %s", err)
+		}
+		return err
+	}
+
+	return func(c Processor) Processor {
+		return ProcessorFunc(func(e *envelope.Envelope) (BackendResult, error) {
+			var to, body string
+			to = trimToLimit(strings.TrimSpace(e.RcptTo[0].User)+"@"+config.PrimaryHost, 255)
+			hash := ""
+			if len(e.Hashes) > 0 {
+				hash = e.Hashes[0]
+			}
+
+			var compressor *compressedData
+			// a compressor was set
+			if c, ok := e.Meta["gzip"]; ok {
+				body = "gzip"
+				compressor = c.(*compressedData)
+			}
+
+			// was saved in redis
+			if _, ok := e.Meta["redis"]; ok {
+				body = "redis"
+			}
+
+			// build the values for the query
+			vals = []interface{}{} // clear the vals
+			vals = append(vals,
+				to,
+				trimToLimit(e.MailFrom.String(), 255),
+				trimToLimit(e.Subject, 255),
+				body)
+			if compressor != nil {
+				// use a compressor
+				vals = append(vals,
+					compressor.String())
+			} else {
+				vals = append(vals, e.Data.String())
+			}
+
+			vals = append(vals,
+				hash,
+				to,
+				e.RemoteAddress,
+				trimToLimit(e.MailFrom.String(), 255),
+				e.TLS)
+
+			stmt := decorator.prepareInsertQuery(1, db)
+			decorator.doQuery(1, db, stmt, &vals)
+			// continue to the next Processor in the decorator chain
+			return c.Process(e)
+		})
+	}
+}
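
For illustration, with rows == 2 the statement built by prepareInsertQuery has the shape below; the table name "mail" is hypothetical (it comes from the mail_table setting) and the column list is abbreviated. Each extra row appends one more parenthesised values group, and the prepared *sql.Stmt is cached under cache[rows-1].

    // Sketch of the generated statement for rows == 2 (columns abbreviated,
    // table name hypothetical). Only 10 of the 15 columns take a placeholder;
    // the rest are literals such as NOW(), 'UTF-8' and 0, matching the 10
    // values appended to vals in the processor above.
    const batchedInsertShape = "INSERT INTO mail " +
    	"(`date`, `to`, `from`, `subject`, `body`, ..., `is_tls`) values " +
    	"(NOW(), ?, ?, ?, ? , 'UTF-8' , ?, 0, ?, '', ?, 0, ?, ?, ?)," +
    	"(NOW(), ?, ?, ?, ? , 'UTF-8' , ?, 0, ?, '', ?, 0, ?, ?, ?)"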

+ 1 - 1
backends/guerrilla_db_redis.go

@@ -307,7 +307,7 @@ func (g *GuerrillaDBAndRedisBackend) mysqlConnect() (*sql.DB, error) {
 
 }
 
-func (g *GuerrillaDBAndRedisBackend) saveMailWorker(saveMailChan chan *savePayload) {
+func (g *GuerrillaDBAndRedisBackend) saveMailWorker_old(saveMailChan chan *savePayload) {
 	var to, body string
 
 	var redisErr error

+ 36 - 0
backends/hasher.go

@@ -0,0 +1,36 @@
+package backends
+
+import (
+	"crypto/md5"
+	"fmt"
+	"github.com/flashmob/go-guerrilla/envelope"
+	"io"
+	"strings"
+	"time"
+)
+
+// Hasher is a decorator that computes a hash of the email for each recipient
+// and appends the hashes to the envelope's Hashes slice.
+func Hasher() Decorator {
+	return func(c Processor) Processor {
+		return ProcessorFunc(func(e *envelope.Envelope) (BackendResult, error) {
+
+			// fields shared by every recipient's hash
+			ts := fmt.Sprintf("%d", time.Now().UnixNano())
+			base := e.MailFrom.String() + e.Subject + ts
+
+			// calculate a unique hash for each recipient; hash.Hash state
+			// can't be copied by assignment, so a fresh md5 is started for
+			// each recipient and seeded with the shared fields above
+			for i := range e.RcptTo {
+				h := md5.New()
+				io.Copy(h, strings.NewReader(base))
+				io.Copy(h, strings.NewReader(e.RcptTo[i].String()))
+				sum := h.Sum(nil)
+				e.Hashes = append(e.Hashes, fmt.Sprintf("%x", sum))
+			}
+
+			return c.Process(e)
+		})
+	}
+}

+ 0 - 10
backends/logger.go

@@ -1,10 +0,0 @@
-package backends
-
-type loggerConfig struct {
-	LogReceivedMails bool `json:"log_received_mails"`
-}
-
-// putting all the paces we need together
-type LoggerBackend struct {
-	config dummyConfig
-}

+ 13 - 8
envelope/envelope.go

@@ -46,6 +46,10 @@ type Envelope struct {
 	TLS bool
 	// Header stores the results from ParseHeaders()
 	Header textproto.MIMEHeader
+	// Meta holds metadata set by processors while the envelope is processed
+	Meta map[string]interface{}
+	// Hashes of the email, one per recipient (see the Hasher decorator)
+	Hashes []string
 }
 
 // ParseHeaders parses the headers into Header field of the Envelope struct.
@@ -57,18 +61,19 @@ func (e *Envelope) ParseHeaders() error {
 	if e.Header != nil {
 		return errors.New("Headers already parsed")
 	}
-	all := e.Data.Bytes()
-
+	b2 := bytes.NewBuffer(e.Data.Bytes())
 	// find where the header ends, assuming that over 30 kb would be max
 	max := 1024 * 30
-	if len(all) < max {
-		max = len(all) - 1
+	if b2.Len() < max {
+		max = b2.Len()
 	}
-	str := string(all[:max])
-	headerEnd := strings.Index(str, "\n\n")
-
+	// read in the chunk which we'll scan for the header
+	chunk := make([]byte, max)
+	b2.Read(chunk)
+	headerEnd := strings.Index(string(chunk), "\n\n") // two consecutive newlines mark the end of the header
 	if headerEnd > -1 {
-		headerReader := textproto.NewReader(bufio.NewReader(bytes.NewBuffer(all[0:headerEnd])))
+		header := chunk[0:headerEnd]
+		headerReader := textproto.NewReader(bufio.NewReader(bytes.NewBuffer(header)))
 		e.Header, err = headerReader.ReadMIMEHeader()
 		if err != nil {
 			// decode the subject
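
A usage sketch of the reworked parser follows. It assumes Envelope.Data is a bytes.Buffer-style field (which the Bytes()/String() calls above suggest), that fmt and the envelope package are imported, and that the header is terminated by the "\n\n" blank line the scan looks for.

    // Sketch: exercising ParseHeaders on a small message. WriteString assumes
    // Data is a bytes.Buffer-like field.
    func parseHeadersExample() {
    	e := &envelope.Envelope{}
    	e.Data.WriteString("Subject: test\nFrom: sender@example.com\n\nhello\n")
    	err := e.ParseHeaders() // err can be non-nil even when headers were read
    	if e.Header != nil {
    		fmt.Println("subject:", e.Header.Get("Subject"), "err:", err)
    	}
    }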