store_sql.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224
  1. package chunk
  2. import (
  3. "database/sql"
  4. "encoding/binary"
  5. "encoding/json"
  6. "github.com/flashmob/go-guerrilla/backends"
  7. "net"
  8. )
  // chunkSaverSQLConfig holds the settings of the SQL chunk store. Each field
// is filled from the backend configuration under the JSON key named in its tag.
type chunkSaverSQLConfig struct {
	// EmailTable is the table name concatenated into the email statements.
	EmailTable string `json:"chunksaver_email_table,omitempty"`
	// ChunkTable is the table name concatenated into the chunk statements.
	ChunkTable string `json:"chunksaver_chunk_table,omitempty"`
	// Driver is the database/sql driver name handed to sql.Open.
	Driver string `json:"chunksaver_sql_driver,omitempty"`
	// DSN is the data source name handed to sql.Open.
	DSN string `json:"chunksaver_sql_dsn,omitempty"`
	// PrimaryHost is loaded from the config; nothing visible in this file reads it.
	PrimaryHost string `json:"chunksaver_primary_mail_host,omitempty"`
}
  16. // ChunkSaverSQL implements the ChunkSaverStorage interface
  17. type ChunkSaverSQL struct {
  18. config *chunkSaverSQLConfig
  19. statements map[string]*sql.Stmt
  20. db *sql.DB
  21. }
  22. func (c *ChunkSaverSQL) connect() (*sql.DB, error) {
  23. var err error
  24. if c.db, err = sql.Open(c.config.Driver, c.config.DSN); err != nil {
  25. backends.Log().Error("cannot open database: ", err)
  26. return nil, err
  27. }
  28. // do we have permission to access the table?
  29. _, err = c.db.Query("SELECT mail_id FROM " + c.config.EmailTable + " LIMIT 1")
  30. if err != nil {
  31. return nil, err
  32. }
  33. return c.db, err
  34. }
  35. func (c *ChunkSaverSQL) prepareSql() error {
  36. if c.statements == nil {
  37. c.statements = make(map[string]*sql.Stmt)
  38. }
  39. if stmt, err := c.db.Prepare(`INSERT INTO ` +
  40. c.config.EmailTable +
  41. ` (from, helo, recipient, ipv4_addr, ipv6_addr, return_path, is_tls)
  42. VALUES(?, ?, ?, ?, ?, ?, ?)`); err != nil {
  43. return err
  44. } else {
  45. c.statements["insertEmail"] = stmt
  46. }
  47. // begin inserting an email (before saving chunks)
  48. if stmt, err := c.db.Prepare(`INSERT INTO ` +
  49. c.config.ChunkTable +
  50. ` (data, hash)
  51. VALUES(?, ?)`); err != nil {
  52. return err
  53. } else {
  54. c.statements["insertChunk"] = stmt
  55. }
  56. // finalize the email (the connection closed)
  57. if stmt, err := c.db.Prepare(`
  58. UPDATE ` + c.config.EmailTable + `
  59. SET size=?, parts_info = ?, subject, delivery_id = ?, to = ?
  60. WHERE mail_id = ? `); err != nil {
  61. return err
  62. } else {
  63. c.statements["finalizeEmail"] = stmt
  64. }
  65. // Check the existence of a chunk (the reference_count col is incremented if it exists)
  66. // This means we can avoid re-inserting an existing chunk, only update its reference_count
  67. if stmt, err := c.db.Prepare(`
  68. UPDATE ` + c.config.ChunkTable + `
  69. SET reference_count=reference_count+1
  70. WHERE hash = ? `); err != nil {
  71. return err
  72. } else {
  73. c.statements["chunkReferenceIncr"] = stmt
  74. }
  75. // If the reference_count is 0 then it means the chunk has been deleted
  76. // Chunks are soft-deleted for now, hard-deleted by another sweeper query as they become stale.
  77. if stmt, err := c.db.Prepare(`
  78. UPDATE ` + c.config.ChunkTable + `
  79. SET reference_count=reference_count-1
  80. WHERE hash = ? AND reference_count > 0`); err != nil {
  81. return err
  82. } else {
  83. c.statements["chunkReferenceDecr"] = stmt
  84. }
  85. // fetch an email
  86. if stmt, err := c.db.Prepare(`
  87. SELECT *
  88. from ` + c.config.EmailTable + `
  89. where mail_id=?`); err != nil {
  90. return err
  91. } else {
  92. c.statements["selectMail"] = stmt
  93. }
  94. // fetch a chunk
  95. if stmt, err := c.db.Prepare(`
  96. SELECT *
  97. from ` + c.config.ChunkTable + `
  98. where hash=?`); err != nil {
  99. return err
  100. } else {
  101. c.statements["selectChunk"] = stmt
  102. }
  103. // TODO sweep old chunks
  104. // TODO sweep incomplete emails
  105. return nil
  106. }
  107. // OpenMessage implements the ChunkSaverStorage interface
  108. func (c *ChunkSaverSQL) OpenMessage(from string, helo string, recipient string, ipAddress net.IPAddr, returnPath string, isTLS bool) (mailID uint64, err error) {
  109. // if it's ipv4 then we want ipv6 to be 0, and vice-versa
  110. var ip4 uint32
  111. ip6 := make([]byte, 16)
  112. if ip := ipAddress.IP.To4(); ip != nil {
  113. ip4 = binary.BigEndian.Uint32(ip)
  114. } else {
  115. _ = copy(ip6, ipAddress.IP)
  116. }
  117. r, err := c.statements["insertEmail"].Exec(from, helo, recipient, ip4, ip6, returnPath, isTLS)
  118. if err != nil {
  119. return 0, err
  120. }
  121. id, err := r.LastInsertId()
  122. if err != nil {
  123. return 0, err
  124. }
  125. return uint64(id), err
  126. }
  127. // AddChunk implements the ChunkSaverStorage interface
  128. func (c *ChunkSaverSQL) AddChunk(data []byte, hash []byte) error {
  129. // attempt to increment the reference_count (it means the chunk is already in there)
  130. r, err := c.statements["chunkReferenceIncr"].Exec(hash)
  131. if err != nil {
  132. return err
  133. }
  134. affected, err := r.RowsAffected()
  135. if err != nil {
  136. return err
  137. }
  138. if affected == 0 {
  139. // chunk isn't in there, let's insert it
  140. _, err := c.statements["insertChunk"].Exec(data, hash)
  141. if err != nil {
  142. return err
  143. }
  144. }
  145. return nil
  146. }
  147. // CloseMessage implements the ChunkSaverStorage interface
  148. func (c *ChunkSaverSQL) CloseMessage(mailID uint64, size int64, partsInfo *PartsInfo, subject string, deliveryID string, to string, from string) error {
  149. partsInfoJson, err := json.Marshal(partsInfo)
  150. if err != nil {
  151. return err
  152. }
  153. _, err = c.statements["finalizeEmail"].Exec(size, partsInfoJson, subject, deliveryID, to, mailID)
  154. if err != nil {
  155. return err
  156. }
  157. return nil
  158. }
  159. // Initialize loads the specific database config, connects to the db, prepares statements
  160. func (c *ChunkSaverSQL) Initialize(cfg backends.BackendConfig) error {
  161. configType := backends.BaseConfig(&chunkSaverSQLConfig{})
  162. bcfg, err := backends.Svc.ExtractConfig(cfg, configType)
  163. if err != nil {
  164. return err
  165. }
  166. c.config = bcfg.(*chunkSaverSQLConfig)
  167. c.db, err = c.connect()
  168. if err != nil {
  169. return err
  170. }
  171. err = c.prepareSql()
  172. if err != nil {
  173. return err
  174. }
  175. return nil
  176. }
  177. // Shutdown implements the ChunkSaverStorage interface
  178. func (c *ChunkSaverSQL) Shutdown() (err error) {
  179. defer func() {
  180. closeErr := c.db.Close()
  181. if closeErr != err {
  182. backends.Log().WithError(err).Error("failed to close sql database")
  183. err = closeErr
  184. }
  185. }()
  186. for i := range c.statements {
  187. if err = c.statements[i].Close(); err != nil {
  188. backends.Log().WithError(err).Error("failed to close sql statement")
  189. }
  190. }
  191. return err
  192. }
  193. // GetEmail implements the ChunkSaverStorage interface
  194. func (c *ChunkSaverSQL) GetEmail(mailID uint64) (*ChunkSaverEmail, error) {
  195. return &ChunkSaverEmail{}, nil
  196. }
  197. // GetChunk implements the ChunkSaverStorage interface
  198. func (c *ChunkSaverSQL) GetChunks(hash ...HashKey) ([]*ChunkSaverChunk, error) {
  199. result := make([]*ChunkSaverChunk, 0, len(hash))
  200. return result, nil
  201. }