store_sql.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233
  1. package chunk
  2. import (
  3. "database/sql"
  4. "encoding/binary"
  5. "encoding/json"
  6. "github.com/flashmob/go-guerrilla/backends"
  7. "github.com/flashmob/go-guerrilla/mail/smtp"
  8. "net"
  9. )
  10. func init() {
  11. StorageEngines["sql"] = func() Storage {
  12. return new(StoreSQL)
  13. }
  14. }
  15. type sqlConfig struct {
  16. EmailTable string `json:"email_table,omitempty"`
  17. ChunkTable string `json:"chunk_table,omitempty"`
  18. Driver string `json:"sql_driver,omitempty"`
  19. DSN string `json:"sql_dsn,omitempty"`
  20. PrimaryHost string `json:"primary_mail_host,omitempty"`
  21. CompressLevel int `json:"compress_level,omitempty"`
  22. }
  23. // StoreSQL implements the Storage interface
  24. type StoreSQL struct {
  25. config sqlConfig
  26. statements map[string]*sql.Stmt
  27. db *sql.DB
  28. }
  29. func (s *StoreSQL) connect() (*sql.DB, error) {
  30. var err error
  31. if s.db, err = sql.Open(s.config.Driver, s.config.DSN); err != nil {
  32. backends.Log().Error("cannot open database: ", err)
  33. return nil, err
  34. }
  35. // do we have permission to access the table?
  36. _, err = s.db.Query("SELECT mail_id FROM " + s.config.EmailTable + " LIMIT 1")
  37. if err != nil {
  38. return nil, err
  39. }
  40. return s.db, err
  41. }
  42. func (s *StoreSQL) prepareSql() error {
  43. if s.statements == nil {
  44. s.statements = make(map[string]*sql.Stmt)
  45. }
  46. // begin inserting an email (before saving chunks)
  47. if stmt, err := s.db.Prepare(`INSERT INTO ` +
  48. s.config.EmailTable +
  49. ` (from, helo, recipient, ipv4_addr, ipv6_addr, return_path, is_tls, is_8bit)
  50. VALUES(?, ?, ?, ?, ?, ?, ?, ?)`); err != nil {
  51. return err
  52. } else {
  53. s.statements["insertEmail"] = stmt
  54. }
  55. // insert a chunk of email's data
  56. if stmt, err := s.db.Prepare(`INSERT INTO ` +
  57. s.config.ChunkTable +
  58. ` (data, hash)
  59. VALUES(?, ?)`); err != nil {
  60. return err
  61. } else {
  62. s.statements["insertChunk"] = stmt
  63. }
  64. // finalize the email (the connection closed)
  65. if stmt, err := s.db.Prepare(`
  66. UPDATE ` + s.config.EmailTable + `
  67. SET size=?, parts_info = ?, subject, delivery_id = ?, to = ?
  68. WHERE mail_id = ? `); err != nil {
  69. return err
  70. } else {
  71. s.statements["finalizeEmail"] = stmt
  72. }
  73. // Check the existence of a chunk (the reference_count col is incremented if it exists)
  74. // This means we can avoid re-inserting an existing chunk, only update its reference_count
  75. // check the "affected rows" count after executing query
  76. if stmt, err := s.db.Prepare(`
  77. UPDATE ` + s.config.ChunkTable + `
  78. SET reference_count=reference_count+1
  79. WHERE hash = ? `); err != nil {
  80. return err
  81. } else {
  82. s.statements["chunkReferenceIncr"] = stmt
  83. }
  84. // If the reference_count is 0 then it means the chunk has been deleted
  85. // Chunks are soft-deleted for now, hard-deleted by another sweeper query as they become stale.
  86. if stmt, err := s.db.Prepare(`
  87. UPDATE ` + s.config.ChunkTable + `
  88. SET reference_count=reference_count-1
  89. WHERE hash = ? AND reference_count > 0`); err != nil {
  90. return err
  91. } else {
  92. s.statements["chunkReferenceDecr"] = stmt
  93. }
  94. // fetch an email
  95. if stmt, err := s.db.Prepare(`
  96. SELECT *
  97. from ` + s.config.EmailTable + `
  98. where mail_id=?`); err != nil {
  99. return err
  100. } else {
  101. s.statements["selectMail"] = stmt
  102. }
  103. // fetch a chunk
  104. if stmt, err := s.db.Prepare(`
  105. SELECT *
  106. from ` + s.config.ChunkTable + `
  107. where hash=?`); err != nil {
  108. return err
  109. } else {
  110. s.statements["selectChunk"] = stmt
  111. }
  112. // TODO sweep old chunks
  113. // TODO sweep incomplete emails
  114. return nil
  115. }
  116. // OpenMessage implements the Storage interface
  117. func (s *StoreSQL) OpenMessage(from string, helo string, recipient string, ipAddress net.IPAddr, returnPath string, isTLS bool, transport smtp.TransportType) (mailID uint64, err error) {
  118. // if it's ipv4 then we want ipv6 to be 0, and vice-versa
  119. var ip4 uint32
  120. ip6 := make([]byte, 16)
  121. if ip := ipAddress.IP.To4(); ip != nil {
  122. ip4 = binary.BigEndian.Uint32(ip)
  123. } else {
  124. _ = copy(ip6, ipAddress.IP)
  125. }
  126. r, err := s.statements["insertEmail"].Exec(from, helo, recipient, ip4, ip6, returnPath, isTLS, transport)
  127. if err != nil {
  128. return 0, err
  129. }
  130. id, err := r.LastInsertId()
  131. if err != nil {
  132. return 0, err
  133. }
  134. return uint64(id), err
  135. }
  136. // AddChunk implements the Storage interface
  137. func (s *StoreSQL) AddChunk(data []byte, hash []byte) error {
  138. // attempt to increment the reference_count (it means the chunk is already in there)
  139. r, err := s.statements["chunkReferenceIncr"].Exec(hash)
  140. if err != nil {
  141. return err
  142. }
  143. affected, err := r.RowsAffected()
  144. if err != nil {
  145. return err
  146. }
  147. if affected == 0 {
  148. // chunk isn't in there, let's insert it
  149. _, err := s.statements["insertChunk"].Exec(data, hash)
  150. if err != nil {
  151. return err
  152. }
  153. }
  154. return nil
  155. }
  156. // CloseMessage implements the Storage interface
  157. func (s *StoreSQL) CloseMessage(mailID uint64, size int64, partsInfo *PartsInfo, subject string, deliveryID string, to string, from string) error {
  158. partsInfoJson, err := json.Marshal(partsInfo)
  159. if err != nil {
  160. return err
  161. }
  162. _, err = s.statements["finalizeEmail"].Exec(size, partsInfoJson, subject, deliveryID, to, mailID)
  163. if err != nil {
  164. return err
  165. }
  166. return nil
  167. }
  168. // Initialize loads the specific database config, connects to the db, prepares statements
  169. func (s *StoreSQL) Initialize(cfg backends.ConfigGroup) error {
  170. sd := backends.StreamDecorator{}
  171. err := sd.ExtractConfig(cfg, &s.config)
  172. if err != nil {
  173. return err
  174. }
  175. s.db, err = s.connect()
  176. if err != nil {
  177. return err
  178. }
  179. err = s.prepareSql()
  180. if err != nil {
  181. return err
  182. }
  183. return nil
  184. }
  185. // Shutdown implements the Storage interface
  186. func (s *StoreSQL) Shutdown() (err error) {
  187. defer func() {
  188. closeErr := s.db.Close()
  189. if closeErr != err {
  190. backends.Log().WithError(err).Error("failed to close sql database")
  191. err = closeErr
  192. }
  193. }()
  194. for i := range s.statements {
  195. if err = s.statements[i].Close(); err != nil {
  196. backends.Log().WithError(err).Error("failed to close sql statement")
  197. }
  198. }
  199. return err
  200. }
  201. // GetEmail implements the Storage interface
  202. func (s *StoreSQL) GetEmail(mailID uint64) (*Email, error) {
  203. return &Email{}, nil
  204. }
  205. // GetChunk implements the Storage interface
  206. func (s *StoreSQL) GetChunks(hash ...HashKey) ([]*Chunk, error) {
  207. result := make([]*Chunk, 0, len(hash))
  208. return result, nil
  209. }