p_mysql.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212
  1. package backends
  2. import (
  3. "database/sql"
  4. "strings"
  5. "time"
  6. "github.com/flashmob/go-guerrilla/envelope"
  7. "github.com/go-sql-driver/mysql"
  8. "runtime/debug"
  9. )
  10. // ----------------------------------------------------------------------------------
  11. // Processor Name: mysql
  12. // ----------------------------------------------------------------------------------
  13. // Description : Saves the e.Data (email data) and e.DeliveryHeader together in mysql
  14. // : using the hash generated by the "hash" processor and stored in
  15. // : e.Hashes
  16. // ----------------------------------------------------------------------------------
  17. // Config Options: mail_table string - mysql table name
  18. // : mysql_db string - mysql database name
  19. // : mysql_host string - mysql host name, eg. 127.0.0.1
  20. // : mysql_pass string - mysql password
  21. // : mysql_user string - mysql username
  22. // : primary_mail_host string - primary host name
  23. // --------------:-------------------------------------------------------------------
  24. // Input : e.Data
  25. // : e.DeliveryHeader generated by ParseHeader() processor
  26. // : e.MailFrom
  27. // : e.Subject - generated by by ParseHeader() processor
  28. // ----------------------------------------------------------------------------------
  29. // Output : Sets e.QueuedId with the first item fromHashes[0]
  30. // ----------------------------------------------------------------------------------
  31. func init() {
  32. Processors["mysql"] = func() Decorator {
  33. return MySql()
  34. }
  35. }
  36. type MysqlProcessorConfig struct {
  37. MysqlTable string `json:"mail_table"`
  38. MysqlDB string `json:"mysql_db"`
  39. MysqlHost string `json:"mysql_host"`
  40. MysqlPass string `json:"mysql_pass"`
  41. MysqlUser string `json:"mysql_user"`
  42. PrimaryHost string `json:"primary_mail_host"`
  43. }
  44. type MysqlProcessor struct {
  45. cache stmtCache
  46. config *MysqlProcessorConfig
  47. }
  48. func (m *MysqlProcessor) connect(config *MysqlProcessorConfig) (*sql.DB, error) {
  49. var db *sql.DB
  50. var err error
  51. conf := mysql.Config{
  52. User: config.MysqlUser,
  53. Passwd: config.MysqlPass,
  54. DBName: config.MysqlDB,
  55. Net: "tcp",
  56. Addr: config.MysqlHost,
  57. ReadTimeout: GuerrillaDBAndRedisBatchTimeout + (time.Second * 10),
  58. WriteTimeout: GuerrillaDBAndRedisBatchTimeout + (time.Second * 10),
  59. Params: map[string]string{"collation": "utf8_general_ci"},
  60. }
  61. if db, err = sql.Open("mysql", conf.FormatDSN()); err != nil {
  62. Log().Error("cannot open mysql", err)
  63. return nil, err
  64. }
  65. Log().Info("connected to mysql on tcp ", config.MysqlHost)
  66. return db, err
  67. }
  68. // prepares the sql query with the number of rows that can be batched with it
  69. func (g *MysqlProcessor) prepareInsertQuery(rows int, db *sql.DB) *sql.Stmt {
  70. if rows == 0 {
  71. panic("rows argument cannot be 0")
  72. }
  73. if g.cache[rows-1] != nil {
  74. return g.cache[rows-1]
  75. }
  76. sqlstr := "INSERT INTO " + g.config.MysqlTable + " "
  77. sqlstr += "(`date`, `to`, `from`, `subject`, `body`, `charset`, `mail`, `spam_score`, `hash`, `content_type`, `recipient`, `has_attach`, `ip_addr`, `return_path`, `is_tls`)"
  78. sqlstr += " values "
  79. values := "(NOW(), ?, ?, ?, ? , 'UTF-8' , ?, 0, ?, '', ?, 0, ?, ?, ?)"
  80. // add more rows
  81. comma := ""
  82. for i := 0; i < rows; i++ {
  83. sqlstr += comma + values
  84. if comma == "" {
  85. comma = ","
  86. }
  87. }
  88. stmt, sqlErr := db.Prepare(sqlstr)
  89. if sqlErr != nil {
  90. Log().WithError(sqlErr).Panic("failed while db.Prepare(INSERT...)")
  91. }
  92. // cache it
  93. g.cache[rows-1] = stmt
  94. return stmt
  95. }
  96. func (g *MysqlProcessor) doQuery(c int, db *sql.DB, insertStmt *sql.Stmt, vals *[]interface{}) {
  97. var execErr error
  98. defer func() {
  99. if r := recover(); r != nil {
  100. Log().Error("Recovered form panic:", r, string(debug.Stack()))
  101. sum := 0
  102. for _, v := range *vals {
  103. if str, ok := v.(string); ok {
  104. sum = sum + len(str)
  105. }
  106. }
  107. Log().Errorf("panic while inserting query [%s] size:%d, err %v", r, sum, execErr)
  108. panic("query failed")
  109. }
  110. }()
  111. // prepare the query used to insert when rows reaches batchMax
  112. insertStmt = g.prepareInsertQuery(c, db)
  113. _, execErr = insertStmt.Exec(*vals...)
  114. if execErr != nil {
  115. Log().WithError(execErr).Error("There was a problem the insert")
  116. }
  117. }
  118. func MySql() Decorator {
  119. var config *MysqlProcessorConfig
  120. var vals []interface{}
  121. var db *sql.DB
  122. mp := &MysqlProcessor{}
  123. Service.AddInitializer(Initialize(func(backendConfig BackendConfig) error {
  124. configType := BaseConfig(&MysqlProcessorConfig{})
  125. bcfg, err := Service.ExtractConfig(backendConfig, configType)
  126. if err != nil {
  127. return err
  128. }
  129. config = bcfg.(*MysqlProcessorConfig)
  130. mp.config = config
  131. db, err = mp.connect(config)
  132. if err != nil {
  133. Log().Error("cannot open mysql: %s", err)
  134. return err
  135. }
  136. return nil
  137. }))
  138. // shutdown
  139. Service.AddShutdowner(Shutdown(func() error {
  140. if db != nil {
  141. return db.Close()
  142. }
  143. return nil
  144. }))
  145. return func(c Processor) Processor {
  146. return ProcessorFunc(func(e *envelope.Envelope) (BackendResult, error) {
  147. var to, body string
  148. to = trimToLimit(strings.TrimSpace(e.RcptTo[0].User)+"@"+config.PrimaryHost, 255)
  149. hash := ""
  150. if len(e.Hashes) > 0 {
  151. hash = e.Hashes[0]
  152. e.QueuedId = e.Hashes[0]
  153. }
  154. var co *compressor
  155. // a compressor was set
  156. if c, ok := e.Info["zlib-compressor"]; ok {
  157. body = "gzip"
  158. co = c.(*compressor)
  159. }
  160. // was saved in redis
  161. if _, ok := e.Info["redis"]; ok {
  162. body = "redis"
  163. }
  164. // build the values for the query
  165. vals = []interface{}{} // clear the vals
  166. vals = append(vals,
  167. to,
  168. trimToLimit(e.MailFrom.String(), 255),
  169. trimToLimit(e.Subject, 255),
  170. body)
  171. if body == "redis" {
  172. // data already saved in redis
  173. vals = append(vals, "")
  174. } else if co != nil {
  175. // use a compressor (automatically adds e.DeliveryHeader)
  176. vals = append(vals, co.String())
  177. //co.clear()
  178. } else {
  179. vals = append(vals, e.String())
  180. }
  181. vals = append(vals,
  182. hash,
  183. to,
  184. e.RemoteAddress,
  185. trimToLimit(e.MailFrom.String(), 255),
  186. e.TLS)
  187. stmt := mp.prepareInsertQuery(1, db)
  188. mp.doQuery(1, db, stmt, &vals)
  189. // continue to the next Processor in the decorator chain
  190. return c.Process(e)
  191. })
  192. }
  193. }