|
@@ -68,7 +68,7 @@ func convertError(name string) error {
|
|
// from the main config file 'backend' config "backend_config"
|
|
// from the main config file 'backend' config "backend_config"
|
|
// Now we need to convert each type and copy into the guerrillaDBAndRedisConfig struct
|
|
// Now we need to convert each type and copy into the guerrillaDBAndRedisConfig struct
|
|
func (g *GuerrillaDBAndRedisBackend) loadConfig(backendConfig BackendConfig) (err error) {
|
|
func (g *GuerrillaDBAndRedisBackend) loadConfig(backendConfig BackendConfig) (err error) {
|
|
- configType := baseConfig(&guerrillaDBAndRedisConfig{})
|
|
|
|
|
|
+ configType := BaseConfig(&guerrillaDBAndRedisConfig{})
|
|
bcfg, err := Svc.ExtractConfig(backendConfig, configType)
|
|
bcfg, err := Svc.ExtractConfig(backendConfig, configType)
|
|
if err != nil {
|
|
if err != nil {
|
|
return err
|
|
return err
|
|
@@ -164,7 +164,7 @@ func (g *GuerrillaDBAndRedisBackend) prepareInsertQuery(rows int, db *sql.DB) *s
|
|
}
|
|
}
|
|
stmt, sqlErr := db.Prepare(sqlstr)
|
|
stmt, sqlErr := db.Prepare(sqlstr)
|
|
if sqlErr != nil {
|
|
if sqlErr != nil {
|
|
- mainlog.WithError(sqlErr).Fatalf("failed while db.Prepare(INSERT...)")
|
|
|
|
|
|
+ Log().WithError(sqlErr).Fatalf("failed while db.Prepare(INSERT...)")
|
|
}
|
|
}
|
|
// cache it
|
|
// cache it
|
|
g.cache[rows-1] = stmt
|
|
g.cache[rows-1] = stmt
|
|
@@ -176,14 +176,14 @@ func (g *GuerrillaDBAndRedisBackend) doQuery(c int, db *sql.DB, insertStmt *sql.
|
|
defer func() {
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
if r := recover(); r != nil {
|
|
//logln(1, fmt.Sprintf("Recovered in %v", r))
|
|
//logln(1, fmt.Sprintf("Recovered in %v", r))
|
|
- mainlog.Error("Recovered form panic:", r, string(debug.Stack()))
|
|
|
|
|
|
+ Log().Error("Recovered form panic:", r, string(debug.Stack()))
|
|
sum := 0
|
|
sum := 0
|
|
for _, v := range *vals {
|
|
for _, v := range *vals {
|
|
if str, ok := v.(string); ok {
|
|
if str, ok := v.(string); ok {
|
|
sum = sum + len(str)
|
|
sum = sum + len(str)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- mainlog.Errorf("panic while inserting query [%s] size:%d, err %v", r, sum, execErr)
|
|
|
|
|
|
+ Log().Errorf("panic while inserting query [%s] size:%d, err %v", r, sum, execErr)
|
|
panic("query failed")
|
|
panic("query failed")
|
|
}
|
|
}
|
|
}()
|
|
}()
|
|
@@ -191,7 +191,7 @@ func (g *GuerrillaDBAndRedisBackend) doQuery(c int, db *sql.DB, insertStmt *sql.
|
|
insertStmt = g.prepareInsertQuery(c, db)
|
|
insertStmt = g.prepareInsertQuery(c, db)
|
|
_, execErr = insertStmt.Exec(*vals...)
|
|
_, execErr = insertStmt.Exec(*vals...)
|
|
if execErr != nil {
|
|
if execErr != nil {
|
|
- mainlog.WithError(execErr).Error("There was a problem the insert")
|
|
|
|
|
|
+ Log().WithError(execErr).Error("There was a problem the insert")
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -225,7 +225,7 @@ func (g *GuerrillaDBAndRedisBackend) insertQueryBatcher(feeder chan []interface{
|
|
}
|
|
}
|
|
defer func() {
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
if r := recover(); r != nil {
|
|
- mainlog.Error("insertQueryBatcher caught a panic", r)
|
|
|
|
|
|
+ Log().Error("insertQueryBatcher caught a panic", r)
|
|
}
|
|
}
|
|
}()
|
|
}()
|
|
// Keep getting values from feeder and add to batch.
|
|
// Keep getting values from feeder and add to batch.
|
|
@@ -237,14 +237,14 @@ func (g *GuerrillaDBAndRedisBackend) insertQueryBatcher(feeder chan []interface{
|
|
// it may panic when reading on a closed feeder channel. feederOK detects if it was closed
|
|
// it may panic when reading on a closed feeder channel. feederOK detects if it was closed
|
|
case row, feederOk := <-feeder:
|
|
case row, feederOk := <-feeder:
|
|
if row == nil {
|
|
if row == nil {
|
|
- mainlog.Info("Query batchaer exiting")
|
|
|
|
|
|
+ Log().Info("Query batchaer exiting")
|
|
// Insert any remaining rows
|
|
// Insert any remaining rows
|
|
insert(count)
|
|
insert(count)
|
|
return feederOk
|
|
return feederOk
|
|
}
|
|
}
|
|
vals = append(vals, row...)
|
|
vals = append(vals, row...)
|
|
count++
|
|
count++
|
|
- mainlog.Debug("new feeder row:", row, " cols:", len(row), " count:", count, " worker", workerId)
|
|
|
|
|
|
+ Log().Debug("new feeder row:", row, " cols:", len(row), " count:", count, " worker", workerId)
|
|
if count >= GuerrillaDBAndRedisBatchMax {
|
|
if count >= GuerrillaDBAndRedisBatchMax {
|
|
insert(GuerrillaDBAndRedisBatchMax)
|
|
insert(GuerrillaDBAndRedisBatchMax)
|
|
}
|
|
}
|
|
@@ -283,7 +283,7 @@ func (g *GuerrillaDBAndRedisBackend) mysqlConnect() (*sql.DB, error) {
|
|
Params: map[string]string{"collation": "utf8_general_ci"},
|
|
Params: map[string]string{"collation": "utf8_general_ci"},
|
|
}
|
|
}
|
|
if db, err := sql.Open("mysql", conf.FormatDSN()); err != nil {
|
|
if db, err := sql.Open("mysql", conf.FormatDSN()); err != nil {
|
|
- mainlog.Error("cannot open mysql", err)
|
|
|
|
|
|
+ Log().Error("cannot open mysql", err)
|
|
return nil, err
|
|
return nil, err
|
|
} else {
|
|
} else {
|
|
return db, nil
|
|
return db, nil
|
|
@@ -326,7 +326,7 @@ func GuerrillaDbReddis() Decorator {
|
|
g.config = bcfg.(*guerrillaDBAndRedisConfig)
|
|
g.config = bcfg.(*guerrillaDBAndRedisConfig)
|
|
db, err = g.mysqlConnect()
|
|
db, err = g.mysqlConnect()
|
|
if err != nil {
|
|
if err != nil {
|
|
- mainlog.Fatalf("cannot open mysql: %s", err)
|
|
|
|
|
|
+ Log().Fatalf("cannot open mysql: %s", err)
|
|
}
|
|
}
|
|
return nil
|
|
return nil
|
|
}))
|
|
}))
|
|
@@ -338,11 +338,11 @@ func GuerrillaDbReddis() Decorator {
|
|
go func() {
|
|
go func() {
|
|
for {
|
|
for {
|
|
if feederOK := g.insertQueryBatcher(feeder, db); !feederOK {
|
|
if feederOK := g.insertQueryBatcher(feeder, db); !feederOK {
|
|
- mainlog.Debug("insertQueryBatcher exited")
|
|
|
|
|
|
+ Log().Debug("insertQueryBatcher exited")
|
|
return
|
|
return
|
|
}
|
|
}
|
|
// if insertQueryBatcher panics, it can recover and go in again
|
|
// if insertQueryBatcher panics, it can recover and go in again
|
|
- mainlog.Debug("resuming insertQueryBatcher")
|
|
|
|
|
|
+ Log().Debug("resuming insertQueryBatcher")
|
|
}
|
|
}
|
|
|
|
|
|
}()
|
|
}()
|
|
@@ -350,11 +350,11 @@ func GuerrillaDbReddis() Decorator {
|
|
defer func() {
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
if r := recover(); r != nil {
|
|
//recover form closed channel
|
|
//recover form closed channel
|
|
- mainlog.Error("panic recovered in saveMailWorker", r)
|
|
|
|
|
|
+ Log().Error("panic recovered in saveMailWorker", r)
|
|
}
|
|
}
|
|
db.Close()
|
|
db.Close()
|
|
if redisClient.conn != nil {
|
|
if redisClient.conn != nil {
|
|
- mainlog.Infof("closed redis")
|
|
|
|
|
|
+ Log().Infof("closed redis")
|
|
redisClient.conn.Close()
|
|
redisClient.conn.Close()
|
|
}
|
|
}
|
|
// close the feeder & wait for query batcher to exit.
|
|
// close the feeder & wait for query batcher to exit.
|
|
@@ -368,7 +368,7 @@ func GuerrillaDbReddis() Decorator {
|
|
return func(c Processor) Processor {
|
|
return func(c Processor) Processor {
|
|
return ProcessWith(func(e *envelope.Envelope, task SelectTask) (Result, error) {
|
|
return ProcessWith(func(e *envelope.Envelope, task SelectTask) (Result, error) {
|
|
if task == TaskSaveMail {
|
|
if task == TaskSaveMail {
|
|
- mainlog.Debug("Got mail from chan", e.RemoteAddress)
|
|
|
|
|
|
+ Log().Debug("Got mail from chan", e.RemoteAddress)
|
|
to = trimToLimit(strings.TrimSpace(e.RcptTo[0].User)+"@"+g.config.PrimaryHost, 255)
|
|
to = trimToLimit(strings.TrimSpace(e.RcptTo[0].User)+"@"+g.config.PrimaryHost, 255)
|
|
e.Helo = trimToLimit(e.Helo, 255)
|
|
e.Helo = trimToLimit(e.Helo, 255)
|
|
e.RcptTo[0].Host = trimToLimit(e.RcptTo[0].Host, 255)
|
|
e.RcptTo[0].Host = trimToLimit(e.RcptTo[0].Host, 255)
|
|
@@ -402,7 +402,7 @@ func GuerrillaDbReddis() Decorator {
|
|
data.clear() // blank
|
|
data.clear() // blank
|
|
}
|
|
}
|
|
} else {
|
|
} else {
|
|
- mainlog.WithError(redisErr).Warn("Error while connecting redis")
|
|
|
|
|
|
+ Log().WithError(redisErr).Warn("Error while connecting redis")
|
|
}
|
|
}
|
|
|
|
|
|
vals = []interface{}{} // clear the vals
|
|
vals = []interface{}{} // clear the vals
|