p_sql.go 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327
  1. package backends
  2. import (
  3. "database/sql"
  4. "fmt"
  5. "strings"
  6. "time"
  7. "github.com/flashmob/go-guerrilla/mail"
  8. "math/big"
  9. "net"
  10. "runtime/debug"
  11. "github.com/flashmob/go-guerrilla/response"
  12. )
  13. // ----------------------------------------------------------------------------------
  14. // Processor Name: sql
  15. // ----------------------------------------------------------------------------------
  16. // Description : Saves the e.Data (email data) and e.DeliveryHeader together in sql
  17. // : using the hash generated by the "hash" processor and stored in
  18. // : e.Hashes
  19. // ----------------------------------------------------------------------------------
  20. // Config Options: mail_table string - name of table for storing emails
  21. // : sql_driver string - database driver name, eg. mysql
  22. // : sql_dsn string - driver-specific data source name
  23. // : primary_mail_host string - primary host name
  24. // : sql_max_open_conns - sets the maximum number of open connections
  25. // : to the database. The default is 0 (unlimited)
  26. // : sql_max_idle_conns - sets the maximum number of connections in the
  27. // : idle connection pool. The default is 2
  28. // : sql_max_conn_lifetime - sets the maximum amount of time
  29. // : a connection may be reused
  30. // --------------:-------------------------------------------------------------------
  31. // Input : e.Data
  32. // : e.DeliveryHeader generated by ParseHeader() processor
  33. // : e.MailFrom
  34. // : e.Subject - generated by ParseHeader() processor
  35. // ----------------------------------------------------------------------------------
  36. // Output : Sets e.QueuedId with the first item from e.Hashes[0]
  37. // ----------------------------------------------------------------------------------
  38. func init() {
  39. processors["sql"] = func() Decorator {
  40. return SQL()
  41. }
  42. }
  43. type SQLProcessorConfig struct {
  44. Table string `json:"mail_table"`
  45. Driver string `json:"sql_driver"`
  46. DSN string `json:"sql_dsn"`
  47. SQLInsert string `json:"sql_insert,omitempty"`
  48. SQLValues string `json:"sql_values,omitempty"`
  49. PrimaryHost string `json:"primary_mail_host"`
  50. MaxConnLifetime string `json:"sql_max_conn_lifetime,omitempty"`
  51. MaxOpenConns int `json:"sql_max_open_conns,omitempty"`
  52. MaxIdleConns int `json:"sql_max_idle_conns,omitempty"`
  53. }
  54. type SQLProcessor struct {
  55. cache stmtCache
  56. config *SQLProcessorConfig
  57. }
  58. func (s *SQLProcessor) connect() (*sql.DB, error) {
  59. var db *sql.DB
  60. var err error
  61. if db, err = sql.Open(s.config.Driver, s.config.DSN); err != nil {
  62. Log().Error("cannot open database: ", err)
  63. return nil, err
  64. }
  65. if s.config.MaxOpenConns != 0 {
  66. db.SetMaxOpenConns(s.config.MaxOpenConns)
  67. }
  68. if s.config.MaxIdleConns != 0 {
  69. db.SetMaxIdleConns(s.config.MaxIdleConns)
  70. }
  71. if s.config.MaxConnLifetime != "" {
  72. t, err := time.ParseDuration(s.config.MaxConnLifetime)
  73. if err != nil {
  74. return nil, err
  75. }
  76. db.SetConnMaxLifetime(t)
  77. }
  78. // do we have permission to access the table?
  79. _, err = db.Query("SELECT mail_id FROM " + s.config.Table + " LIMIT 1")
  80. if err != nil {
  81. return nil, err
  82. }
  83. return db, err
  84. }
  85. // prepares the sql query with the number of rows that can be batched with it
  86. func (s *SQLProcessor) prepareInsertQuery(rows int, db *sql.DB) *sql.Stmt {
  87. var sqlstr, values string
  88. if rows == 0 {
  89. panic("rows argument cannot be 0")
  90. }
  91. if s.cache[rows-1] != nil {
  92. return s.cache[rows-1]
  93. }
  94. if s.config.SQLInsert != "" {
  95. sqlstr = s.config.SQLInsert
  96. if !strings.HasSuffix(sqlstr, " ") {
  97. // Add a trailing space so we can concatinate our values string
  98. // without causing a syntax error
  99. sqlstr = sqlstr + " "
  100. }
  101. } else {
  102. // Default to MySQL SQL
  103. sqlstr = "INSERT INTO " + s.config.Table + " "
  104. sqlstr += "(`date`, `to`, `from`, `subject`, `body`, `mail`, `spam_score`, "
  105. sqlstr += "`hash`, `content_type`, `recipient`, `has_attach`, `ip_addr`, "
  106. sqlstr += "`return_path`, `is_tls`, `message_id`, `reply_to`, `sender`)"
  107. sqlstr += " VALUES "
  108. }
  109. if s.config.SQLValues != "" {
  110. values = s.config.SQLValues
  111. } else {
  112. values = "(NOW(), ?, ?, ?, ? , ?, 0, ?, ?, ?, 0, ?, ?, ?, ?, ?, ?)"
  113. }
  114. // add more rows
  115. comma := ""
  116. for i := 0; i < rows; i++ {
  117. sqlstr += comma + values
  118. if comma == "" {
  119. comma = ","
  120. }
  121. }
  122. stmt, sqlErr := db.Prepare(sqlstr)
  123. if sqlErr != nil {
  124. Log().WithError(sqlErr).Panic("failed while db.Prepare(INSERT...)")
  125. }
  126. // cache it
  127. s.cache[rows-1] = stmt
  128. return stmt
  129. }
  130. func (s *SQLProcessor) doQuery(c int, db *sql.DB, insertStmt *sql.Stmt, vals *[]interface{}) (execErr error) {
  131. defer func() {
  132. if r := recover(); r != nil {
  133. Log().Error("Recovered form panic:", r, string(debug.Stack()))
  134. sum := 0
  135. for _, v := range *vals {
  136. if str, ok := v.(string); ok {
  137. sum = sum + len(str)
  138. }
  139. }
  140. Log().Errorf("panic while inserting query [%s] size:%d, err %v", r, sum, execErr)
  141. panic("query failed")
  142. }
  143. }()
  144. // prepare the query used to insert when rows reaches batchMax
  145. insertStmt = s.prepareInsertQuery(c, db)
  146. _, execErr = insertStmt.Exec(*vals...)
  147. if execErr != nil {
  148. Log().WithError(execErr).Error("There was a problem the insert")
  149. }
  150. return
  151. }
  152. // for storing ip addresses in the ip_addr column
  153. func (s *SQLProcessor) ip2bint(ip string) *big.Int {
  154. bint := big.NewInt(0)
  155. addr := net.ParseIP(ip)
  156. if strings.Index(ip, "::") > 0 {
  157. bint.SetBytes(addr.To16())
  158. } else {
  159. bint.SetBytes(addr.To4())
  160. }
  161. return bint
  162. }
  163. func (s *SQLProcessor) fillAddressFromHeader(e *mail.Envelope, headerKey string) string {
  164. if v, ok := e.Header[headerKey]; ok {
  165. addr, err := mail.NewAddress(v[0])
  166. if err != nil {
  167. return ""
  168. }
  169. return addr.String()
  170. }
  171. return ""
  172. }
  173. func SQL() Decorator {
  174. var config *SQLProcessorConfig
  175. var vals []interface{}
  176. var db *sql.DB
  177. s := &SQLProcessor{}
  178. // open the database connection (it will also check if we can select the table)
  179. Svc.AddInitializer(InitializeWith(func(backendConfig BackendConfig) error {
  180. configType := BaseConfig(&SQLProcessorConfig{})
  181. bcfg, err := Svc.ExtractConfig(backendConfig, configType)
  182. if err != nil {
  183. return err
  184. }
  185. config = bcfg.(*SQLProcessorConfig)
  186. s.config = config
  187. db, err = s.connect()
  188. if err != nil {
  189. return err
  190. }
  191. return nil
  192. }))
  193. // shutdown will close the database connection
  194. Svc.AddShutdowner(ShutdownWith(func() error {
  195. if db != nil {
  196. return db.Close()
  197. }
  198. return nil
  199. }))
  200. return func(p Processor) Processor {
  201. return ProcessWith(func(e *mail.Envelope, task SelectTask) (Result, error) {
  202. if task == TaskSaveMail {
  203. var to, body string
  204. hash := ""
  205. if len(e.Hashes) > 0 {
  206. hash = e.Hashes[0]
  207. e.QueuedId = e.Hashes[0]
  208. }
  209. var co *DataCompressor
  210. // a compressor was set by the Compress processor
  211. if c, ok := e.Values["zlib-compressor"]; ok {
  212. body = "gzip"
  213. co = c.(*DataCompressor)
  214. }
  215. // was saved in redis by the Redis processor
  216. if _, ok := e.Values["redis"]; ok {
  217. body = "redis"
  218. }
  219. for i := range e.RcptTo {
  220. // use the To header, otherwise rcpt to
  221. to = trimToLimit(s.fillAddressFromHeader(e, "To"), 255)
  222. if to == "" {
  223. // trimToLimit(strings.TrimSpace(e.RcptTo[i].User)+"@"+config.PrimaryHost, 255)
  224. to = trimToLimit(strings.TrimSpace(e.RcptTo[i].String()), 255)
  225. }
  226. mid := trimToLimit(s.fillAddressFromHeader(e, "Message-Id"), 255)
  227. if mid == "" {
  228. mid = fmt.Sprintf("%s.%s@%s", hash, e.RcptTo[i].User, config.PrimaryHost)
  229. }
  230. // replyTo is the 'Reply-to' header, it may be blank
  231. replyTo := trimToLimit(s.fillAddressFromHeader(e, "Reply-To"), 255)
  232. // sender is the 'Sender' header, it may be blank
  233. sender := trimToLimit(s.fillAddressFromHeader(e, "Sender"), 255)
  234. recipient := trimToLimit(strings.TrimSpace(e.RcptTo[i].String()), 255)
  235. contentType := ""
  236. if v, ok := e.Header["Content-Type"]; ok {
  237. contentType = trimToLimit(v[0], 255)
  238. }
  239. // build the values for the query
  240. vals = []interface{}{} // clear the vals
  241. vals = append(vals,
  242. to,
  243. trimToLimit(e.MailFrom.String(), 255), // from
  244. trimToLimit(e.Subject, 255),
  245. body, // body describes how to interpret the data, eg 'redis' means stored in redis, and 'gzip' stored in mysql, using gzip compression
  246. )
  247. // `mail` column
  248. if body == "redis" {
  249. // data already saved in redis
  250. vals = append(vals, "")
  251. } else if co != nil {
  252. // use a compressor (automatically adds e.DeliveryHeader)
  253. vals = append(vals, co.String())
  254. } else {
  255. vals = append(vals, e.String())
  256. }
  257. vals = append(vals,
  258. hash, // hash (redis hash if saved in redis)
  259. contentType,
  260. recipient,
  261. s.ip2bint(e.RemoteIP).Bytes(), // ip_addr store as varbinary(16)
  262. trimToLimit(e.MailFrom.String(), 255), // return_path
  263. // is_tls
  264. e.TLS,
  265. // message_id
  266. mid,
  267. // reply_to
  268. replyTo,
  269. sender,
  270. )
  271. stmt := s.prepareInsertQuery(1, db)
  272. err := s.doQuery(1, db, stmt, &vals)
  273. if err != nil {
  274. return NewResult(fmt.Sprint("554 Error: could not save email")), StorageError
  275. }
  276. }
  277. // continue to the next Processor in the decorator chain
  278. return p.Process(e, task)
  279. } else if task == TaskValidateRcpt {
  280. // if you need to validate the e.Rcpt then change to:
  281. if len(e.RcptTo) > 0 {
  282. // since this is called each time a recipient is added
  283. // validate only the _last_ recipient that was appended
  284. last := e.RcptTo[len(e.RcptTo)-1]
  285. if len(last.User) > 255 {
  286. // return with an error
  287. return NewResult(response.Canned.FailRcptCmd), NoSuchUser
  288. }
  289. }
  290. // continue to the next processor
  291. return p.Process(e, task)
  292. } else {
  293. return p.Process(e, task)
  294. }
  295. })
  296. }
  297. }