|
@@ -2,6 +2,7 @@ package backends
|
|
|
|
|
|
import (
|
|
|
"fmt"
|
|
|
+ "sync"
|
|
|
"time"
|
|
|
|
|
|
log "github.com/Sirupsen/logrus"
|
|
@@ -19,7 +20,9 @@ func init() {
|
|
|
}
|
|
|
|
|
|
type GuerrillaDBAndRedisBackend struct {
|
|
|
- config guerrillaDBAndRedisConfig
|
|
|
+ config guerrillaDBAndRedisConfig
|
|
|
+ saveMailChan chan *savePayload
|
|
|
+ wg *sync.WaitGroup
|
|
|
}
|
|
|
|
|
|
type guerrillaDBAndRedisConfig struct {
|
|
@@ -38,7 +41,7 @@ func convertError(name string) error {
|
|
|
return fmt.Errorf("failed to load backend config field (%s)", name)
|
|
|
}
|
|
|
|
|
|
-func (g *GuerrillaDBAndRedisBackend) Initialize(backendConfig guerrilla.BackendConfig) error {
|
|
|
+func (g *GuerrillaDBAndRedisBackend) loadConfig(backendConfig guerrilla.BackendConfig) error {
|
|
|
var converted bool
|
|
|
|
|
|
if g.config.NumberOfWorkers, converted = backendConfig["save_workers_size"].(int); !converted {
|
|
@@ -69,13 +72,23 @@ func (g *GuerrillaDBAndRedisBackend) Initialize(backendConfig guerrilla.BackendC
|
|
|
return convertError("primary_mail_host")
|
|
|
}
|
|
|
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+func (g *GuerrillaDBAndRedisBackend) Initialize(backendConfig guerrilla.BackendConfig) error {
|
|
|
+ err := g.loadConfig(backendConfig)
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+
|
|
|
if err := g.testDbConnections(); err != nil {
|
|
|
return err
|
|
|
}
|
|
|
|
|
|
- SaveMailChan = make(chan *savePayload, g.config.NumberOfWorkers)
|
|
|
+ g.saveMailChan = make(chan *savePayload, g.config.NumberOfWorkers)
|
|
|
|
|
|
// start some savemail workers
|
|
|
+ g.wg.Add(g.config.NumberOfWorkers)
|
|
|
for i := 0; i < g.config.NumberOfWorkers; i++ {
|
|
|
go g.saveMail()
|
|
|
}
|
|
@@ -83,10 +96,16 @@ func (g *GuerrillaDBAndRedisBackend) Initialize(backendConfig guerrilla.BackendC
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
+func (g *GuerrillaDBAndRedisBackend) Finalize() error {
|
|
|
+ close(g.saveMailChan)
|
|
|
+ g.wg.Wait()
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
func (g *GuerrillaDBAndRedisBackend) Process(client *guerrilla.Client, user, host string) string {
|
|
|
// to do: timeout when adding to SaveMailChan
|
|
|
// place on the channel so that one of the save mail workers can pick it up
|
|
|
- SaveMailChan <- &savePayload{client: client, user: user, host: host}
|
|
|
+ g.saveMailChan <- &savePayload{client: client, user: user, host: host}
|
|
|
// wait for the save to complete
|
|
|
// or timeout
|
|
|
select {
|
|
@@ -107,8 +126,6 @@ type savePayload struct {
|
|
|
host string
|
|
|
}
|
|
|
|
|
|
-var SaveMailChan chan *savePayload
|
|
|
-
|
|
|
type redisClient struct {
|
|
|
count int
|
|
|
conn redis.Conn
|
|
@@ -145,7 +162,13 @@ func (g *GuerrillaDBAndRedisBackend) saveMail() {
|
|
|
|
|
|
// receives values from the channel repeatedly until it is closed.
|
|
|
for {
|
|
|
- payload := <-SaveMailChan
|
|
|
+ payload := <-g.saveMailChan
|
|
|
+ if payload == nil {
|
|
|
+ log.Debug("No more payload")
|
|
|
+ g.wg.Done()
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
recipient = payload.user + "@" + payload.host
|
|
|
to = payload.user + "@" + g.config.PrimaryHost
|
|
|
length = len(payload.client.Data)
|