store_sql.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553
  1. package chunk
  2. import (
  3. "bytes"
  4. "database/sql"
  5. "database/sql/driver"
  6. "encoding/binary"
  7. "encoding/json"
  8. "errors"
  9. "fmt"
  10. "github.com/flashmob/go-guerrilla/backends"
  11. "github.com/flashmob/go-guerrilla/mail"
  12. "github.com/flashmob/go-guerrilla/mail/smtp"
  13. "github.com/go-sql-driver/mysql"
  14. "net"
  15. "strings"
  16. "time"
  17. )
  18. /*
  19. SQL schema
  20. ```
  21. create schema gmail collate utf8mb4_unicode_ci;
  22. CREATE TABLE `in_emails` (
  23. `mail_id` bigint unsigned NOT NULL AUTO_INCREMENT,
  24. `created_at` datetime NOT NULL,
  25. `size` int unsigned NOT NULL,
  26. `from` varbinary(255) NOT NULL,
  27. `to` varbinary(255) NOT NULL,
  28. `parts_info` text COLLATE utf8mb4_unicode_ci,
  29. `helo` varchar(255) COLLATE latin1_swedish_ci NOT NULL,
  30. `subject` text CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL,
  31. `queued_id` binary(16) NOT NULL,
  32. `recipient` varbinary(255) NOT NULL,
  33. `ipv4_addr` int unsigned DEFAULT NULL,
  34. `ipv6_addr` varbinary(16) DEFAULT NULL,
  35. `return_path` varbinary(255) NOT NULL,
  36. `protocol` set('SMTP','SMTPS','ESMTP','ESMTPS','LMTP','LMTPS') COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT 'SMTP',
  37. `transport` set('7bit','8bit','unknown','invalid') COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT 'unknown',
  38. PRIMARY KEY (`mail_id`)
  39. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
  40. CREATE TABLE `in_emails_chunks` (
  41. `modified_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
  42. `reference_count` int unsigned DEFAULT '1',
  43. `data` mediumblob NOT NULL,
  44. `hash` varbinary(16) NOT NULL,
  45. UNIQUE KEY `in_emails_chunks_hash_uindex` (`hash`)
  46. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
  47. ```
  48. ipv6_addr is big endian
  49. TODO compression, configurable SQL strings, logger
  50. */
  51. func init() {
  52. StorageEngines["sql"] = func() Storage {
  53. return new(StoreSQL)
  54. }
  55. }
