p_sql.go 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331
  1. package backends
  2. import (
  3. "database/sql"
  4. "fmt"
  5. "strings"
  6. "time"
  7. "github.com/flashmob/go-guerrilla/mail"
  8. "math/big"
  9. "net"
  10. "runtime/debug"
  11. "github.com/flashmob/go-guerrilla/response"
  12. )
  13. // ----------------------------------------------------------------------------------
  14. // Processor Name: sql
  15. // ----------------------------------------------------------------------------------
  16. // Description : Saves the e.Data (email data) and e.DeliveryHeader together in sql
  17. // : using the hash generated by the "hash" processor and stored in
  18. // : e.Hashes
  19. // ----------------------------------------------------------------------------------
  20. // Config Options: mail_table string - name of table for storing emails
  21. // : sql_driver string - database driver name, eg. mysql
  22. // : sql_dsn string - driver-specific data source name
  23. // : primary_mail_host string - primary host name
  24. // : sql_max_open_conns - sets the maximum number of open connections
  25. // : to the database. The default is 0 (unlimited)
  26. // : sql_max_idle_conns - sets the maximum number of connections in the
  27. // : idle connection pool. The default is 2
  28. // : sql_max_conn_lifetime - sets the maximum amount of time
  29. // : a connection may be reused
  30. // --------------:-------------------------------------------------------------------
  31. // Input : e.Data
  32. // : e.DeliveryHeader generated by ParseHeader() processor
  33. // : e.MailFrom
  34. // : e.Subject - generated by by ParseHeader() processor
  35. // ----------------------------------------------------------------------------------
  36. // Output : Sets e.QueuedId with the first item fromHashes[0]
  37. // ----------------------------------------------------------------------------------
  38. func init() {
  39. processors["sql"] = func() Decorator {
  40. return SQL()
  41. }
  42. }
  43. type SQLProcessorConfig struct {
  44. Table string `json:"mail_table"`
  45. Driver string `json:"sql_driver"`
  46. DSN string `json:"sql_dsn"`
  47. SQLInsert string `json:"sql_insert,omitempty"`
  48. SQLValues string `json:"sql_values,omitempty"`
  49. PrimaryHost string `json:"primary_mail_host"`
  50. MaxConnLifetime string `json:"sql_max_conn_lifetime,omitempty"`
  51. MaxOpenConns int `json:"sql_max_open_conns,omitempty"`
  52. MaxIdleConns int `json:"sql_max_idle_conns,omitempty"`
  53. }
  54. type SQLProcessor struct {
  55. cache stmtCache
  56. config *SQLProcessorConfig
  57. }
  58. func (s *SQLProcessor) connect() (*sql.DB, error) {
  59. var db *sql.DB
  60. var err error
  61. if db, err = sql.Open(s.config.Driver, s.config.DSN); err != nil {
  62. Log().Error("cannot open database: ", err)
  63. return nil, err
  64. }
  65. if s.config.MaxOpenConns != 0 {
  66. db.SetMaxOpenConns(s.config.MaxOpenConns)
  67. }
  68. if s.config.MaxIdleConns != 0 {
  69. db.SetMaxIdleConns(s.config.MaxIdleConns)
  70. }
  71. if s.config.MaxConnLifetime != "" {
  72. t, err := time.ParseDuration(s.config.MaxConnLifetime)
  73. if err != nil {
  74. return nil, err
  75. }
  76. db.SetConnMaxLifetime(t)
  77. }
  78. // do we have permission to access the table?
  79. _, err = db.Query("SELECT mail_id FROM " + s.config.Table + " LIMIT 1")
  80. if err != nil {
  81. return nil, err
  82. }
  83. return db, err
  84. }
  85. // prepares the sql query with the number of rows that can be batched with it
  86. func (s *SQLProcessor) prepareInsertQuery(rows int, db *sql.DB) *sql.Stmt {
  87. var sqlstr, values string
  88. if rows == 0 {
  89. panic("rows argument cannot be 0")
  90. }
  91. if s.cache[rows-1] != nil {
  92. return s.cache[rows-1]
  93. }
  94. if s.config.SQLInsert != "" {
  95. sqlstr = s.config.SQLInsert
  96. if !strings.HasSuffix(sqlstr, " ") {
  97. // Add a trailing space so we can concatinate our values string
  98. // without causing a syntax error
  99. sqlstr = sqlstr + " "
  100. }
  101. } else {
  102. // Default to MySQL SQL
  103. sqlstr = "INSERT INTO " + s.config.Table + " "
  104. sqlstr += "(`date`, `to`, `from`, `subject`, `body`, `mail`, `spam_score`, "
  105. sqlstr += "`hash`, `content_type`, `recipient`, `has_attach`, `ip_addr`, "
  106. sqlstr += "`return_path`, `is_tls`, `message_id`, `reply_to`, `sender`)"
  107. sqlstr += " VALUES "
  108. }
  109. if s.config.SQLValues != "" {
  110. values = s.config.SQLValues
  111. } else {
  112. values = "(NOW(), ?, ?, ?, ? , ?, 0, ?, ?, ?, 0, ?, ?, ?, ?, ?, ?)"
  113. }
  114. // add more rows
  115. comma := ""
  116. for i := 0; i < rows; i++ {
  117. sqlstr += comma + values
  118. if comma == "" {
  119. comma = ","
  120. }
  121. }
  122. stmt, sqlErr := db.Prepare(sqlstr)
  123. if sqlErr != nil {
  124. Log().WithError(sqlErr).Panic("failed while db.Prepare(INSERT...)")
  125. }
  126. // cache it
  127. s.cache[rows-1] = stmt
  128. return stmt
  129. }
  130. func (s *SQLProcessor) doQuery(c int, db *sql.DB, insertStmt *sql.Stmt, vals *[]interface{}) (execErr error) {
  131. defer func() {
  132. if r := recover(); r != nil {
  133. sum := 0
  134. for _, v := range *vals {
  135. if str, ok := v.(string); ok {
  136. sum = sum + len(str)
  137. }
  138. }
  139. Log().Fields(
  140. "panic", fmt.Sprintf("%v", r),
  141. "size", sum,
  142. "error", execErr,
  143. "stack", string(debug.Stack()),
  144. ).
  145. Error("panic while inserting query")
  146. panic("query failed")
  147. }
  148. }()
  149. // prepare the query used to insert when rows reaches batchMax
  150. insertStmt = s.prepareInsertQuery(c, db)
  151. _, execErr = insertStmt.Exec(*vals...)
  152. if execErr != nil {
  153. Log().WithError(execErr).Error("There was a problem the insert")
  154. }
  155. return
  156. }
  157. // for storing ip addresses in the ip_addr column
  158. func (s *SQLProcessor) ip2bint(ip string) *big.Int {
  159. bint := big.NewInt(0)
  160. addr := net.ParseIP(ip)
  161. if strings.Index(ip, "::") > 0 {
  162. bint.SetBytes(addr.To16())
  163. } else {
  164. bint.SetBytes(addr.To4())
  165. }
  166. return bint
  167. }
  168. func (s *SQLProcessor) fillAddressFromHeader(e *mail.Envelope, headerKey string) string {
  169. if v, ok := e.Header[headerKey]; ok {
  170. addr, err := mail.NewAddress(v[0])
  171. if err != nil {
  172. return ""
  173. }
  174. return addr.String()
  175. }
  176. return ""
  177. }
  178. func SQL() Decorator {
  179. var config *SQLProcessorConfig
  180. var vals []interface{}
  181. var db *sql.DB
  182. s := &SQLProcessor{}
  183. // open the database connection (it will also check if we can select the table)
  184. Svc.AddInitializer(InitializeWith(func(backendConfig BackendConfig) error {
  185. configType := BaseConfig(&SQLProcessorConfig{})
  186. bcfg, err := Svc.ExtractConfig(ConfigProcessors, "sql", backendConfig, configType)
  187. if err != nil {
  188. return err
  189. }
  190. config = bcfg.(*SQLProcessorConfig)
  191. s.config = config
  192. db, err = s.connect()
  193. if err != nil {
  194. return err
  195. }
  196. return nil
  197. }))
  198. // shutdown will close the database connection
  199. Svc.AddShutdowner(ShutdownWith(func() error {
  200. if db != nil {
  201. return db.Close()
  202. }
  203. return nil
  204. }))
  205. return func(p Processor) Processor {
  206. return ProcessWith(func(e *mail.Envelope, task SelectTask) (Result, error) {
  207. if task == TaskSaveMail {
  208. var to, body string
  209. hash := ""
  210. if len(e.Hashes) > 0 {
  211. hash = e.Hashes[0]
  212. e.QueuedId.FromHex(e.Hashes[0])
  213. }
  214. var co *DataCompressor
  215. // a compressor was set by the Compress processor
  216. if c, ok := e.Values["zlib-compressor"]; ok {
  217. body = "gzip"
  218. co = c.(*DataCompressor)
  219. }
  220. // was saved in redis by the Redis processor
  221. if _, ok := e.Values["redis"]; ok {
  222. body = "redis"
  223. }
  224. for i := range e.RcptTo {
  225. // use the To header, otherwise rcpt to
  226. to = trimToLimit(s.fillAddressFromHeader(e, "To"), 255)
  227. if to == "" {
  228. // trimToLimit(strings.TrimSpace(e.RcptTo[i].User)+"@"+config.PrimaryHost, 255)
  229. to = trimToLimit(strings.TrimSpace(e.RcptTo[i].String()), 255)
  230. }
  231. mid := trimToLimit(s.fillAddressFromHeader(e, "Message-Id"), 255)
  232. if mid == "" {
  233. mid = fmt.Sprintf("%s.%s@%s", hash, e.RcptTo[i].User, config.PrimaryHost)
  234. }
  235. // replyTo is the 'Reply-to' header, it may be blank
  236. replyTo := trimToLimit(s.fillAddressFromHeader(e, "Reply-To"), 255)
  237. // sender is the 'Sender' header, it may be blank
  238. sender := trimToLimit(s.fillAddressFromHeader(e, "Sender"), 255)
  239. recipient := trimToLimit(strings.TrimSpace(e.RcptTo[i].String()), 255)
  240. contentType := ""
  241. if v, ok := e.Header["Content-Type"]; ok {
  242. contentType = trimToLimit(v[0], 255)
  243. }
  244. // build the values for the query
  245. vals = []interface{}{} // clear the vals
  246. vals = append(vals,
  247. to,
  248. trimToLimit(e.MailFrom.String(), 255), // from
  249. trimToLimit(e.Subject, 255),
  250. body, // body describes how to interpret the data, eg 'redis' means stored in redis, and 'gzip' stored in mysql, using gzip compression
  251. )
  252. // `mail` column
  253. if body == "redis" {
  254. // data already saved in redis
  255. vals = append(vals, "")
  256. } else if co != nil {
  257. // use a compressor (automatically adds e.DeliveryHeader)
  258. vals = append(vals, co.String())
  259. } else {
  260. vals = append(vals, e.String())
  261. }
  262. vals = append(vals,
  263. hash, // hash (redis hash if saved in redis)
  264. contentType,
  265. recipient,
  266. s.ip2bint(e.RemoteIP).Bytes(), // ip_addr store as varbinary(16)
  267. trimToLimit(e.MailFrom.String(), 255), // return_path
  268. // is_tls
  269. e.TLS,
  270. // message_id
  271. mid,
  272. // reply_to
  273. replyTo,
  274. sender,
  275. )
  276. stmt := s.prepareInsertQuery(1, db)
  277. err := s.doQuery(1, db, stmt, &vals)
  278. if err != nil {
  279. return NewResult(fmt.Sprint("554 Error: could not save email")), StorageError
  280. }
  281. }
  282. // continue to the next Processor in the decorator chain
  283. return p.Process(e, task)
  284. } else if task == TaskValidateRcpt {
  285. // if you need to validate the e.Rcpt then change to:
  286. if len(e.RcptTo) > 0 {
  287. // since this is called each time a recipient is added
  288. // validate only the _last_ recipient that was appended
  289. last := e.RcptTo[len(e.RcptTo)-1]
  290. if len(last.User) > 255 {
  291. // return with an error
  292. return NewResult(response.Canned.FailRcptCmd), NoSuchUser
  293. }
  294. }
  295. // continue to the next processor
  296. return p.Process(e, task)
  297. } else {
  298. return p.Process(e, task)
  299. }
  300. })
  301. }
  302. }