
revise and optimise guerrilla-db-redis backend (#50)

* revise and optimise guerrilla-db-redis backend

* travis ci: gofmt - ignore .glide
Guerrilla Mail, 8 years ago
commit b9befb4310
3 changed files with 150 additions and 42 deletions
  1. .travis.gofmt.sh (+1 -1)
  2. README.md (+6 -5)
  3. backends/guerrilla_db_redis.go (+143 -36)

+ 1 - 1
.travis.gofmt.sh

@@ -1,6 +1,6 @@
 #!/bin/bash
 
-if [[ -n $(find . -path '*/vendor/*' -prune -o -name '*.go' -type f -exec gofmt -l {} \;) ]]; then
+if [[ -n $(find . -path '*/vendor/*' -prune -o -path '*.glide/*' -prune -o -name '*.go' -type f -exec gofmt -l {} \;) ]]; then
     echo "Go code is not formatted:"
     gofmt -d .
     exit 1

+ 6 - 5
README.md

@@ -123,7 +123,7 @@ If you want to build on the sample `guerrilla-db-redis` module, setup the follow
 in MySQL:
 
 	CREATE TABLE IF NOT EXISTS `new_mail` (
-	  `mail_id` int(11) NOT NULL auto_increment,
+	  `mail_id` BIGINT(20) unsigned NOT NULL AUTO_INCREMENT,
 	  `date` datetime NOT NULL,
 	  `from` varchar(128) character set latin1 NOT NULL,
 	  `to` varchar(128) character set latin1 NOT NULL,
@@ -137,9 +137,8 @@ in MySQL:
 	  `recipient` varchar(128) character set latin1 NOT NULL,
 	  `has_attach` int(11) NOT NULL,
 	  `ip_addr` varchar(15) NOT NULL,
-	  `delivered` bit(1) NOT NULL default b'0',
-	  `attach_info` text NOT NULL,
-	  `dkim_valid` tinyint(4) default NULL,
+	  `return_path` VARCHAR(255) NOT NULL,
+	  `is_tls` BIT(1) DEFAULT b'0' NOT NULL,
 	  PRIMARY KEY  (`mail_id`),
 	  KEY `to` (`to`),
 	  KEY `hash` (`hash`),
@@ -148,7 +147,9 @@ in MySQL:
 
 The above table does not store the body of the email which makes it quick
 to query and join, while the body of the email is fetched from Redis
-if needed.
+for future processing. The `mail` field can contain the email data in case Redis is down.
+Otherwise, if the data is in Redis, the `mail` field will be blank and
+the `body` field will contain the word 'redis'.
 
 You can implement your own saveMail function to use whatever storage /
 backend fits for you. Please share them ^_^, in particular, we would 
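
The README change above describes a split storage scheme: metadata goes to MySQL, the body goes to Redis, and the `mail` column is only a fallback for when Redis is down. As a rough sketch only (not part of this commit), a consumer could read an email back out along these lines. It assumes the go-sql-driver/mysql and redigo packages, placeholder connection strings, and that the Redis key is the `hash` column; the value stored in Redis may also be compressed by the backend, which this sketch does not undo.

package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/go-sql-driver/mysql"
	"github.com/gomodule/redigo/redis"
)

// fetchBody reads one email back out of the split store described in the README above.
// Assumption: the Redis key equals the email's `hash` value; the stored value may be
// compressed by the backend's Compressor (not decompressed here).
func fetchBody(db *sql.DB, rc redis.Conn, mailID int64) (string, error) {
	var hash, mail, body string
	err := db.QueryRow(
		"SELECT `hash`, `mail`, `body` FROM `new_mail` WHERE `mail_id` = ?", mailID,
	).Scan(&hash, &mail, &body)
	if err != nil {
		return "", err
	}
	if body == "redis" {
		// the body was written to Redis; the `mail` column is blank
		return redis.String(rc.Do("GET", hash))
	}
	// Redis was down when the email arrived, so the data sits in the `mail` column
	return mail, nil
}

func main() {
	// placeholder credentials and addresses
	db, err := sql.Open("mysql", "user:pass@tcp(127.0.0.1:3306)/mail")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
	rc, err := redis.Dial("tcp", "127.0.0.1:6379")
	if err != nil {
		log.Fatal(err)
	}
	defer rc.Close()
	body, err := fetchBody(db, rc, 1)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(body)
}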

+ 143 - 36
backends/guerrilla_db_redis.go

@@ -1,7 +1,19 @@
 package backends
 
+// This backend is presented here as an example only, please modify it to your needs.
+// The backend stores the email data in Redis.
+// Other meta-information is stored in MySQL to be joined later.
+// A lot of email gets discarded without viewing on Guerrilla Mail,
+// so it's much faster to put in Redis, where other programs can
+// process it later, without touching the disk.
+// Short history:
+// Started with issuing an insert query for each single email and another query to update the tally
+// Then applied the following optimizations:
+// - Moved tally updates to another background process which does the tallying in a single query
+// - Changed the MySQL queries to insert in batch
+// - Made a Compressor that recycles buffers using sync.Pool
+// The result was around 400% speed improvement. If you know of any more improvements, please share!
 import (
-	"errors"
 	"fmt"
 
 	"time"
@@ -18,6 +30,12 @@ import (
 	"sync"
 )
 
+// how many rows to batch at a time
+const GuerrillaDBAndRedisBatchMax = 500
+
+// flush an incomplete batch after this much idle time
+const GuerrillaDBAndRedisBatchTimeout = time.Second * 3
+
 func init() {
 	backends["guerrilla-db-redis"] = &AbstractBackend{
 		extend: &GuerrillaDBAndRedisBackend{}}
@@ -25,9 +43,15 @@ func init() {
 
 type GuerrillaDBAndRedisBackend struct {
 	AbstractBackend
-	config guerrillaDBAndRedisConfig
+	config    guerrillaDBAndRedisConfig
+	batcherWg sync.WaitGroup
+	// cache prepared queries
+	cache stmtCache
 }
 
+// statement cache. It's an array, not a slice
+type stmtCache [GuerrillaDBAndRedisBatchMax]*autorc.Stmt
+
 type guerrillaDBAndRedisConfig struct {
 	NumberOfWorkers    int    `json:"save_workers_size"`
 	MysqlTable         string `json:"mail_table"`
@@ -131,12 +155,110 @@ func (c *compressedData) clear() {
 	c.data = nil
 }
 
+// prepares the sql query with the number of rows that can be batched with it
+func (g *GuerrillaDBAndRedisBackend) prepareInsertQuery(rows int, db *autorc.Conn) *autorc.Stmt {
+	if rows == 0 {
+		panic("rows argument cannot be 0")
+	}
+	if g.cache[rows-1] != nil {
+		return g.cache[rows-1]
+	}
+	sql := "INSERT INTO " + g.config.MysqlTable + " "
+	sql += "(`date`, `to`, `from`, `subject`, `body`, `charset`, `mail`, `spam_score`, `hash`, `content_type`, `recipient`, `has_attach`, `ip_addr`, `return_path`, `is_tls`)"
+	sql += " values "
+	values := "(NOW(), ?, ?, ?, ? , 'UTF-8' , ?, 0, ?, '', ?, 0, ?, ?, ?)"
+	// add more rows
+	comma := ""
+	for i := 0; i < rows; i++ {
+		sql += comma + values
+		if comma == "" {
+			comma = ","
+		}
+	}
+	//log.Debug("Prepared SQL", rows, sql)
+	stmt, sqlErr := db.Prepare(sql)
+	if sqlErr != nil {
+		log.WithError(sqlErr).Fatalf("failed while db.Prepare(INSERT...)")
+	}
+	// cache it
+	g.cache[rows-1] = stmt
+	return stmt
+}
+
+// insertQueryBatcher batches the rows from the feeder chan into a single INSERT statement.
+// It executes the batched query when:
+// - the number of batched rows reaches a threshold (GuerrillaDBAndRedisBatchMax), or
+// - no new rows arrive within GuerrillaDBAndRedisBatchTimeout (it times out)
+func (g *GuerrillaDBAndRedisBackend) insertQueryBatcher(feeder chan []interface{}, db *autorc.Conn) {
+	// controls shutdown; the caller does batcherWg.Add(1) before starting this goroutine
+	defer g.batcherWg.Done()
+	// vals is where values are batched to
+	var vals []interface{}
+	// how many rows were batched
+	count := 0
+	// The timer fires after GuerrillaDBAndRedisBatchTimeout of inactivity,
+	// interrupting the select below so an incomplete batch still gets flushed.
+	t := time.NewTimer(GuerrillaDBAndRedisBatchTimeout)
+	// prepare the query used to insert when rows reaches batchMax
+	insertStmt := g.prepareInsertQuery(GuerrillaDBAndRedisBatchMax, db)
+	// insert executes a batched insert query, clears the vals and resets the count
+	insert := func(c int) {
+		if c > 0 {
+			insertStmt = g.prepareInsertQuery(c, db)
+			insertStmt.Bind(vals...)
+			_, _, err := insertStmt.Exec()
+			if err != nil {
+				log.WithError(err).Error("There was a problem with the insert")
+			} else {
+				//log.Debugf("Inserted %d rows ", count)
+			}
+		}
+		vals = nil
+		count = 0
+	}
+	// Keep getting values from the feeder and adding them to the batch.
+	// If the feeder times out, execute the batched query;
+	// otherwise, execute it once the batch reaches the GuerrillaDBAndRedisBatchMax threshold.
+	for {
+		select {
+		case row := <-feeder:
+			log.Info("row from chan is", row, "cols:", len(row))
+			if row == nil {
+				log.Debug("Query batcher exiting")
+				// Insert any remaining rows
+				insert(count)
+				return
+			}
+			vals = append(vals, row...)
+			count++
+			//log.Debug("append vals", count, vals)
+			if count == GuerrillaDBAndRedisBatchMax {
+				insert(GuerrillaDBAndRedisBatchMax)
+			}
+			// stop timer from firing (reset the interrupt)
+			if !t.Stop() {
+				<-t.C
+			}
+			t.Reset(GuerrillaDBAndRedisBatchTimeout)
+		case <-t.C:
+			//log.Debugf("Query batcher timer fired! [%d]", len(vals))
+			//log.Debug("Contents:", count, vals)
+			// anything to insert?
+			if n := len(vals); n > 0 {
+				insert(count)
+			}
+			t.Reset(GuerrillaDBAndRedisBatchTimeout)
+		}
+	}
+}
+
 func (g *GuerrillaDBAndRedisBackend) saveMailWorker(saveMailChan chan *savePayload) {
 	var to, body string
-	var err error
+	//var length int
+	//var err error
 
 	var redisErr error
-	var length int
+
 	redisClient := &redisClient{}
 	db := autorc.New(
 		"tcp",
@@ -146,18 +268,11 @@ func (g *GuerrillaDBAndRedisBackend) saveMailWorker(saveMailChan chan *savePaylo
 		g.config.MysqlPass,
 		g.config.MysqlDB)
 	db.Register("set names utf8")
-	sql := "INSERT INTO " + g.config.MysqlTable + " "
-	sql += "(`date`, `to`, `from`, `subject`, `body`, `charset`, `mail`, `spam_score`, `hash`, `content_type`, `recipient`, `has_attach`, `ip_addr`, `return_path`, `is_tls`)"
-	sql += " values (NOW(), ?, ?, ?, ? , 'UTF-8' , ?, 0, ?, '', ?, 0, ?, ?, ?)"
-	ins, sqlErr := db.Prepare(sql)
-	if sqlErr != nil {
-		log.WithError(sqlErr).Fatalf("failed while db.Prepare(INSERT...)")
-	}
-	sql = "UPDATE gm2_setting SET `setting_value` = `setting_value`+1 WHERE `setting_name`='received_emails' LIMIT 1"
-	incr, sqlErr := db.Prepare(sql)
-	if sqlErr != nil {
-		log.WithError(sqlErr).Fatalf("failed while db.Prepare(UPDATE...)")
-	}
+	// start the SQL insert batcher; rows are sent to it via the feeder channel
+	feeder := make(chan []interface{}, 1)
+	g.batcherWg.Add(1)
+	go g.insertQueryBatcher(feeder, db)
+
 	defer func() {
 		if r := recover(); r != nil {
 			//recover form closed channel
@@ -170,10 +285,15 @@ func (g *GuerrillaDBAndRedisBackend) saveMailWorker(saveMailChan chan *savePaylo
 			log.Infof("closed redis")
 			redisClient.conn.Close()
 		}
-	}()
+		// close the feeder & wait for query batcher to exit.
+		close(feeder)
+		g.batcherWg.Wait()
 
+	}()
+	var vals []interface{}
 	data := newCompressedData()
 	//  receives values from the channel repeatedly until it is closed.
+
 	for {
 		payload := <-saveMailChan
 		if payload == nil {
@@ -181,7 +301,6 @@ func (g *GuerrillaDBAndRedisBackend) saveMailWorker(saveMailChan chan *savePaylo
 			return
 		}
 		to = payload.recipient.User + "@" + g.config.PrimaryHost
-		length = payload.mail.Data.Len()
 
 		ts := fmt.Sprintf("%d", time.Now().UnixNano())
 		payload.mail.ParseHeaders()
@@ -218,8 +337,8 @@ func (g *GuerrillaDBAndRedisBackend) saveMailWorker(saveMailChan chan *savePaylo
 			log.WithError(redisErr).Warn("Error while SETEX on redis")
 		}
 
-		// bind data to cursor
-		ins.Bind(
+		vals = []interface{}{} // clear the vals
+		vals = append(vals,
 			to,
 			payload.mail.MailFrom.String(),
 			payload.mail.Subject,
@@ -229,22 +348,10 @@ func (g *GuerrillaDBAndRedisBackend) saveMailWorker(saveMailChan chan *savePaylo
 			to,
 			payload.mail.RemoteAddress,
 			payload.mail.MailFrom.String(),
-			payload.mail.TLS,
-		)
-		// save, discard result
-		_, _, err = ins.Exec()
-		if err != nil {
-			errMsg := "Database error while inserting"
-			log.WithError(err).Warn(errMsg)
-			payload.savedNotify <- &saveStatus{errors.New(errMsg), hash}
-		} else {
-			log.Debugf("Email saved %s (len=%d)", hash, length)
-			_, _, err = incr.Exec()
-			if err != nil {
-				log.WithError(err).Warn("Database error while incr count")
-			}
-			payload.savedNotify <- &saveStatus{nil, hash}
-		}
+			payload.mail.TLS)
+		feeder <- vals
+		payload.savedNotify <- &saveStatus{nil, hash}
+
 	}
 }
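
The header comment added to guerrilla_db_redis.go credits part of the speed-up to a Compressor that recycles buffers with sync.Pool, but that type itself is not shown in this diff. The standalone sketch below illustrates the general recycling pattern only; it assumes zlib compression and a pooled bytes.Buffer and is not the backend's actual compressedData implementation.

package main

import (
	"bytes"
	"compress/zlib"
	"fmt"
	"sync"
)

// bufPool recycles bytes.Buffers between compressions instead of allocating a new one per email.
var bufPool = sync.Pool{
	New: func() interface{} { return new(bytes.Buffer) },
}

// compress zlib-compresses data using a pooled buffer and returns a copy of the result.
func compress(data []byte) ([]byte, error) {
	buf := bufPool.Get().(*bytes.Buffer)
	buf.Reset()
	defer bufPool.Put(buf)
	w := zlib.NewWriter(buf)
	if _, err := w.Write(data); err != nil {
		return nil, err
	}
	if err := w.Close(); err != nil {
		return nil, err
	}
	// copy out because buf is returned to the pool for reuse
	out := make([]byte, buf.Len())
	copy(out, buf.Bytes())
	return out, nil
}

func main() {
	out, err := compress([]byte("Subject: hello\r\n\r\nhello world"))
	if err != nil {
		panic(err)
	}
	fmt.Printf("compressed to %d bytes\n", len(out))
}

The zlib writer itself could also be pooled and reused via its Reset method for further allocation savings.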