// sqlConfig holds the settings of the SQL storage engine. Initialize fills it
// in from the backend configuration group and applies defaults for the table
// names and the driver.
type sqlConfig struct {
	// EmailTable is the name of the main database table for the headers.
	// Initialize defaults it to "in_emails" when empty.
	EmailTable string `json:"email_table,omitempty"`
	// EmailChunkTable stores the data of the emails in de-duplicated chunks.
	// Initialize defaults it to "in_emails_chunks" when empty.
	EmailChunkTable string `json:"email_table_chunks,omitempty"`
	// Connection settings
	// Driver to use, eg "mysql" (which is also the default chosen by Initialize)
	Driver string `json:"sql_driver,omitempty"`
	// DSN (required) is the connection string, eg.
	// "user:passt@tcp(127.0.0.1:3306)/db_name?readTimeout=10s&writeTimeout=10s&charset=utf8mb4&collation=utf8mb4_unicode_ci"
	DSN string `json:"sql_dsn,omitempty"`
	// MaxConnLifetime (optional) is a duration, eg. "30s".
	// connect parses it with time.ParseDuration and fails if it is malformed.
	MaxConnLifetime string `json:"sql_max_conn_lifetime,omitempty"`
	// MaxOpenConns (optional) specifies the number of maximum open connections.
	// A value of 0 leaves the database/sql setting untouched.
	MaxOpenConns int `json:"sql_max_open_conns,omitempty"`
	// MaxIdleConns (optional) specifies the number of maximum idle connections.
	// A value of 0 leaves the database/sql setting untouched.
	MaxIdleConns int `json:"sql_max_idle_conns,omitempty"`
	// CompressLevel controls the gzip compression level of email chunks.
	// 0 = no compression, 1 == best speed, 9 == best compression, -1 == default, -2 == huffman only
	// NOTE(review): nothing in this file reads this field yet (compression is
	// listed as a TODO in the header comment) - confirm before relying on it.
	CompressLevel int `json:"compress_level,omitempty"`
}
// StoreSQL implements the Storage interface on top of a database/sql connection.
type StoreSQL struct {
	// config holds the settings extracted by Initialize
	config sqlConfig
	// statements maps a statement name (eg. "insertEmail") to the statement
	// prepared for it by prepareSql
	statements map[string]*sql.Stmt
	// db is the database handle opened by connect
	db *sql.DB
}
  83. func (s *StoreSQL) StartWorker() (stop chan bool) {
  84. timeo := time.Second * 1
  85. stop = make(chan bool)
  86. go func() {
  87. select {
  88. case <-stop:
  89. return
  90. case <-time.After(timeo):
  91. t1 := int64(time.Now().UnixNano())
  92. // do stuff here
  93. if (time.Now().UnixNano())-t1 > int64(time.Second*3) {
  94. }
  95. }
  96. }()
  97. return stop
  98. }
  99. func (s *StoreSQL) connect() (*sql.DB, error) {
  100. var err error
  101. if s.db, err = sql.Open(s.config.Driver, s.config.DSN); err != nil {
  102. backends.Log().Error("cannot open database: ", err)
  103. return nil, err
  104. }
  105. if s.config.MaxOpenConns != 0 {
  106. s.db.SetMaxOpenConns(s.config.MaxOpenConns)
  107. }
  108. if s.config.MaxIdleConns != 0 {
  109. s.db.SetMaxIdleConns(s.config.MaxIdleConns)
  110. }
  111. if s.config.MaxConnLifetime != "" {
  112. t, err := time.ParseDuration(s.config.MaxConnLifetime)
  113. if err != nil {
  114. return nil, err
  115. }
  116. s.db.SetConnMaxLifetime(t)
  117. }
  118. // do we have permission to access the table?
  119. _, err = s.db.Query("SELECT mail_id FROM " + s.config.EmailTable + " LIMIT 1")
  120. if err != nil {
  121. return nil, err
  122. }
  123. return s.db, err
  124. }
  125. func (s *StoreSQL) prepareSql() error {
  126. if s.statements == nil {
  127. s.statements = make(map[string]*sql.Stmt)
  128. }
  129. // begin inserting an email (before saving chunks)
  130. if stmt, err := s.db.Prepare(`INSERT INTO ` +
  131. s.config.EmailTable +
  132. ` (queued_id, created_at, ` + "`from`" + `, helo, recipient, ipv4_addr, ipv6_addr, return_path, transport, protocol)
  133. VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`); err != nil {
  134. return err
  135. } else {
  136. s.statements["insertEmail"] = stmt
  137. }
  138. // insert a chunk of email's data
  139. if stmt, err := s.db.Prepare(`INSERT INTO ` +
  140. s.config.EmailChunkTable +
  141. ` (data, hash)
  142. VALUES(?, ?)`); err != nil {
  143. return err
  144. } else {
  145. s.statements["insertChunk"] = stmt
  146. }
  147. // finalize the email (the connection closed)
  148. if stmt, err := s.db.Prepare(`
  149. UPDATE ` + s.config.EmailTable + `
  150. SET size=?, parts_info=?, subject=?, ` + "`to`" + `=?, ` + "`from`" + `=?
  151. WHERE mail_id = ? `); err != nil {
  152. return err
  153. } else {
  154. s.statements["finalizeEmail"] = stmt
  155. }
  156. // Check the existence of a chunk (the reference_count col is incremented if it exists)
  157. // This means we can avoid re-inserting an existing chunk, only update its reference_count
  158. // check the "affected rows" count after executing query
  159. if stmt, err := s.db.Prepare(`
  160. UPDATE ` + s.config.EmailChunkTable + `
  161. SET reference_count=reference_count+1
  162. WHERE hash = ? `); err != nil {
  163. return err
  164. } else {
  165. s.statements["chunkReferenceIncr"] = stmt
  166. }
  167. // If the reference_count is 0 then it means the chunk has been deleted
  168. // Chunks are soft-deleted for now, hard-deleted by another sweeper query as they become stale.
  169. if stmt, err := s.db.Prepare(`
  170. UPDATE ` + s.config.EmailChunkTable + `
  171. SET reference_count=reference_count-1
  172. WHERE hash = ? AND reference_count > 0`); err != nil {
  173. return err
  174. } else {
  175. s.statements["chunkReferenceDecr"] = stmt
  176. }
  177. // fetch an email
  178. if stmt, err := s.db.Prepare(`
  179. SELECT *
  180. from ` + s.config.EmailTable + `
  181. where mail_id=?`); err != nil {
  182. return err
  183. } else {
  184. s.statements["selectMail"] = stmt
  185. }
  186. // fetch a chunk
  187. if stmt, err := s.db.Prepare(`
  188. SELECT *
  189. from ` + s.config.EmailChunkTable + `
  190. where hash=?`); err != nil {
  191. return err
  192. } else {
  193. s.statements["selectChunk"] = stmt
  194. }
  195. // TODO sweep old chunks
  196. // TODO sweep incomplete emails
  197. return nil
  198. }
