|
@@ -1,9 +1,6 @@
|
|
|
package backends
|
|
|
|
|
|
// This backend is presented here as an example only, please modify it to your needs.
|
|
|
-//
|
|
|
-// Deprecated: as of 14th Feb 2017, backends are composed via config, by chaining Processors (files prefixed with p_*)
|
|
|
-//
|
|
|
// The backend stores the email data in Redis.
|
|
|
// Other meta-information is stored in MySQL to be joined later.
|
|
|
// A lot of email gets discarded without viewing on Guerrilla Mail,
|
|
@@ -34,8 +31,8 @@ import (
|
|
|
"bytes"
|
|
|
"compress/zlib"
|
|
|
"database/sql"
|
|
|
+ _ "github.com/go-sql-driver/mysql"
|
|
|
|
|
|
- "github.com/flashmob/go-guerrilla/envelope"
|
|
|
"github.com/go-sql-driver/mysql"
|
|
|
"io"
|
|
|
"runtime/debug"
|
|
@@ -50,11 +47,12 @@ const GuerrillaDBAndRedisBatchMax = 2
|
|
|
const GuerrillaDBAndRedisBatchTimeout = time.Second * 3
|
|
|
|
|
|
func init() {
|
|
|
- backends["guerrilla-db-redis"] = &ProxyBackend{
|
|
|
+ backends["guerrilla-db-redis"] = &AbstractBackend{
|
|
|
extend: &GuerrillaDBAndRedisBackend{}}
|
|
|
}
|
|
|
|
|
|
type GuerrillaDBAndRedisBackend struct {
|
|
|
+ AbstractBackend
|
|
|
config guerrillaDBAndRedisConfig
|
|
|
batcherWg sync.WaitGroup
|
|
|
// cache prepared queries
|
|
@@ -83,10 +81,9 @@ func convertError(name string) error {
|
|
|
// Load the backend config for the backend. It has already been unmarshalled
|
|
|
// from the main config file 'backend' config "backend_config"
|
|
|
// Now we need to convert each type and copy into the guerrillaDBAndRedisConfig struct
|
|
|
-
|
|
|
func (g *GuerrillaDBAndRedisBackend) loadConfig(backendConfig BackendConfig) (err error) {
|
|
|
- configType := BaseConfig(&guerrillaDBAndRedisConfig{})
|
|
|
- bcfg, err := Svc.ExtractConfig(backendConfig, configType)
|
|
|
+ configType := baseConfig(&guerrillaDBAndRedisConfig{})
|
|
|
+ bcfg, err := g.extractConfig(backendConfig, configType)
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
@@ -99,11 +96,6 @@ func (g *GuerrillaDBAndRedisBackend) getNumberOfWorkers() int {
|
|
|
return g.config.NumberOfWorkers
|
|
|
}
|
|
|
|
|
|
-// ValidateRcpt not implemented
|
|
|
-func (g *GuerrillaDBAndRedisBackend) ValidateRcpt(e *envelope.Envelope) RcptError {
|
|
|
- return nil
|
|
|
-}
|
|
|
-
|
|
|
type redisClient struct {
|
|
|
isConnected bool
|
|
|
conn redis.Conn
|
|
@@ -186,7 +178,7 @@ func (g *GuerrillaDBAndRedisBackend) prepareInsertQuery(rows int, db *sql.DB) *s
|
|
|
}
|
|
|
stmt, sqlErr := db.Prepare(sqlstr)
|
|
|
if sqlErr != nil {
|
|
|
- Log().WithError(sqlErr).Fatalf("failed while db.Prepare(INSERT...)")
|
|
|
+ mainlog.WithError(sqlErr).Fatalf("failed while db.Prepare(INSERT...)")
|
|
|
}
|
|
|
// cache it
|
|
|
g.cache[rows-1] = stmt
|
|
@@ -198,14 +190,14 @@ func (g *GuerrillaDBAndRedisBackend) doQuery(c int, db *sql.DB, insertStmt *sql.
|
|
|
defer func() {
|
|
|
if r := recover(); r != nil {
|
|
|
//logln(1, fmt.Sprintf("Recovered in %v", r))
|
|
|
- Log().Error("Recovered form panic:", r, string(debug.Stack()))
|
|
|
+ mainlog.Error("Recovered form panic:", r, string(debug.Stack()))
|
|
|
sum := 0
|
|
|
for _, v := range *vals {
|
|
|
if str, ok := v.(string); ok {
|
|
|
sum = sum + len(str)
|
|
|
}
|
|
|
}
|
|
|
- Log().Errorf("panic while inserting query [%s] size:%d, err %v", r, sum, execErr)
|
|
|
+ mainlog.Errorf("panic while inserting query [%s] size:%d, err %v", r, sum, execErr)
|
|
|
panic("query failed")
|
|
|
}
|
|
|
}()
|
|
@@ -213,7 +205,7 @@ func (g *GuerrillaDBAndRedisBackend) doQuery(c int, db *sql.DB, insertStmt *sql.
|
|
|
insertStmt = g.prepareInsertQuery(c, db)
|
|
|
_, execErr = insertStmt.Exec(*vals...)
|
|
|
if execErr != nil {
|
|
|
- Log().WithError(execErr).Error("There was a problem the insert")
|
|
|
+ mainlog.WithError(execErr).Error("There was a problem the insert")
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -247,7 +239,7 @@ func (g *GuerrillaDBAndRedisBackend) insertQueryBatcher(feeder chan []interface{
|
|
|
}
|
|
|
defer func() {
|
|
|
if r := recover(); r != nil {
|
|
|
- Log().Error("insertQueryBatcher caught a panic", r)
|
|
|
+ mainlog.Error("insertQueryBatcher caught a panic", r)
|
|
|
}
|
|
|
}()
|
|
|
// Keep getting values from feeder and add to batch.
|
|
@@ -259,14 +251,14 @@ func (g *GuerrillaDBAndRedisBackend) insertQueryBatcher(feeder chan []interface{
|
|
|
// it may panic when reading on a closed feeder channel. feederOK detects if it was closed
|
|
|
case row, feederOk := <-feeder:
|
|
|
if row == nil {
|
|
|
- Log().Info("Query batchaer exiting")
|
|
|
+ mainlog.Info("Query batchaer exiting")
|
|
|
// Insert any remaining rows
|
|
|
insert(count)
|
|
|
return feederOk
|
|
|
}
|
|
|
vals = append(vals, row...)
|
|
|
count++
|
|
|
- Log().Debug("new feeder row:", row, " cols:", len(row), " count:", count, " worker", workerId)
|
|
|
+ mainlog.Debug("new feeder row:", row, " cols:", len(row), " count:", count, " worker", workerId)
|
|
|
if count >= GuerrillaDBAndRedisBatchMax {
|
|
|
insert(GuerrillaDBAndRedisBatchMax)
|
|
|
}
|
|
@@ -307,7 +299,7 @@ func (g *GuerrillaDBAndRedisBackend) mysqlConnect() (*sql.DB, error) {
|
|
|
Params: map[string]string{"collation": "utf8_general_ci"},
|
|
|
}
|
|
|
if db, err := sql.Open("mysql", conf.FormatDSN()); err != nil {
|
|
|
- Log().Error("cannot open mysql", err)
|
|
|
+ mainlog.Error("cannot open mysql", err)
|
|
|
return nil, err
|
|
|
} else {
|
|
|
return db, nil
|
|
@@ -315,7 +307,7 @@ func (g *GuerrillaDBAndRedisBackend) mysqlConnect() (*sql.DB, error) {
|
|
|
|
|
|
}
|
|
|
|
|
|
-func (g *GuerrillaDBAndRedisBackend) saveMailWorker(saveMailChan chan *workerMsg) {
|
|
|
+func (g *GuerrillaDBAndRedisBackend) saveMailWorker(saveMailChan chan *savePayload) {
|
|
|
var to, body string
|
|
|
|
|
|
var redisErr error
|
|
@@ -327,7 +319,7 @@ func (g *GuerrillaDBAndRedisBackend) saveMailWorker(saveMailChan chan *workerMsg
|
|
|
var err error
|
|
|
db, err = g.mysqlConnect()
|
|
|
if err != nil {
|
|
|
- Log().Fatalf("cannot open mysql: %s", err)
|
|
|
+ mainlog.Fatalf("cannot open mysql: %s", err)
|
|
|
}
|
|
|
|
|
|
// start the query SQL batching where we will send data via the feeder channel
|
|
@@ -335,11 +327,11 @@ func (g *GuerrillaDBAndRedisBackend) saveMailWorker(saveMailChan chan *workerMsg
|
|
|
go func() {
|
|
|
for {
|
|
|
if feederOK := g.insertQueryBatcher(feeder, db); !feederOK {
|
|
|
- Log().Debug("insertQueryBatcher exited")
|
|
|
+ mainlog.Debug("insertQueryBatcher exited")
|
|
|
return
|
|
|
}
|
|
|
// if insertQueryBatcher panics, it can recover and go in again
|
|
|
- Log().Debug("resuming insertQueryBatcher")
|
|
|
+ mainlog.Debug("resuming insertQueryBatcher")
|
|
|
}
|
|
|
|
|
|
}()
|
|
@@ -347,11 +339,11 @@ func (g *GuerrillaDBAndRedisBackend) saveMailWorker(saveMailChan chan *workerMsg
|
|
|
defer func() {
|
|
|
if r := recover(); r != nil {
|
|
|
//recover form closed channel
|
|
|
- Log().Error("panic recovered in saveMailWorker", r)
|
|
|
+ mainlog.Error("panic recovered in saveMailWorker", r)
|
|
|
}
|
|
|
db.Close()
|
|
|
if redisClient.conn != nil {
|
|
|
- Log().Infof("closed redis")
|
|
|
+ mainlog.Infof("closed redis")
|
|
|
redisClient.conn.Close()
|
|
|
}
|
|
|
// close the feeder & wait for query batcher to exit.
|
|
@@ -366,13 +358,13 @@ func (g *GuerrillaDBAndRedisBackend) saveMailWorker(saveMailChan chan *workerMsg
|
|
|
for {
|
|
|
payload := <-saveMailChan
|
|
|
if payload == nil {
|
|
|
- Log().Debug("No more saveMailChan payload")
|
|
|
+ mainlog.Debug("No more saveMailChan payload")
|
|
|
return
|
|
|
}
|
|
|
- Log().Debug("Got mail from chan", payload.mail.RemoteAddress)
|
|
|
- to = trimToLimit(strings.TrimSpace(payload.mail.RcptTo[0].User)+"@"+g.config.PrimaryHost, 255)
|
|
|
+ mainlog.Debug("Got mail from chan", payload.mail.RemoteAddress)
|
|
|
+ to = trimToLimit(strings.TrimSpace(payload.recipient.User)+"@"+g.config.PrimaryHost, 255)
|
|
|
payload.mail.Helo = trimToLimit(payload.mail.Helo, 255)
|
|
|
- host := trimToLimit(payload.mail.RcptTo[0].Host, 255)
|
|
|
+ payload.recipient.Host = trimToLimit(payload.recipient.Host, 255)
|
|
|
ts := fmt.Sprintf("%d", time.Now().UnixNano())
|
|
|
payload.mail.ParseHeaders()
|
|
|
hash := MD5Hex(
|
|
@@ -384,7 +376,7 @@ func (g *GuerrillaDBAndRedisBackend) saveMailWorker(saveMailChan chan *workerMsg
|
|
|
var addHead string
|
|
|
addHead += "Delivered-To: " + to + "\r\n"
|
|
|
addHead += "Received: from " + payload.mail.Helo + " (" + payload.mail.Helo + " [" + payload.mail.RemoteAddress + "])\r\n"
|
|
|
- addHead += " by " + host + " with SMTP id " + hash + "@" + host + ";\r\n"
|
|
|
+ addHead += " by " + payload.recipient.Host + " with SMTP id " + hash + "@" + payload.recipient.Host + ";\r\n"
|
|
|
addHead += " " + time.Now().Format(time.RFC1123Z) + "\r\n"
|
|
|
|
|
|
// data will be compressed when printed, with addHead added to beginning
|
|
@@ -403,7 +395,7 @@ func (g *GuerrillaDBAndRedisBackend) saveMailWorker(saveMailChan chan *workerMsg
|
|
|
data.clear() // blank
|
|
|
}
|
|
|
} else {
|
|
|
- Log().WithError(redisErr).Warn("Error while connecting redis")
|
|
|
+ mainlog.WithError(redisErr).Warn("Error while connecting redis")
|
|
|
}
|
|
|
|
|
|
vals = []interface{}{} // clear the vals
|
|
@@ -419,7 +411,7 @@ func (g *GuerrillaDBAndRedisBackend) saveMailWorker(saveMailChan chan *workerMsg
|
|
|
trimToLimit(payload.mail.MailFrom.String(), 255),
|
|
|
payload.mail.TLS)
|
|
|
feeder <- vals
|
|
|
- payload.notifyMe <- ¬ifyMsg{nil, hash}
|
|
|
+ payload.savedNotify <- &saveStatus{nil, hash}
|
|
|
|
|
|
}
|
|
|
}
|