p_sql.go 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302
  1. package backends
  2. import (
  3. "database/sql"
  4. "fmt"
  5. "strings"
  6. "github.com/flashmob/go-guerrilla/mail"
  7. "math/big"
  8. "net"
  9. "runtime/debug"
  10. "github.com/flashmob/go-guerrilla/response"
  11. )
  12. // ----------------------------------------------------------------------------------
  13. // Processor Name: sql
  14. // ----------------------------------------------------------------------------------
  15. // Description : Saves the e.Data (email data) and e.DeliveryHeader together in sql
  16. // : using the hash generated by the "hash" processor and stored in
  17. // : e.Hashes
  18. // ----------------------------------------------------------------------------------
  19. // Config Options: mail_table string - name of table for storing emails
  20. // : sql_driver string - database driver name, eg. mysql
  21. // : sql_dsn string - driver-specific data source name
  22. // : primary_mail_host string - primary host name
  23. // --------------:-------------------------------------------------------------------
  24. // Input : e.Data
  25. // : e.DeliveryHeader generated by ParseHeader() processor
  26. // : e.MailFrom
  27. // : e.Subject - generated by by ParseHeader() processor
  28. // ----------------------------------------------------------------------------------
  29. // Output : Sets e.QueuedId with the first item fromHashes[0]
  30. // ----------------------------------------------------------------------------------
  31. func init() {
  32. processors["sql"] = func() Decorator {
  33. return SQL()
  34. }
  35. }
  36. type SQLProcessorConfig struct {
  37. Table string `json:"mail_table"`
  38. Driver string `json:"sql_driver"`
  39. DSN string `json:"sql_dsn"`
  40. SQLInsert string `json:"sql_insert,omitempty"`
  41. SQLValues string `json:"sql_values,omitempty"`
  42. PrimaryHost string `json:"primary_mail_host"`
  43. }
  44. type SQLProcessor struct {
  45. cache stmtCache
  46. config *SQLProcessorConfig
  47. }
  48. func (s *SQLProcessor) connect() (*sql.DB, error) {
  49. var db *sql.DB
  50. var err error
  51. if db, err = sql.Open(s.config.Driver, s.config.DSN); err != nil {
  52. Log().Error("cannot open database: ", err)
  53. return nil, err
  54. }
  55. // do we have permission to access the table?
  56. _, err = db.Query("SELECT mail_id FROM " + s.config.Table + " LIMIT 1")
  57. if err != nil {
  58. return nil, err
  59. }
  60. return db, err
  61. }
  62. // prepares the sql query with the number of rows that can be batched with it
  63. func (s *SQLProcessor) prepareInsertQuery(rows int, db *sql.DB) *sql.Stmt {
  64. var sqlstr, values string
  65. if rows == 0 {
  66. panic("rows argument cannot be 0")
  67. }
  68. if s.cache[rows-1] != nil {
  69. return s.cache[rows-1]
  70. }
  71. if s.config.SQLInsert != "" {
  72. sqlstr = s.config.SQLInsert
  73. if !strings.HasSuffix(sqlstr, " ") {
  74. // Add a trailing space so we can concatinate our values string
  75. // without causing a syntax error
  76. sqlstr = sqlstr + " "
  77. }
  78. } else {
  79. // Default to MySQL SQL
  80. sqlstr = "INSERT INTO " + s.config.Table + " "
  81. sqlstr += "(`date`, `to`, `from`, `subject`, `body`, `mail`, `spam_score`, "
  82. sqlstr += "`hash`, `content_type`, `recipient`, `has_attach`, `ip_addr`, "
  83. sqlstr += "`return_path`, `is_tls`, `message_id`, `reply_to`, `sender`)"
  84. sqlstr += " VALUES "
  85. }
  86. if s.config.SQLValues != "" {
  87. values = s.config.SQLValues
  88. } else {
  89. values = "(NOW(), ?, ?, ?, ? , ?, 0, ?, ?, ?, 0, ?, ?, ?, ?, ?, ?)"
  90. }
  91. // add more rows
  92. comma := ""
  93. for i := 0; i < rows; i++ {
  94. sqlstr += comma + values
  95. if comma == "" {
  96. comma = ","
  97. }
  98. }
  99. stmt, sqlErr := db.Prepare(sqlstr)
  100. if sqlErr != nil {
  101. Log().WithError(sqlErr).Panic("failed while db.Prepare(INSERT...)")
  102. }
  103. // cache it
  104. s.cache[rows-1] = stmt
  105. return stmt
  106. }
  107. func (s *SQLProcessor) doQuery(c int, db *sql.DB, insertStmt *sql.Stmt, vals *[]interface{}) (execErr error) {
  108. defer func() {
  109. if r := recover(); r != nil {
  110. Log().Error("Recovered form panic:", r, string(debug.Stack()))
  111. sum := 0
  112. for _, v := range *vals {
  113. if str, ok := v.(string); ok {
  114. sum = sum + len(str)
  115. }
  116. }
  117. Log().Errorf("panic while inserting query [%s] size:%d, err %v", r, sum, execErr)
  118. panic("query failed")
  119. }
  120. }()
  121. // prepare the query used to insert when rows reaches batchMax
  122. insertStmt = s.prepareInsertQuery(c, db)
  123. _, execErr = insertStmt.Exec(*vals...)
  124. if execErr != nil {
  125. Log().WithError(execErr).Error("There was a problem the insert")
  126. }
  127. return
  128. }
  129. // for storing ip addresses in the ip_addr column
  130. func (s *SQLProcessor) ip2bint(ip string) *big.Int {
  131. bint := big.NewInt(0)
  132. addr := net.ParseIP(ip)
  133. if strings.Index(ip, "::") > 0 {
  134. bint.SetBytes(addr.To16())
  135. } else {
  136. bint.SetBytes(addr.To4())
  137. }
  138. return bint
  139. }
  140. func (s *SQLProcessor) fillAddressFromHeader(e *mail.Envelope, headerKey string) string {
  141. if v, ok := e.Header[headerKey]; ok {
  142. addr, err := mail.NewAddress(v[0])
  143. if err != nil {
  144. return ""
  145. }
  146. return addr.String()
  147. }
  148. return ""
  149. }
  150. func SQL() Decorator {
  151. var config *SQLProcessorConfig
  152. var vals []interface{}
  153. var db *sql.DB
  154. s := &SQLProcessor{}
  155. // open the database connection (it will also check if we can select the table)
  156. Svc.AddInitializer(InitializeWith(func(backendConfig BackendConfig) error {
  157. configType := BaseConfig(&SQLProcessorConfig{})
  158. bcfg, err := Svc.ExtractConfig(backendConfig, configType)
  159. if err != nil {
  160. return err
  161. }
  162. config = bcfg.(*SQLProcessorConfig)
  163. s.config = config
  164. db, err = s.connect()
  165. if err != nil {
  166. return err
  167. }
  168. return nil
  169. }))
  170. // shutdown will close the database connection
  171. Svc.AddShutdowner(ShutdownWith(func() error {
  172. if db != nil {
  173. return db.Close()
  174. }
  175. return nil
  176. }))
  177. return func(p Processor) Processor {
  178. return ProcessWith(func(e *mail.Envelope, task SelectTask) (Result, error) {
  179. if task == TaskSaveMail {
  180. var to, body string
  181. hash := ""
  182. if len(e.Hashes) > 0 {
  183. hash = e.Hashes[0]
  184. e.QueuedId = e.Hashes[0]
  185. }
  186. var co *compressor
  187. // a compressor was set by the Compress processor
  188. if c, ok := e.Values["zlib-compressor"]; ok {
  189. body = "gzip"
  190. co = c.(*compressor)
  191. }
  192. // was saved in redis by the Redis processor
  193. if _, ok := e.Values["redis"]; ok {
  194. body = "redis"
  195. }
  196. for i := range e.RcptTo {
  197. // use the To header, otherwise rcpt to
  198. to = trimToLimit(s.fillAddressFromHeader(e, "To"), 255)
  199. if to == "" {
  200. // trimToLimit(strings.TrimSpace(e.RcptTo[i].User)+"@"+config.PrimaryHost, 255)
  201. to = trimToLimit(strings.TrimSpace(e.RcptTo[i].String()), 255)
  202. }
  203. mid := trimToLimit(s.fillAddressFromHeader(e, "Message-Id"), 255)
  204. if mid == "" {
  205. mid = fmt.Sprintf("%s.%s@%s", hash, e.RcptTo[i].User, config.PrimaryHost)
  206. }
  207. // replyTo is the 'Reply-to' header, it may be blank
  208. replyTo := trimToLimit(s.fillAddressFromHeader(e, "Reply-To"), 255)
  209. // sender is the 'Sender' header, it may be blank
  210. sender := trimToLimit(s.fillAddressFromHeader(e, "Sender"), 255)
  211. recipient := trimToLimit(strings.TrimSpace(e.RcptTo[i].String()), 255)
  212. contentType := ""
  213. if v, ok := e.Header["Content-Type"]; ok {
  214. contentType = trimToLimit(v[0], 255)
  215. }
  216. // build the values for the query
  217. vals = []interface{}{} // clear the vals
  218. vals = append(vals,
  219. to,
  220. trimToLimit(e.MailFrom.String(), 255), // from
  221. trimToLimit(e.Subject, 255),
  222. body, // body describes how to interpret the data, eg 'redis' means stored in redis, and 'gzip' stored in mysql, using gzip compression
  223. )
  224. // `mail` column
  225. if body == "redis" {
  226. // data already saved in redis
  227. vals = append(vals, "")
  228. } else if co != nil {
  229. // use a compressor (automatically adds e.DeliveryHeader)
  230. vals = append(vals, co.String())
  231. } else {
  232. vals = append(vals, e.String())
  233. }
  234. vals = append(vals,
  235. hash, // hash (redis hash if saved in redis)
  236. contentType,
  237. recipient,
  238. s.ip2bint(e.RemoteIP).Bytes(), // ip_addr store as varbinary(16)
  239. trimToLimit(e.MailFrom.String(), 255), // return_path
  240. // is_tls
  241. e.TLS,
  242. // message_id
  243. mid,
  244. // reply_to
  245. replyTo,
  246. sender,
  247. )
  248. stmt := s.prepareInsertQuery(1, db)
  249. err := s.doQuery(1, db, stmt, &vals)
  250. if err != nil {
  251. return NewResult(fmt.Sprint("554 Error: could not save email")), StorageError
  252. }
  253. }
  254. // continue to the next Processor in the decorator chain
  255. return p.Process(e, task)
  256. } else if task == TaskValidateRcpt {
  257. // if you need to validate the e.Rcpt then change to:
  258. if len(e.RcptTo) > 0 {
  259. // since this is called each time a recipient is added
  260. // validate only the _last_ recipient that was appended
  261. last := e.RcptTo[len(e.RcptTo)-1]
  262. if len(last.User) > 255 {
  263. // return with an error
  264. return NewResult(response.Canned.FailRcptCmd), NoSuchUser
  265. }
  266. }
  267. // continue to the next processor
  268. return p.Process(e, task)
  269. } else {
  270. return p.Process(e, task)
  271. }
  272. })
  273. }
  274. }