// mysqlYYYY_m_d_s_H_i_s is the Go reference-time layout ("YYYY-MM-DD HH:MM:SS")
// that OpenMessage uses to format the created_at value.
const mysqlYYYY_m_d_s_H_i_s = "2006-01-02 15:04:05"
  200. // OpenMessage implements the Storage interface
  201. func (s *StoreSQL) OpenMessage(
  202. queuedID mail.Hash128,
  203. from string,
  204. helo string,
  205. recipient string,
  206. ipAddress IPAddr,
  207. returnPath string,
  208. protocol mail.Protocol,
  209. transport smtp.TransportType,
  210. ) (mailID uint64, err error) {
  211. // if it's ipv4 then we want ipv6 to be 0, and vice-versa
  212. var ip4 uint32
  213. ip6 := make([]byte, 16)
  214. if ip := ipAddress.IP.To4(); ip != nil {
  215. ip4 = binary.BigEndian.Uint32(ip)
  216. } else {
  217. copy(ip6, ipAddress.IP)
  218. }
  219. r, err := s.statements["insertEmail"].Exec(
  220. queuedID.Bytes(),
  221. time.Now().Format(mysqlYYYY_m_d_s_H_i_s),
  222. from,
  223. helo,
  224. recipient,
  225. ip4,
  226. ip6,
  227. returnPath,
  228. transport.String(),
  229. protocol.String())
  230. if err != nil {
  231. return 0, err
  232. }
  233. id, err := r.LastInsertId()
  234. if err != nil {
  235. return 0, err
  236. }
  237. return uint64(id), err
  238. }
  239. // AddChunk implements the Storage interface
  240. func (s *StoreSQL) AddChunk(data []byte, hash []byte) error {
  241. // attempt to increment the reference_count (it means the chunk is already in there)
  242. r, err := s.statements["chunkReferenceIncr"].Exec(hash)
  243. if err != nil {
  244. return err
  245. }
  246. affected, err := r.RowsAffected()
  247. if err != nil {
  248. return err
  249. }
  250. if affected == 0 {
  251. // chunk isn't in there, let's insert it
  252. _, err := s.statements["insertChunk"].Exec(data, hash)
  253. if err != nil {
  254. return err
  255. }
  256. }
  257. return nil
  258. }
  259. // CloseMessage implements the Storage interface
  260. func (s *StoreSQL) CloseMessage(
  261. mailID uint64,
  262. size int64,
  263. partsInfo *PartsInfo,
  264. subject string,
  265. to string, from string) error {
  266. partsInfoJson, err := json.Marshal(partsInfo)
  267. if err != nil {
  268. return err
  269. }
  270. _, err = s.statements["finalizeEmail"].Exec(size, partsInfoJson, subject, to, from, mailID)
  271. if err != nil {
  272. return err
  273. }
  274. return nil
  275. }
  276. // Initialize loads the specific database config, connects to the db, prepares statements
  277. func (s *StoreSQL) Initialize(cfg backends.ConfigGroup) error {
  278. sd := backends.StreamDecorator{}
  279. err := sd.ExtractConfig(cfg, &s.config)
  280. if err != nil {
  281. return err
  282. }
  283. if s.config.EmailTable == "" {
  284. s.config.EmailTable = "in_emails"
  285. }
  286. if s.config.EmailChunkTable == "" {
  287. s.config.EmailChunkTable = "in_emails_chunks"
  288. }
  289. if s.config.Driver == "" {
  290. s.config.Driver = "mysql"
  291. }
  292. s.db, err = s.connect()
  293. if err != nil {
  294. return err
  295. }
  296. err = s.prepareSql()
  297. if err != nil {
  298. return err
  299. }
  300. return nil
  301. }
  302. // Shutdown implements the Storage interface
  303. func (s *StoreSQL) Shutdown() (err error) {
  304. defer func() {
  305. closeErr := s.db.Close()
  306. if closeErr != err {
  307. backends.Log().WithError(err).Error("failed to close sql database")
  308. err = closeErr
  309. }
  310. }()
  311. for i := range s.statements {
  312. if err = s.statements[i].Close(); err != nil {
  313. backends.Log().WithError(err).Error("failed to close sql statement")
  314. }
  315. }
  316. return err
  317. }
  318. // GetEmail implements the Storage interface
  319. func (s *StoreSQL) GetEmail(mailID uint64) (*Email, error) {
  320. email := &Email{}
  321. var createdAt mysql.NullTime
  322. var transport transportType
  323. var protocol protocol
  324. err := s.statements["selectMail"].QueryRow(mailID).Scan(
  325. &email.mailID,
  326. &createdAt,
  327. &email.size,
  328. &email.from,
  329. &email.to,
  330. &email.partsInfo,
  331. &email.helo,
  332. &email.subject,
  333. &email.queuedID,
  334. &email.recipient,
  335. &email.ipv4,
  336. &email.ipv6,
  337. &email.returnPath,
  338. &protocol,
  339. &transport,
  340. )
  341. email.createdAt = createdAt.Time
  342. email.protocol = protocol.Protocol
  343. email.transport = transport.TransportType
  344. if err != nil {
  345. return email, err
  346. }
  347. return email, nil
  348. }
  349. // Value implements the driver.Valuer interface
  350. func (h HashKey) Value() (driver.Value, error) {
  351. return h[:], nil
  352. }
  353. func (h *HashKey) Scan(value interface{}) error {
  354. b := value.([]uint8)
  355. h.Pack(b)
  356. return nil
  357. }
  358. type chunkData []uint8
  359. func (v chunkData) Value() (driver.Value, error) {
  360. return v[:], nil
  361. }
  362. // GetChunks implements the Storage interface
  363. func (s *StoreSQL) GetChunks(hash ...HashKey) ([]*Chunk, error) {
  364. result := make([]*Chunk, len(hash))
  365. // we need to wrap these in an interface{} so that they can be passed to db.Query
  366. args := make([]interface{}, len(hash))
  367. for i := range hash {
  368. args[i] = &hash[i]
  369. }
  370. query := fmt.Sprintf("SELECT modified_at, reference_count, data, `hash` FROM %s WHERE `hash` in (%s)",
  371. s.config.EmailChunkTable,
  372. "?"+strings.Repeat(",?", len(hash)-1),
  373. )
  374. rows, err := s.db.Query(query, args...)
  375. defer func() {
  376. if rows != nil {
  377. _ = rows.Close()
  378. }
  379. }()
  380. if err != nil {
  381. return result, err
  382. }
  383. // temp is a lookup table for hash -> chunk
  384. // since rows can come in different order, we need to make sure
  385. // that result is sorted in the order of args
  386. temp := make(map[HashKey]*Chunk, len(hash))
  387. i := 0
  388. for rows.Next() {
  389. var createdAt mysql.NullTime
  390. var data chunkData
  391. var h HashKey
  392. c := Chunk{}
  393. if err := rows.Scan(
  394. &createdAt,
  395. &c.referenceCount,
  396. &data,
  397. &h,
  398. ); err != nil {
  399. return result, err
  400. }
  401. c.data = bytes.NewBuffer(data)
  402. c.modifiedAt = createdAt.Time
  403. temp[h] = &c
  404. i++
  405. }
  406. // re-order the rows according to the order of the args (very important)
  407. for i := range args {
  408. b := args[i].(*HashKey)
  409. if _, ok := temp[*b]; ok {
  410. result[i] = temp[*b]
  411. }
  412. }
  413. if err := rows.Err(); err != nil || i == 0 {
  414. return result, errors.New("data chunks not found")
  415. }
  416. return result, nil
  417. }
  418. // zap is used in testing, purges everything
  419. func (s *StoreSQL) zap() error {
  420. if r, err := s.db.Exec("DELETE from " + s.config.EmailTable + " "); err != nil {
  421. return err
  422. } else {
  423. affected, _ := r.RowsAffected()
  424. fmt.Println(fmt.Sprintf("deleted %v emails", affected))
  425. }
  426. if r, err := s.db.Exec("DELETE from " + s.config.EmailChunkTable + " "); err != nil {
  427. return err
  428. } else {
  429. affected, _ := r.RowsAffected()
  430. fmt.Println(fmt.Sprintf("deleted %v chunks", affected))
  431. }
  432. return nil
  433. }
  434. // Scan implements database/sql scanner interface, for parsing PartsInfo
  435. func (info *PartsInfo) Scan(value interface{}) error {
  436. if value == nil {
  437. return errors.New("parts_info is null")
  438. }
  439. if data, ok := value.([]byte); !ok {
  440. return errors.New("parts_info is not str")
  441. } else {
  442. if err := json.Unmarshal(data, info); err != nil {
  443. return err
  444. }
  445. }
  446. return nil
  447. }
  448. // /Scan implements database/sql scanner interface, for parsing net.IPAddr
  449. func (ip *IPAddr) Scan(value interface{}) error {
  450. if value == nil {
  451. return nil
  452. }
  453. if data, ok := value.([]uint8); ok {
  454. if len(data) == 16 { // 128 bits
  455. // ipv6
  456. ipv6 := make(net.IP, 16)
  457. copy(ipv6, data)
  458. ip.IPAddr.IP = ipv6
  459. }
  460. }
  461. if data, ok := value.(int64); ok {
  462. // ipv4
  463. ipv4 := make(net.IP, 4)
  464. binary.BigEndian.PutUint32(ipv4, uint32(data))
  465. ip.IPAddr.IP = ipv4
  466. }
  467. return nil
  468. }
// transportType wraps smtp.TransportType so that a Scan method can be
// attached to it; GetEmail scans into it and copies the embedded value out.
type transportType struct {
	smtp.TransportType
}
// protocol wraps mail.Protocol so that a Scan method can be attached to it;
// GetEmail scans into it and copies the embedded value out.
type protocol struct {
	mail.Protocol
}
  475. // Scan implements database/sql scanner interface, for parsing smtp.TransportType
  476. func (t *transportType) Scan(value interface{}) error {
  477. if data, ok := value.([]uint8); ok {
  478. v := smtp.ParseTransportType(string(data))
  479. t.TransportType = v
  480. }
  481. return nil
  482. }
  483. // Scan implements database/sql scanner interface, for parsing mail.Protocol
  484. func (p *protocol) Scan(value interface{}) error {
  485. if data, ok := value.([]uint8); ok {
  486. v := mail.ParseProtocolType(string(data))
  487. p.Protocol = v
  488. }
  489. return nil
  490. }