123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327 |
- package backends
- import (
- "database/sql"
- "fmt"
- "strings"
- "time"
- "github.com/flashmob/go-guerrilla/mail"
- "math/big"
- "net"
- "runtime/debug"
- "github.com/flashmob/go-guerrilla/response"
- )
- // ----------------------------------------------------------------------------------
- // Processor Name: sql
- // ----------------------------------------------------------------------------------
- // Description : Saves the e.Data (email data) and e.DeliveryHeader together in sql
- // : using the hash generated by the "hash" processor and stored in
- // : e.Hashes
- // ----------------------------------------------------------------------------------
- // Config Options: mail_table string - name of table for storing emails
- // : sql_driver string - database driver name, eg. mysql
- // : sql_dsn string - driver-specific data source name
- // : primary_mail_host string - primary host name
- // : sql_max_open_conns - sets the maximum number of open connections
- // : to the database. The default is 0 (unlimited)
- // : sql_max_idle_conns - sets the maximum number of connections in the
- // : idle connection pool. The default is 2
- // : sql_max_conn_lifetime - sets the maximum amount of time
- // : a connection may be reused
- // --------------:-------------------------------------------------------------------
- // Input : e.Data
- // : e.DeliveryHeader generated by ParseHeader() processor
- // : e.MailFrom
- // : e.Subject - generated by by ParseHeader() processor
- // ----------------------------------------------------------------------------------
- // Output : Sets e.QueuedId with the first item fromHashes[0]
- // ----------------------------------------------------------------------------------
- func init() {
- processors["sql"] = func() Decorator {
- return SQL()
- }
- }
- type SQLProcessorConfig struct {
- Table string `json:"mail_table"`
- Driver string `json:"sql_driver"`
- DSN string `json:"sql_dsn"`
- SQLInsert string `json:"sql_insert,omitempty"`
- SQLValues string `json:"sql_values,omitempty"`
- PrimaryHost string `json:"primary_mail_host"`
- MaxConnLifetime string `json:"sql_max_conn_lifetime,omitempty"`
- MaxOpenConns int `json:"sql_max_open_conns,omitempty"`
- MaxIdleConns int `json:"sql_max_idle_conns,omitempty"`
- }
- type SQLProcessor struct {
- cache stmtCache
- config *SQLProcessorConfig
- }
- func (s *SQLProcessor) connect() (*sql.DB, error) {
- var db *sql.DB
- var err error
- if db, err = sql.Open(s.config.Driver, s.config.DSN); err != nil {
- Log().Error("cannot open database: ", err)
- return nil, err
- }
- if s.config.MaxOpenConns != 0 {
- db.SetMaxOpenConns(s.config.MaxOpenConns)
- }
- if s.config.MaxIdleConns != 0 {
- db.SetMaxIdleConns(s.config.MaxIdleConns)
- }
- if s.config.MaxConnLifetime != "" {
- t, err := time.ParseDuration(s.config.MaxConnLifetime)
- if err != nil {
- return nil, err
- }
- db.SetConnMaxLifetime(t)
- }
- // do we have permission to access the table?
- _, err = db.Query("SELECT mail_id FROM " + s.config.Table + " LIMIT 1")
- if err != nil {
- return nil, err
- }
- return db, err
- }
- // prepares the sql query with the number of rows that can be batched with it
- func (s *SQLProcessor) prepareInsertQuery(rows int, db *sql.DB) *sql.Stmt {
- var sqlstr, values string
- if rows == 0 {
- panic("rows argument cannot be 0")
- }
- if s.cache[rows-1] != nil {
- return s.cache[rows-1]
- }
- if s.config.SQLInsert != "" {
- sqlstr = s.config.SQLInsert
- if !strings.HasSuffix(sqlstr, " ") {
- // Add a trailing space so we can concatinate our values string
- // without causing a syntax error
- sqlstr = sqlstr + " "
- }
- } else {
- // Default to MySQL SQL
- sqlstr = "INSERT INTO " + s.config.Table + " "
- sqlstr += "(`date`, `to`, `from`, `subject`, `body`, `mail`, `spam_score`, "
- sqlstr += "`hash`, `content_type`, `recipient`, `has_attach`, `ip_addr`, "
- sqlstr += "`return_path`, `is_tls`, `message_id`, `reply_to`, `sender`)"
- sqlstr += " VALUES "
- }
- if s.config.SQLValues != "" {
- values = s.config.SQLValues
- } else {
- values = "(NOW(), ?, ?, ?, ? , ?, 0, ?, ?, ?, 0, ?, ?, ?, ?, ?, ?)"
- }
- // add more rows
- comma := ""
- for i := 0; i < rows; i++ {
- sqlstr += comma + values
- if comma == "" {
- comma = ","
- }
- }
- stmt, sqlErr := db.Prepare(sqlstr)
- if sqlErr != nil {
- Log().WithError(sqlErr).Panic("failed while db.Prepare(INSERT...)")
- }
- // cache it
- s.cache[rows-1] = stmt
- return stmt
- }
- func (s *SQLProcessor) doQuery(c int, db *sql.DB, insertStmt *sql.Stmt, vals *[]interface{}) (execErr error) {
- defer func() {
- if r := recover(); r != nil {
- Log().Error("Recovered form panic:", r, string(debug.Stack()))
- sum := 0
- for _, v := range *vals {
- if str, ok := v.(string); ok {
- sum = sum + len(str)
- }
- }
- Log().Errorf("panic while inserting query [%s] size:%d, err %v", r, sum, execErr)
- panic("query failed")
- }
- }()
- // prepare the query used to insert when rows reaches batchMax
- insertStmt = s.prepareInsertQuery(c, db)
- _, execErr = insertStmt.Exec(*vals...)
- if execErr != nil {
- Log().WithError(execErr).Error("There was a problem the insert")
- }
- return
- }
- // for storing ip addresses in the ip_addr column
- func (s *SQLProcessor) ip2bint(ip string) *big.Int {
- bint := big.NewInt(0)
- addr := net.ParseIP(ip)
- if strings.Index(ip, "::") > 0 {
- bint.SetBytes(addr.To16())
- } else {
- bint.SetBytes(addr.To4())
- }
- return bint
- }
- func (s *SQLProcessor) fillAddressFromHeader(e *mail.Envelope, headerKey string) string {
- if v, ok := e.Header[headerKey]; ok {
- addr, err := mail.NewAddress(v[0])
- if err != nil {
- return ""
- }
- return addr.String()
- }
- return ""
- }
- func SQL() Decorator {
- var config *SQLProcessorConfig
- var vals []interface{}
- var db *sql.DB
- s := &SQLProcessor{}
- // open the database connection (it will also check if we can select the table)
- Svc.AddInitializer(InitializeWith(func(backendConfig BackendConfig) error {
- configType := BaseConfig(&SQLProcessorConfig{})
- bcfg, err := Svc.ExtractConfig(backendConfig, configType)
- if err != nil {
- return err
- }
- config = bcfg.(*SQLProcessorConfig)
- s.config = config
- db, err = s.connect()
- if err != nil {
- return err
- }
- return nil
- }))
- // shutdown will close the database connection
- Svc.AddShutdowner(ShutdownWith(func() error {
- if db != nil {
- return db.Close()
- }
- return nil
- }))
- return func(p Processor) Processor {
- return ProcessWith(func(e *mail.Envelope, task SelectTask) (Result, error) {
- if task == TaskSaveMail {
- var to, body string
- hash := ""
- if len(e.Hashes) > 0 {
- hash = e.Hashes[0]
- e.QueuedId = e.Hashes[0]
- }
- var co *DataCompressor
- // a compressor was set by the Compress processor
- if c, ok := e.Values["zlib-compressor"]; ok {
- body = "gzip"
- co = c.(*DataCompressor)
- }
- // was saved in redis by the Redis processor
- if _, ok := e.Values["redis"]; ok {
- body = "redis"
- }
- for i := range e.RcptTo {
- // use the To header, otherwise rcpt to
- to = trimToLimit(s.fillAddressFromHeader(e, "To"), 255)
- if to == "" {
- // trimToLimit(strings.TrimSpace(e.RcptTo[i].User)+"@"+config.PrimaryHost, 255)
- to = trimToLimit(strings.TrimSpace(e.RcptTo[i].String()), 255)
- }
- mid := trimToLimit(s.fillAddressFromHeader(e, "Message-Id"), 255)
- if mid == "" {
- mid = fmt.Sprintf("%s.%s@%s", hash, e.RcptTo[i].User, config.PrimaryHost)
- }
- // replyTo is the 'Reply-to' header, it may be blank
- replyTo := trimToLimit(s.fillAddressFromHeader(e, "Reply-To"), 255)
- // sender is the 'Sender' header, it may be blank
- sender := trimToLimit(s.fillAddressFromHeader(e, "Sender"), 255)
- recipient := trimToLimit(strings.TrimSpace(e.RcptTo[i].String()), 255)
- contentType := ""
- if v, ok := e.Header["Content-Type"]; ok {
- contentType = trimToLimit(v[0], 255)
- }
- // build the values for the query
- vals = []interface{}{} // clear the vals
- vals = append(vals,
- to,
- trimToLimit(e.MailFrom.String(), 255), // from
- trimToLimit(e.Subject, 255),
- body, // body describes how to interpret the data, eg 'redis' means stored in redis, and 'gzip' stored in mysql, using gzip compression
- )
- // `mail` column
- if body == "redis" {
- // data already saved in redis
- vals = append(vals, "")
- } else if co != nil {
- // use a compressor (automatically adds e.DeliveryHeader)
- vals = append(vals, co.String())
- } else {
- vals = append(vals, e.String())
- }
- vals = append(vals,
- hash, // hash (redis hash if saved in redis)
- contentType,
- recipient,
- s.ip2bint(e.RemoteIP).Bytes(), // ip_addr store as varbinary(16)
- trimToLimit(e.MailFrom.String(), 255), // return_path
- // is_tls
- e.TLS,
- // message_id
- mid,
- // reply_to
- replyTo,
- sender,
- )
- stmt := s.prepareInsertQuery(1, db)
- err := s.doQuery(1, db, stmt, &vals)
- if err != nil {
- return NewResult(fmt.Sprint("554 Error: could not save email")), StorageError
- }
- }
- // continue to the next Processor in the decorator chain
- return p.Process(e, task)
- } else if task == TaskValidateRcpt {
- // if you need to validate the e.Rcpt then change to:
- if len(e.RcptTo) > 0 {
- // since this is called each time a recipient is added
- // validate only the _last_ recipient that was appended
- last := e.RcptTo[len(e.RcptTo)-1]
- if len(last.User) > 255 {
- // return with an error
- return NewResult(response.Canned.FailRcptCmd), NoSuchUser
- }
- }
- // continue to the next processor
- return p.Process(e, task)
- } else {
- return p.Process(e, task)
- }
- })
- }
- }
|