|
@@ -16,7 +16,7 @@ import (
|
|
)
|
|
)
|
|
|
|
|
|
// ----------------------------------------------------------------------------------
|
|
// ----------------------------------------------------------------------------------
|
|
-// Processor Name: Guerrilla-reds-db
|
|
|
|
|
|
+// Processor Name: GuerrillaRedsDB
|
|
// ----------------------------------------------------------------------------------
|
|
// ----------------------------------------------------------------------------------
|
|
// Description : Saves the body to redis, meta data to mysql. Example
|
|
// Description : Saves the body to redis, meta data to mysql. Example
|
|
// ----------------------------------------------------------------------------------
|
|
// ----------------------------------------------------------------------------------
|
|
@@ -27,7 +27,7 @@ import (
|
|
// Output :
|
|
// Output :
|
|
// ----------------------------------------------------------------------------------
|
|
// ----------------------------------------------------------------------------------
|
|
func init() {
|
|
func init() {
|
|
- processors["GuerrillaRedisDB"] = func() Decorator {
|
|
|
|
|
|
+ processors["guerrillaredisdb"] = func() Decorator {
|
|
return GuerrillaDbReddis()
|
|
return GuerrillaDbReddis()
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -279,12 +279,17 @@ func (g *GuerrillaDBAndRedisBackend) mysqlConnect() (*sql.DB, error) {
|
|
Params: map[string]string{"collation": "utf8_general_ci"},
|
|
Params: map[string]string{"collation": "utf8_general_ci"},
|
|
}
|
|
}
|
|
if db, err := sql.Open("mysql", conf.FormatDSN()); err != nil {
|
|
if db, err := sql.Open("mysql", conf.FormatDSN()); err != nil {
|
|
- Log().Error("cannot open mysql", err)
|
|
|
|
|
|
+ Log().Error("cannot open mysql", err, "]")
|
|
return nil, err
|
|
return nil, err
|
|
} else {
|
|
} else {
|
|
|
|
+ // do we have access?
|
|
|
|
+ _, err = db.Query("SELECT mail_id FROM " + g.config.MysqlTable + " LIMIT 1")
|
|
|
|
+ if err != nil {
|
|
|
|
+ Log().Error("cannot select table", err)
|
|
|
|
+ return nil, err
|
|
|
|
+ }
|
|
return db, nil
|
|
return db, nil
|
|
}
|
|
}
|
|
-
|
|
|
|
}
|
|
}
|
|
|
|
|
|
func (c *redisClient) redisConnection(redisInterface string) (err error) {
|
|
func (c *redisClient) redisConnection(redisInterface string) (err error) {
|
|
@@ -313,6 +318,9 @@ func GuerrillaDbReddis() Decorator {
|
|
|
|
|
|
var redisErr error
|
|
var redisErr error
|
|
|
|
|
|
|
|
+ workerId++
|
|
|
|
+ feeder := make(chan []interface{}, 1)
|
|
|
|
+
|
|
Svc.AddInitializer(InitializeWith(func(backendConfig BackendConfig) error {
|
|
Svc.AddInitializer(InitializeWith(func(backendConfig BackendConfig) error {
|
|
configType := BaseConfig(&guerrillaDBAndRedisConfig{})
|
|
configType := BaseConfig(&guerrillaDBAndRedisConfig{})
|
|
bcfg, err := Svc.ExtractConfig(backendConfig, configType)
|
|
bcfg, err := Svc.ExtractConfig(backendConfig, configType)
|
|
@@ -322,32 +330,23 @@ func GuerrillaDbReddis() Decorator {
|
|
g.config = bcfg.(*guerrillaDBAndRedisConfig)
|
|
g.config = bcfg.(*guerrillaDBAndRedisConfig)
|
|
db, err = g.mysqlConnect()
|
|
db, err = g.mysqlConnect()
|
|
if err != nil {
|
|
if err != nil {
|
|
- Log().Fatalf("cannot open mysql: %s", err)
|
|
|
|
|
|
+ return err
|
|
}
|
|
}
|
|
|
|
+ // start the query SQL batching where we will send data via the feeder channel
|
|
|
|
+ go func() {
|
|
|
|
+ for {
|
|
|
|
+ if feederOK := g.insertQueryBatcher(feeder, db); !feederOK {
|
|
|
|
+ Log().Debug("insertQueryBatcher exited")
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
+ // if insertQueryBatcher panics, it can recover and go in again
|
|
|
|
+ Log().Debug("resuming insertQueryBatcher")
|
|
|
|
+ }
|
|
|
|
+ }()
|
|
return nil
|
|
return nil
|
|
}))
|
|
}))
|
|
|
|
|
|
- workerId++
|
|
|
|
-
|
|
|
|
- // start the query SQL batching where we will send data via the feeder channel
|
|
|
|
- feeder := make(chan []interface{}, 1)
|
|
|
|
- go func() {
|
|
|
|
- for {
|
|
|
|
- if feederOK := g.insertQueryBatcher(feeder, db); !feederOK {
|
|
|
|
- Log().Debug("insertQueryBatcher exited")
|
|
|
|
- return
|
|
|
|
- }
|
|
|
|
- // if insertQueryBatcher panics, it can recover and go in again
|
|
|
|
- Log().Debug("resuming insertQueryBatcher")
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- }()
|
|
|
|
-
|
|
|
|
- defer func() {
|
|
|
|
- if r := recover(); r != nil {
|
|
|
|
- //recover form closed channel
|
|
|
|
- Log().Error("panic recovered in saveMailWorker", r)
|
|
|
|
- }
|
|
|
|
|
|
+ Svc.AddShutdowner(ShutdownWith(func() error {
|
|
db.Close()
|
|
db.Close()
|
|
if redisClient.conn != nil {
|
|
if redisClient.conn != nil {
|
|
Log().Infof("closed redis")
|
|
Log().Infof("closed redis")
|
|
@@ -356,15 +355,16 @@ func GuerrillaDbReddis() Decorator {
|
|
// close the feeder & wait for query batcher to exit.
|
|
// close the feeder & wait for query batcher to exit.
|
|
close(feeder)
|
|
close(feeder)
|
|
g.batcherWg.Wait()
|
|
g.batcherWg.Wait()
|
|
|
|
+ return nil
|
|
|
|
+ }))
|
|
|
|
|
|
- }()
|
|
|
|
var vals []interface{}
|
|
var vals []interface{}
|
|
data := newCompressedData()
|
|
data := newCompressedData()
|
|
|
|
|
|
return func(c Processor) Processor {
|
|
return func(c Processor) Processor {
|
|
return ProcessWith(func(e *mail.Envelope, task SelectTask) (Result, error) {
|
|
return ProcessWith(func(e *mail.Envelope, task SelectTask) (Result, error) {
|
|
if task == TaskSaveMail {
|
|
if task == TaskSaveMail {
|
|
- Log().Debug("Got mail from chan", e.RemoteIP)
|
|
|
|
|
|
+ Log().Debug("Got mail from chan,", e.RemoteIP)
|
|
to = trimToLimit(strings.TrimSpace(e.RcptTo[0].User)+"@"+g.config.PrimaryHost, 255)
|
|
to = trimToLimit(strings.TrimSpace(e.RcptTo[0].User)+"@"+g.config.PrimaryHost, 255)
|
|
e.Helo = trimToLimit(e.Helo, 255)
|
|
e.Helo = trimToLimit(e.Helo, 255)
|
|
e.RcptTo[0].Host = trimToLimit(e.RcptTo[0].Host, 255)
|
|
e.RcptTo[0].Host = trimToLimit(e.RcptTo[0].Host, 255)
|
|
@@ -413,6 +413,8 @@ func GuerrillaDbReddis() Decorator {
|
|
e.RemoteIP,
|
|
e.RemoteIP,
|
|
trimToLimit(e.MailFrom.String(), 255),
|
|
trimToLimit(e.MailFrom.String(), 255),
|
|
e.TLS)
|
|
e.TLS)
|
|
|
|
+ // give the values to the query batcher
|
|
|
|
+ feeder <- vals
|
|
return c.Process(e, task)
|
|
return c.Process(e, task)
|
|
|
|
|
|
} else {
|
|
} else {
|