|
@@ -4,29 +4,26 @@ import (
|
|
|
"database/sql"
|
|
|
"fmt"
|
|
|
"strings"
|
|
|
- "time"
|
|
|
|
|
|
"github.com/flashmob/go-guerrilla/mail"
|
|
|
- "github.com/go-sql-driver/mysql"
|
|
|
|
|
|
- "github.com/flashmob/go-guerrilla/response"
|
|
|
"math/big"
|
|
|
"net"
|
|
|
"runtime/debug"
|
|
|
+
|
|
|
+ "github.com/flashmob/go-guerrilla/response"
|
|
|
)
|
|
|
|
|
|
// ----------------------------------------------------------------------------------
|
|
|
-// Processor Name: mysql
|
|
|
+// Processor Name: sql
|
|
|
// ----------------------------------------------------------------------------------
|
|
|
-// Description : Saves the e.Data (email data) and e.DeliveryHeader together in mysql
|
|
|
+// Description : Saves the e.Data (email data) and e.DeliveryHeader together in sql
|
|
|
// : using the hash generated by the "hash" processor and stored in
|
|
|
// : e.Hashes
|
|
|
// ----------------------------------------------------------------------------------
|
|
|
-// Config Options: mail_table string - mysql table name
|
|
|
-// : mysql_db string - mysql database name
|
|
|
-// : mysql_host string - mysql host name, eg. 127.0.0.1
|
|
|
-// : mysql_pass string - mysql password
|
|
|
-// : mysql_user string - mysql username
|
|
|
+// Config Options: mail_table string - name of table for storing emails
|
|
|
+// : sql_driver string - database driver name, eg. mysql
|
|
|
+// : sql_dsn string - driver-specific data source name
|
|
|
// : primary_mail_host string - primary host name
|
|
|
// --------------:-------------------------------------------------------------------
|
|
|
// Input : e.Data
|
|
@@ -37,64 +34,47 @@ import (
|
|
|
// Output : Sets e.QueuedId with the first item fromHashes[0]
|
|
|
// ----------------------------------------------------------------------------------
|
|
|
func init() {
|
|
|
- processors["mysql"] = func() Decorator {
|
|
|
- return MySql()
|
|
|
+ processors["sql"] = func() Decorator {
|
|
|
+ return SQL()
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-const procMySQLReadTimeout = time.Second * 10
|
|
|
-const procMySQLWriteTimeout = time.Second * 10
|
|
|
-
|
|
|
-type MysqlProcessorConfig struct {
|
|
|
- MysqlTable string `json:"mysql_mail_table"`
|
|
|
- MysqlDB string `json:"mysql_db"`
|
|
|
- MysqlHost string `json:"mysql_host"`
|
|
|
- MysqlPass string `json:"mysql_pass"`
|
|
|
- MysqlUser string `json:"mysql_user"`
|
|
|
+type SQLProcessorConfig struct {
|
|
|
+ Table string `json:"mail_table"`
|
|
|
+ Driver string `json:"sql_driver"`
|
|
|
+ DSN string `json:"sql_dsn"`
|
|
|
PrimaryHost string `json:"primary_mail_host"`
|
|
|
}
|
|
|
|
|
|
-type MysqlProcessor struct {
|
|
|
+type SQLProcessor struct {
|
|
|
cache stmtCache
|
|
|
- config *MysqlProcessorConfig
|
|
|
+ config *SQLProcessorConfig
|
|
|
}
|
|
|
|
|
|
-func (m *MysqlProcessor) connect(config *MysqlProcessorConfig) (*sql.DB, error) {
|
|
|
+func (s *SQLProcessor) connect() (*sql.DB, error) {
|
|
|
var db *sql.DB
|
|
|
var err error
|
|
|
- conf := mysql.Config{
|
|
|
- User: config.MysqlUser,
|
|
|
- Passwd: config.MysqlPass,
|
|
|
- DBName: config.MysqlDB,
|
|
|
- Net: "tcp",
|
|
|
- Addr: config.MysqlHost,
|
|
|
- ReadTimeout: procMySQLReadTimeout,
|
|
|
- WriteTimeout: procMySQLWriteTimeout,
|
|
|
- Params: map[string]string{"collation": "utf8_general_ci"},
|
|
|
- }
|
|
|
- if db, err = sql.Open("mysql", conf.FormatDSN()); err != nil {
|
|
|
- Log().Error("cannot open mysql", err)
|
|
|
+ if db, err = sql.Open(s.config.Driver, s.config.DSN); err != nil {
|
|
|
+ Log().Error("cannot open database: ", err)
|
|
|
return nil, err
|
|
|
}
|
|
|
// do we have permission to access the table?
|
|
|
- _, err = db.Query("SELECT mail_id FROM " + m.config.MysqlTable + " LIMIT 1")
|
|
|
+ _, err = db.Query("SELECT mail_id FROM " + s.config.Table + " LIMIT 1")
|
|
|
if err != nil {
|
|
|
- //Log().Error("cannot select table", err)
|
|
|
return nil, err
|
|
|
}
|
|
|
- Log().Info("connected to mysql on tcp ", config.MysqlHost)
|
|
|
return db, err
|
|
|
}
|
|
|
|
|
|
// prepares the sql query with the number of rows that can be batched with it
|
|
|
-func (g *MysqlProcessor) prepareInsertQuery(rows int, db *sql.DB) *sql.Stmt {
|
|
|
+func (s *SQLProcessor) prepareInsertQuery(rows int, db *sql.DB) *sql.Stmt {
|
|
|
if rows == 0 {
|
|
|
panic("rows argument cannot be 0")
|
|
|
}
|
|
|
- if g.cache[rows-1] != nil {
|
|
|
- return g.cache[rows-1]
|
|
|
+ if s.cache[rows-1] != nil {
|
|
|
+ return s.cache[rows-1]
|
|
|
}
|
|
|
- sqlstr := "INSERT INTO " + g.config.MysqlTable + " "
|
|
|
+ sqlstr := "INSERT INTO " + s.config.Table + " "
|
|
|
sqlstr += "(`date`, `to`, `from`, `subject`, `body`, `mail`, `spam_score`, "
|
|
|
sqlstr += "`hash`, `content_type`, `recipient`, `has_attach`, `ip_addr`, "
|
|
|
sqlstr += "`return_path`, `is_tls`, `message_id`, `reply_to`, `sender`)"
|
|
@@ -113,11 +93,11 @@ func (g *MysqlProcessor) prepareInsertQuery(rows int, db *sql.DB) *sql.Stmt {
|
|
|
Log().WithError(sqlErr).Panic("failed while db.Prepare(INSERT...)")
|
|
|
}
|
|
|
// cache it
|
|
|
- g.cache[rows-1] = stmt
|
|
|
+ s.cache[rows-1] = stmt
|
|
|
return stmt
|
|
|
}
|
|
|
|
|
|
-func (g *MysqlProcessor) doQuery(c int, db *sql.DB, insertStmt *sql.Stmt, vals *[]interface{}) (execErr error) {
|
|
|
+func (s *SQLProcessor) doQuery(c int, db *sql.DB, insertStmt *sql.Stmt, vals *[]interface{}) (execErr error) {
|
|
|
defer func() {
|
|
|
if r := recover(); r != nil {
|
|
|
Log().Error("Recovered form panic:", r, string(debug.Stack()))
|
|
@@ -132,7 +112,7 @@ func (g *MysqlProcessor) doQuery(c int, db *sql.DB, insertStmt *sql.Stmt, vals *
|
|
|
}
|
|
|
}()
|
|
|
// prepare the query used to insert when rows reaches batchMax
|
|
|
- insertStmt = g.prepareInsertQuery(c, db)
|
|
|
+ insertStmt = s.prepareInsertQuery(c, db)
|
|
|
_, execErr = insertStmt.Exec(*vals...)
|
|
|
if execErr != nil {
|
|
|
Log().WithError(execErr).Error("There was a problem the insert")
|
|
@@ -141,7 +121,7 @@ func (g *MysqlProcessor) doQuery(c int, db *sql.DB, insertStmt *sql.Stmt, vals *
|
|
|
}
|
|
|
|
|
|
// for storing ip addresses in the ip_addr column
|
|
|
-func (g *MysqlProcessor) ip2bint(ip string) *big.Int {
|
|
|
+func (s *SQLProcessor) ip2bint(ip string) *big.Int {
|
|
|
bint := big.NewInt(0)
|
|
|
addr := net.ParseIP(ip)
|
|
|
if strings.Index(ip, "::") > 0 {
|
|
@@ -152,7 +132,7 @@ func (g *MysqlProcessor) ip2bint(ip string) *big.Int {
|
|
|
return bint
|
|
|
}
|
|
|
|
|
|
-func (g *MysqlProcessor) fillAddressFromHeader(e *mail.Envelope, headerKey string) string {
|
|
|
+func (s *SQLProcessor) fillAddressFromHeader(e *mail.Envelope, headerKey string) string {
|
|
|
if v, ok := e.Header[headerKey]; ok {
|
|
|
addr, err := mail.NewAddress(v[0])
|
|
|
if err != nil {
|
|
@@ -163,23 +143,22 @@ func (g *MysqlProcessor) fillAddressFromHeader(e *mail.Envelope, headerKey strin
|
|
|
return ""
|
|
|
}
|
|
|
|
|
|
-func MySql() Decorator {
|
|
|
-
|
|
|
- var config *MysqlProcessorConfig
|
|
|
+func SQL() Decorator {
|
|
|
+ var config *SQLProcessorConfig
|
|
|
var vals []interface{}
|
|
|
var db *sql.DB
|
|
|
- m := &MysqlProcessor{}
|
|
|
+ s := &SQLProcessor{}
|
|
|
|
|
|
// open the database connection (it will also check if we can select the table)
|
|
|
Svc.AddInitializer(InitializeWith(func(backendConfig BackendConfig) error {
|
|
|
- configType := BaseConfig(&MysqlProcessorConfig{})
|
|
|
+ configType := BaseConfig(&SQLProcessorConfig{})
|
|
|
bcfg, err := Svc.ExtractConfig(backendConfig, configType)
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
- config = bcfg.(*MysqlProcessorConfig)
|
|
|
- m.config = config
|
|
|
- db, err = m.connect(config)
|
|
|
+ config = bcfg.(*SQLProcessorConfig)
|
|
|
+ s.config = config
|
|
|
+ db, err = s.connect()
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
@@ -221,19 +200,19 @@ func MySql() Decorator {
|
|
|
for i := range e.RcptTo {
|
|
|
|
|
|
// use the To header, otherwise rcpt to
|
|
|
- to = trimToLimit(m.fillAddressFromHeader(e, "To"), 255)
|
|
|
+ to = trimToLimit(s.fillAddressFromHeader(e, "To"), 255)
|
|
|
if to == "" {
|
|
|
// trimToLimit(strings.TrimSpace(e.RcptTo[i].User)+"@"+config.PrimaryHost, 255)
|
|
|
to = trimToLimit(strings.TrimSpace(e.RcptTo[i].String()), 255)
|
|
|
}
|
|
|
- mid := trimToLimit(m.fillAddressFromHeader(e, "Message-Id"), 255)
|
|
|
+ mid := trimToLimit(s.fillAddressFromHeader(e, "Message-Id"), 255)
|
|
|
if mid == "" {
|
|
|
mid = fmt.Sprintf("%s.%s@%s", hash, e.RcptTo[i].User, config.PrimaryHost)
|
|
|
}
|
|
|
// replyTo is the 'Reply-to' header, it may be blank
|
|
|
- replyTo := trimToLimit(m.fillAddressFromHeader(e, "Reply-To"), 255)
|
|
|
+ replyTo := trimToLimit(s.fillAddressFromHeader(e, "Reply-To"), 255)
|
|
|
// sender is the 'Sender' header, it may be blank
|
|
|
- sender := trimToLimit(m.fillAddressFromHeader(e, "Sender"), 255)
|
|
|
+ sender := trimToLimit(s.fillAddressFromHeader(e, "Sender"), 255)
|
|
|
|
|
|
recipient := trimToLimit(strings.TrimSpace(e.RcptTo[i].String()), 255)
|
|
|
contentType := ""
|
|
@@ -265,7 +244,7 @@ func MySql() Decorator {
|
|
|
hash, // hash (redis hash if saved in redis)
|
|
|
contentType,
|
|
|
recipient,
|
|
|
- m.ip2bint(e.RemoteIP).Bytes(), // ip_addr store as varbinary(16)
|
|
|
+ s.ip2bint(e.RemoteIP).Bytes(), // ip_addr store as varbinary(16)
|
|
|
trimToLimit(e.MailFrom.String(), 255), // return_path
|
|
|
e.TLS, // is_tls
|
|
|
mid, // message_id
|
|
@@ -273,8 +252,8 @@ func MySql() Decorator {
|
|
|
sender,
|
|
|
)
|
|
|
|
|
|
- stmt := m.prepareInsertQuery(1, db)
|
|
|
- err := m.doQuery(1, db, stmt, &vals)
|
|
|
+ stmt := s.prepareInsertQuery(1, db)
|
|
|
+ err := s.doQuery(1, db, stmt, &vals)
|
|
|
if err != nil {
|
|
|
return NewResult(fmt.Sprint("554 Error: could not save email")), StorageError
|
|
|
}
|