|
@@ -2,6 +2,7 @@ package backends
|
|
|
|
|
|
import (
|
|
import (
|
|
"fmt"
|
|
"fmt"
|
|
|
|
+ "sync"
|
|
"time"
|
|
"time"
|
|
|
|
|
|
log "github.com/Sirupsen/logrus"
|
|
log "github.com/Sirupsen/logrus"
|
|
@@ -19,7 +20,9 @@ func init() {
|
|
}
|
|
}
|
|
|
|
|
|
type GuerrillaDBAndRedisBackend struct {
|
|
type GuerrillaDBAndRedisBackend struct {
|
|
- config guerrillaDBAndRedisConfig
|
|
|
|
|
|
+ config guerrillaDBAndRedisConfig
|
|
|
|
+ saveMailChan chan *savePayload
|
|
|
|
+ wg *sync.WaitGroup
|
|
}
|
|
}
|
|
|
|
|
|
type guerrillaDBAndRedisConfig struct {
|
|
type guerrillaDBAndRedisConfig struct {
|
|
@@ -34,16 +37,58 @@ type guerrillaDBAndRedisConfig struct {
|
|
PrimaryHost string `json:"primary_mail_host"`
|
|
PrimaryHost string `json:"primary_mail_host"`
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+func convertError(name string) error {
|
|
|
|
+ return fmt.Errorf("failed to load backend config field (%s)", name)
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func (g *GuerrillaDBAndRedisBackend) loadConfig(backendConfig guerrilla.BackendConfig) error {
|
|
|
|
+ var converted bool
|
|
|
|
+
|
|
|
|
+ if g.config.NumberOfWorkers, converted = backendConfig["save_workers_size"].(int); !converted {
|
|
|
|
+ return convertError("save_workers_size")
|
|
|
|
+ }
|
|
|
|
+ if g.config.MysqlTable, converted = backendConfig["mail_table"].(string); !converted {
|
|
|
|
+ return convertError("mail_table")
|
|
|
|
+ }
|
|
|
|
+ if g.config.MysqlDB, converted = backendConfig["mysql_db"].(string); !converted {
|
|
|
|
+ return convertError("mysql_db")
|
|
|
|
+ }
|
|
|
|
+ if g.config.MysqlHost, converted = backendConfig["mysql_host"].(string); !converted {
|
|
|
|
+ return convertError("mysql_host")
|
|
|
|
+ }
|
|
|
|
+ if g.config.MysqlPass, converted = backendConfig["mysql_pass"].(string); !converted {
|
|
|
|
+ return convertError("mysql_pass")
|
|
|
|
+ }
|
|
|
|
+ if g.config.MysqlUser, converted = backendConfig["mysql_user"].(string); !converted {
|
|
|
|
+ return convertError("mysql_user")
|
|
|
|
+ }
|
|
|
|
+ if g.config.RedisExpireSeconds, converted = backendConfig["redis_expire_seconds"].(int); !converted {
|
|
|
|
+ return convertError("redis_expire_seconds")
|
|
|
|
+ }
|
|
|
|
+ if g.config.RedisInterface, converted = backendConfig["redis_interface"].(string); !converted {
|
|
|
|
+ return convertError("redis_interface")
|
|
|
|
+ }
|
|
|
|
+ if g.config.PrimaryHost, converted = backendConfig["primary_mail_host"].(string); !converted {
|
|
|
|
+ return convertError("primary_mail_host")
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ return nil
|
|
|
|
+}
|
|
|
|
+
|
|
func (g *GuerrillaDBAndRedisBackend) Initialize(backendConfig guerrilla.BackendConfig) error {
|
|
func (g *GuerrillaDBAndRedisBackend) Initialize(backendConfig guerrilla.BackendConfig) error {
|
|
- // TODO: load config
|
|
|
|
|
|
+ err := g.loadConfig(backendConfig)
|
|
|
|
+ if err != nil {
|
|
|
|
+ return err
|
|
|
|
+ }
|
|
|
|
|
|
if err := g.testDbConnections(); err != nil {
|
|
if err := g.testDbConnections(); err != nil {
|
|
return err
|
|
return err
|
|
}
|
|
}
|
|
|
|
|
|
- SaveMailChan = make(chan *savePayload, g.config.NumberOfWorkers)
|
|
|
|
|
|
+ g.saveMailChan = make(chan *savePayload, g.config.NumberOfWorkers)
|
|
|
|
|
|
// start some savemail workers
|
|
// start some savemail workers
|
|
|
|
+ g.wg.Add(g.config.NumberOfWorkers)
|
|
for i := 0; i < g.config.NumberOfWorkers; i++ {
|
|
for i := 0; i < g.config.NumberOfWorkers; i++ {
|
|
go g.saveMail()
|
|
go g.saveMail()
|
|
}
|
|
}
|
|
@@ -51,10 +96,21 @@ func (g *GuerrillaDBAndRedisBackend) Initialize(backendConfig guerrilla.BackendC
|
|
return nil
|
|
return nil
|
|
}
|
|
}
|
|
|
|
|
|
-func (g *GuerrillaDBAndRedisBackend) Process(client *guerrilla.Client, user, host string) string {
|
|
|
|
|
|
+func (g *GuerrillaDBAndRedisBackend) Finalize() error {
|
|
|
|
+ close(g.saveMailChan)
|
|
|
|
+ g.wg.Wait()
|
|
|
|
+ return nil
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func (g *GuerrillaDBAndRedisBackend) Process(client *guerrilla.Client, from *guerrilla.EmailParts, to []*guerrilla.EmailParts) string {
|
|
|
|
+ if len(to) == 0 {
|
|
|
|
+ return "554 Error: no recipient"
|
|
|
|
+ }
|
|
|
|
+
|
|
// to do: timeout when adding to SaveMailChan
|
|
// to do: timeout when adding to SaveMailChan
|
|
// place on the channel so that one of the save mail workers can pick it up
|
|
// place on the channel so that one of the save mail workers can pick it up
|
|
- SaveMailChan <- &savePayload{client: client, user: user, host: host}
|
|
|
|
|
|
+ // TODO: support multiple recipients
|
|
|
|
+ g.saveMailChan <- &savePayload{client: client, from: from, recipient: to[0]}
|
|
// wait for the save to complete
|
|
// wait for the save to complete
|
|
// or timeout
|
|
// or timeout
|
|
select {
|
|
select {
|
|
@@ -70,13 +126,11 @@ func (g *GuerrillaDBAndRedisBackend) Process(client *guerrilla.Client, user, hos
|
|
}
|
|
}
|
|
|
|
|
|
type savePayload struct {
|
|
type savePayload struct {
|
|
- client *guerrilla.Client
|
|
|
|
- user string
|
|
|
|
- host string
|
|
|
|
|
|
+ client *guerrilla.Client
|
|
|
|
+ from *guerrilla.EmailParts
|
|
|
|
+ recipient *guerrilla.EmailParts
|
|
}
|
|
}
|
|
|
|
|
|
-var SaveMailChan chan *savePayload
|
|
|
|
-
|
|
|
|
type redisClient struct {
|
|
type redisClient struct {
|
|
count int
|
|
count int
|
|
conn redis.Conn
|
|
conn redis.Conn
|
|
@@ -113,9 +167,13 @@ func (g *GuerrillaDBAndRedisBackend) saveMail() {
|
|
|
|
|
|
// receives values from the channel repeatedly until it is closed.
|
|
// receives values from the channel repeatedly until it is closed.
|
|
for {
|
|
for {
|
|
- payload := <-SaveMailChan
|
|
|
|
- recipient = payload.user + "@" + payload.host
|
|
|
|
- to = payload.user + "@" + g.config.PrimaryHost
|
|
|
|
|
|
+ payload := <-g.saveMailChan
|
|
|
|
+ if payload == nil {
|
|
|
|
+ log.Debug("No more payload")
|
|
|
|
+ g.wg.Done()
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
+ to = payload.recipient.User + "@" + g.config.PrimaryHost
|
|
length = len(payload.client.Data)
|
|
length = len(payload.client.Data)
|
|
ts := fmt.Sprintf("%d", time.Now().UnixNano())
|
|
ts := fmt.Sprintf("%d", time.Now().UnixNano())
|
|
payload.client.Subject = util.MimeHeaderDecode(payload.client.Subject)
|
|
payload.client.Subject = util.MimeHeaderDecode(payload.client.Subject)
|
|
@@ -128,7 +186,7 @@ func (g *GuerrillaDBAndRedisBackend) saveMail() {
|
|
var addHead string
|
|
var addHead string
|
|
addHead += "Delivered-To: " + to + "\r\n"
|
|
addHead += "Delivered-To: " + to + "\r\n"
|
|
addHead += "Received: from " + payload.client.Helo + " (" + payload.client.Helo + " [" + payload.client.Address + "])\r\n"
|
|
addHead += "Received: from " + payload.client.Helo + " (" + payload.client.Helo + " [" + payload.client.Address + "])\r\n"
|
|
- addHead += " by " + payload.host + " with SMTP id " + payload.client.Hash + "@" + payload.host + ";\r\n"
|
|
|
|
|
|
+ addHead += " by " + payload.recipient.Host + " with SMTP id " + payload.client.Hash + "@" + payload.recipient.Host + ";\r\n"
|
|
addHead += " " + time.Now().Format(time.RFC1123Z) + "\r\n"
|
|
addHead += " " + time.Now().Format(time.RFC1123Z) + "\r\n"
|
|
// compress to save space
|
|
// compress to save space
|
|
payload.client.Data = util.Compress(&addHead, &payload.client.Data)
|
|
payload.client.Data = util.Compress(&addHead, &payload.client.Data)
|