store_sql.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558
  1. package chunk
  2. import (
  3. "bytes"
  4. "database/sql"
  5. "database/sql/driver"
  6. "encoding/binary"
  7. "encoding/json"
  8. "errors"
  9. "fmt"
  10. "github.com/flashmob/go-guerrilla/backends"
  11. "github.com/flashmob/go-guerrilla/mail"
  12. "github.com/flashmob/go-guerrilla/mail/smtp"
  13. "github.com/go-sql-driver/mysql"
  14. "net"
  15. "strings"
  16. "time"
  17. )
  18. /*
  19. SQL schema
  20. ```
  21. create schema gmail collate utf8mb4_unicode_ci;
  22. CREATE TABLE `in_emails` (
  23. `mail_id` bigint unsigned NOT NULL AUTO_INCREMENT,
  24. `created_at` datetime NOT NULL,
  25. `size` int unsigned NOT NULL,
  26. `from` varbinary(255) NOT NULL,
  27. `to` varbinary(255) NOT NULL,
  28. `parts_info` text COLLATE utf8mb4_unicode_ci,
  29. `helo` varchar(255) COLLATE latin1_swedish_ci NOT NULL,
  30. `subject` text CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL,
  31. `queued_id` binary(16) NOT NULL,
  32. `recipient` varbinary(255) NOT NULL,
  33. `ipv4_addr` int unsigned DEFAULT NULL,
  34. `ipv6_addr` varbinary(16) DEFAULT NULL,
  35. `return_path` varbinary(255) NOT NULL,
  36. `protocol` set('SMTP','SMTPS','ESMTP','ESMTPS','LMTP','LMTPS') COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT 'SMTP',
  37. `transport` set('7bit','8bit','unknown','invalid') COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT 'unknown',
  38. PRIMARY KEY (`mail_id`)
  39. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
  40. CREATE TABLE `in_emails_chunks` (
  41. `modified_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
  42. `reference_count` int unsigned DEFAULT '1',
  43. `data` mediumblob NOT NULL,
  44. `hash` varbinary(16) NOT NULL,
  45. UNIQUE KEY `in_emails_chunks_hash_uindex` (`hash`)
  46. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
  47. ```
  48. ipv6_addr is big endian
  49. TODO compression, configurable SQL strings, logger
  50. */
  51. func init() {
  52. StorageEngines["sql"] = func() Storage {
  53. return new(StoreSQL)
  54. }
  55. }
  // sqlConfig holds the settings of the SQL storage engine. Initialize fills it
  // from the backend config group and supplies defaults for EmailTable,
  // EmailChunkTable and Driver when they are left empty.
  type sqlConfig struct {
  	// EmailTable names the main table, which gets one row per email
  	// (envelope and header fields). Defaults to "in_emails".
  	EmailTable string `json:"email_table,omitempty"`
  	// EmailChunkTable names the table holding email data as de-duplicated
  	// chunks. Defaults to "in_emails_chunks".
  	EmailChunkTable string `json:"email_table_chunks,omitempty"`

  	// Driver is the database/sql driver name, e.g. "mysql" (the default).
  	Driver string `json:"sql_driver,omitempty"`
  	// DSN (required) is the driver-specific connection string, e.g.
  	// "user:pass@tcp(127.0.0.1:3306)/db_name?readTimeout=10s&writeTimeout=10s&charset=utf8mb4&collation=utf8mb4_unicode_ci"
  	DSN string `json:"sql_dsn,omitempty"`
  	// MaxConnLifetime (optional) is a time.ParseDuration string, e.g. "30s".
  	MaxConnLifetime string `json:"sql_max_conn_lifetime,omitempty"`
  	// MaxOpenConns (optional) caps open connections; 0 keeps the database/sql default.
  	MaxOpenConns int `json:"sql_max_open_conns,omitempty"`
  	// MaxIdleConns (optional) caps idle pooled connections; 0 keeps the database/sql default.
  	MaxIdleConns int `json:"sql_max_idle_conns,omitempty"`

  	// CompressLevel controls the gzip compression level of email chunks:
  	// 0 = no compression, 1 = best speed, 9 = best compression,
  	// -1 = default, -2 = huffman only.
  	// NOTE(review): compression is still a TODO; confirm where this is applied.
  	CompressLevel int `json:"compress_level,omitempty"`
  }
  77. // StoreSQL implements the Storage interface
  78. type StoreSQL struct {
  79. config sqlConfig
  80. statements map[string]*sql.Stmt
  81. db *sql.DB
  82. }
  83. func (s *StoreSQL) StartWorker() (stop chan bool) {
  84. timeo := time.Second * 1
  85. stop = make(chan bool)
  86. go func() {
  87. select {
  88. case <-stop:
  89. return
  90. case <-time.After(timeo):
  91. t1 := int64(time.Now().UnixNano())
  92. // do stuff here
  93. if (time.Now().UnixNano())-t1 > int64(time.Second*3) {
  94. }
  95. }
  96. }()
  97. return stop
  98. }
  99. func (s *StoreSQL) connect() (*sql.DB, error) {
  100. var err error
  101. if s.db, err = sql.Open(s.config.Driver, s.config.DSN); err != nil {
  102. backends.Log().Error("cannot open database: ", err)
  103. return nil, err
  104. }
  105. if s.config.MaxOpenConns != 0 {
  106. s.db.SetMaxOpenConns(s.config.MaxOpenConns)
  107. }
  108. if s.config.MaxIdleConns != 0 {
  109. s.db.SetMaxIdleConns(s.config.MaxIdleConns)
  110. }
  111. if s.config.MaxConnLifetime != "" {
  112. t, err := time.ParseDuration(s.config.MaxConnLifetime)
  113. if err != nil {
  114. return nil, err
  115. }
  116. s.db.SetConnMaxLifetime(t)
  117. }
  118. stats := s.db.Stats()
  119. fmt.Println(stats)
  120. // do we have permission to access the table?
  121. _, err = s.db.Query("SELECT mail_id FROM " + s.config.EmailTable + " LIMIT 1")
  122. if err != nil {
  123. return nil, err
  124. }
  125. return s.db, err
  126. }
  // prepareSql prepares every SQL statement StoreSQL uses and stores each one
  // in s.statements under its name. It stops at the first failure and returns
  // it, annotated with the statement name. Statements prepared before the
  // failure stay in s.statements.
  func (s *StoreSQL) prepareSql() error {
  	if s.statements == nil {
  		s.statements = make(map[string]*sql.Stmt)
  	}
  	emails, chunks := s.config.EmailTable, s.config.EmailChunkTable
  	// A slice (not a map) keeps the preparation order deterministic.
  	queries := []struct{ name, query string }{
  		// begin inserting an email (before saving chunks)
  		{"insertEmail", "INSERT INTO " + emails +
  			" (queued_id, created_at, `from`, helo, recipient, ipv4_addr, ipv6_addr, return_path, transport, protocol)" +
  			" VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"},
  		// insert a chunk of an email's data
  		{"insertChunk", "INSERT INTO " + chunks + " (data, hash) VALUES(?, ?)"},
  		// finalize the email (the connection closed)
  		{"finalizeEmail", "UPDATE " + emails +
  			" SET size=?, parts_info=?, subject=?, `to`=?, `from`=? WHERE mail_id = ?"},
  		// Increment the reference_count of an existing chunk. Callers check the
  		// affected-rows count: 0 means the chunk is absent and must be inserted,
  		// so an existing chunk is never re-inserted.
  		{"chunkReferenceIncr", "UPDATE " + chunks +
  			" SET reference_count=reference_count+1 WHERE hash = ?"},
  		// A reference_count of 0 means the chunk has been soft-deleted. Hard
  		// deletion of stale chunks is meant to be done by a sweeper (see TODO below).
  		{"chunkReferenceDecr", "UPDATE " + chunks +
  			" SET reference_count=reference_count-1 WHERE hash = ? AND reference_count > 0"},
  		// fetch an email; GetEmail scans the columns positionally, so the table's
  		// column order matters for this SELECT *
  		{"selectMail", "SELECT * from " + emails + " where mail_id=?"},
  		// fetch a chunk
  		{"selectChunk", "SELECT * from " + chunks + " where hash=?"},
  	}
  	for _, q := range queries {
  		stmt, err := s.db.Prepare(q.query)
  		if err != nil {
  			return fmt.Errorf("preparing %s statement: %w", q.name, err)
  		}
  		s.statements[q.name] = stmt
  	}
  	// TODO sweep old chunks
  	// TODO sweep incomplete emails
  	return nil
  }
  // mysqlYYYY_m_d_s_H_i_s is the time.Format layout for a MySQL DATETIME
  // literal such as "2020-01-02 15:04:05".
  const (
  	mysqlYYYY_m_d_s_H_i_s = "2006-01-02 15:04:05"
  )
  // OpenMessage implements the Storage interface. It inserts a new row into the
  // email table with the envelope details and the current local time, and
  // returns the generated mail_id. CloseMessage later updates the row's size,
  // parts_info, subject, to and from columns.
  //
  // The client address is split over two columns. An IPv4 address is stored
  // as a big-endian uint32 in ipv4_addr, with ipv6_addr set to 16 zero bytes.
  // Anything else is copied into ipv6_addr, with ipv4_addr set to 0.
  func (s *StoreSQL) OpenMessage(
  	queuedID mail.Hash128,
  	from string,
  	helo string,
  	recipient string,
  	ipAddress IPAddr,
  	returnPath string,
  	protocol mail.Protocol,
  	transport smtp.TransportType,
  ) (mailID uint64, err error) {
  	var (
  		v4 uint32
  		v6 = make([]byte, 16)
  	)
  	if asV4 := ipAddress.IP.To4(); asV4 == nil {
  		copy(v6, ipAddress.IP)
  	} else {
  		v4 = binary.BigEndian.Uint32(asV4)
  	}
  	createdAt := time.Now().Format(mysqlYYYY_m_d_s_H_i_s)
  	res, err := s.statements["insertEmail"].Exec(
  		queuedID.Bytes(),
  		createdAt,
  		from,
  		helo,
  		recipient,
  		v4,
  		v6,
  		returnPath,
  		transport.String(),
  		protocol.String(),
  	)
  	if err != nil {
  		return 0, err
  	}
  	lastID, err := res.LastInsertId()
  	if err != nil {
  		return 0, err
  	}
  	return uint64(lastID), nil
  }
  241. // AddChunk implements the Storage interface
  242. func (s *StoreSQL) AddChunk(data []byte, hash []byte) error {
  243. // attempt to increment the reference_count (it means the chunk is already in there)
  244. r, err := s.statements["chunkReferenceIncr"].Exec(hash)
  245. if err != nil {
  246. return err
  247. }
  248. affected, err := r.RowsAffected()
  249. if err != nil {
  250. return err
  251. }
  252. if affected == 0 {
  253. // chunk isn't in there, let's insert it
  254. _, err := s.statements["insertChunk"].Exec(data, hash)
  255. if err != nil {
  256. return err
  257. }
  258. }
  259. return nil
  260. }
  // CloseMessage implements the Storage interface. It finalizes the email row
  // identified by mailID. It stores the size, partsInfo encoded as JSON, the
  // subject, and the to/from values. It returns the JSON encoding error or the
  // statement execution error, whichever occurs first.
  func (s *StoreSQL) CloseMessage(
  	mailID uint64,
  	size int64,
  	partsInfo *PartsInfo,
  	subject string,
  	to string, from string) error {
  	encodedParts, err := json.Marshal(partsInfo)
  	if err == nil {
  		_, err = s.statements["finalizeEmail"].Exec(size, encodedParts, subject, to, from, mailID)
  	}
  	return err
  }
  278. // Initialize loads the specific database config, connects to the db, prepares statements
  279. func (s *StoreSQL) Initialize(cfg backends.ConfigGroup) error {
  280. sd := backends.StreamDecorator{}
  281. err := sd.ExtractConfig(cfg, &s.config)
  282. if err != nil {
  283. return err
  284. }
  285. if s.config.EmailTable == "" {
  286. s.config.EmailTable = "in_emails"
  287. }
  288. if s.config.EmailChunkTable == "" {
  289. s.config.EmailChunkTable = "in_emails_chunks"
  290. }
  291. if s.config.Driver == "" {
  292. s.config.Driver = "mysql"
  293. }
  294. s.db, err = s.connect()
  295. if err != nil {
  296. return err
  297. }
  298. err = s.prepareSql()
  299. if err != nil {
  300. return err
  301. }
  302. return nil
  303. }
  304. // Shutdown implements the Storage interface
  305. func (s *StoreSQL) Shutdown() (err error) {
  306. defer func() {
  307. closeErr := s.db.Close()
  308. if closeErr != err {
  309. backends.Log().WithError(err).Error("failed to close sql database")
  310. err = closeErr
  311. }
  312. }()
  313. for i := range s.statements {
  314. if err = s.statements[i].Close(); err != nil {
  315. backends.Log().WithError(err).Error("failed to close sql statement")
  316. }
  317. }
  318. return err
  319. }
  320. // GetEmail implements the Storage interface
  321. func (s *StoreSQL) GetEmail(mailID uint64) (*Email, error) {
  322. email := &Email{}
  323. var createdAt mysql.NullTime
  324. var transport transportType
  325. var protocol protocol
  326. err := s.statements["selectMail"].QueryRow(mailID).Scan(
  327. &email.mailID,
  328. &createdAt,
  329. &email.size,
  330. &email.from,
  331. &email.to,
  332. &email.partsInfo,
  333. &email.helo,
  334. &email.subject,
  335. &email.queuedID,
  336. &email.recipient,
  337. &email.ipv4,
  338. &email.ipv6,
  339. &email.returnPath,
  340. &protocol,
  341. &transport,
  342. )
  343. email.createdAt = createdAt.Time
  344. email.protocol = protocol.Protocol
  345. email.transport = transport.TransportType
  346. if err != nil {
  347. return email, err
  348. }
  349. return email, nil
  350. }
  // Value implements the driver.Valuer interface and passes the key to the
  // database as a slice over its bytes.
  func (h HashKey) Value() (driver.Value, error) {
  	raw := h[:]
  	return raw, nil
  }
  // Scan implements the database/sql Scanner interface. It loads a hash column
  // into h via Pack. A NULL or non-[]byte column value is reported as an error
  // instead of causing a panic from an unchecked type assertion.
  func (h *HashKey) Scan(value interface{}) error {
  	b, ok := value.([]byte)
  	if !ok {
  		return fmt.Errorf("cannot scan %T into HashKey", value)
  	}
  	h.Pack(b)
  	return nil
  }
  360. type chunkData []uint8
  361. func (v chunkData) Value() (driver.Value, error) {
  362. return v[:], nil
  363. }
  // GetChunks implements the Storage interface. It fetches the chunks with the
  // given hashes in a single query and returns them in the same order as hash.
  //
  // An entry is nil when its hash was not found. An error is returned when the
  // query or row iteration fails, or when none of the requested chunks exist
  // ("data chunks not found"). Requesting zero hashes returns an empty result
  // and no error, instead of building an invalid "IN ()" clause.
  func (s *StoreSQL) GetChunks(hash ...HashKey) ([]*Chunk, error) {
  	result := make([]*Chunk, len(hash))
  	if len(hash) == 0 {
  		return result, nil
  	}
  	// we need to wrap these in an interface{} so that they can be passed to db.Query
  	args := make([]interface{}, len(hash))
  	for i := range hash {
  		args[i] = &hash[i]
  	}
  	query := fmt.Sprintf("SELECT modified_at, reference_count, data, `hash` FROM %s WHERE `hash` in (%s)",
  		s.config.EmailChunkTable,
  		"?"+strings.Repeat(",?", len(hash)-1),
  	)
  	rows, err := s.db.Query(query, args...)
  	if err != nil {
  		return result, err
  	}
  	defer func() {
  		_ = rows.Close() // read-only query; nothing useful to do with a close error
  	}()
  	// Rows may come back in any order, so index them by hash first.
  	found := make(map[HashKey]*Chunk, len(hash))
  	for rows.Next() {
  		var (
  			modifiedAt mysql.NullTime
  			data       chunkData
  			h          HashKey
  		)
  		c := &Chunk{}
  		if err := rows.Scan(&modifiedAt, &c.referenceCount, &data, &h); err != nil {
  			return result, err
  		}
  		c.data = bytes.NewBuffer(data)
  		c.modifiedAt = modifiedAt.Time
  		found[h] = c
  	}
  	// re-order the rows according to the order of the args (very important)
  	for i := range hash {
  		result[i] = found[hash[i]]
  	}
  	// Report an iteration failure as itself, not as "not found".
  	if err := rows.Err(); err != nil {
  		return result, err
  	}
  	if len(found) == 0 {
  		return result, errors.New("data chunks not found")
  	}
  	return result, nil
  }
  // zap is used in testing. It deletes every row from the email table and then
  // the chunk table, printing how many rows each DELETE removed. It stops at
  // the first failing DELETE.
  func (s *StoreSQL) zap() error {
  	targets := []struct{ table, label string }{
  		{s.config.EmailTable, "emails"},
  		{s.config.EmailChunkTable, "chunks"},
  	}
  	for _, target := range targets {
  		res, err := s.db.Exec("DELETE from " + target.table + " ")
  		if err != nil {
  			return err
  		}
  		n, _ := res.RowsAffected() // the count is informational only
  		fmt.Printf("deleted %v %s\n", n, target.label)
  	}
  	return nil
  }
  436. // Scan implements database/sql scanner interface, for parsing PartsInfo
  437. func (info *PartsInfo) Scan(value interface{}) error {
  438. if value == nil {
  439. return errors.New("parts_info is null")
  440. }
  441. if data, ok := value.([]byte); !ok {
  442. return errors.New("parts_info is not str")
  443. } else {
  444. if err := json.Unmarshal(data, info); err != nil {
  445. return err
  446. }
  447. }
  448. return nil
  449. }
  450. // /Scan implements database/sql scanner interface, for parsing net.IPAddr
  451. func (ip *IPAddr) Scan(value interface{}) error {
  452. if value == nil {
  453. return nil
  454. }
  455. if data, ok := value.([]uint8); ok {
  456. if len(data) == 16 { // 128 bits
  457. // ipv6
  458. ipv6 := make(net.IP, 16)
  459. copy(ipv6, data)
  460. ip.IPAddr.IP = ipv6
  461. }
  462. }
  463. if data, ok := value.(int64); ok {
  464. // ipv4
  465. ipv4 := make(net.IP, 4)
  466. binary.BigEndian.PutUint32(ipv4, uint32(data))
  467. ip.IPAddr.IP = ipv4
  468. }
  469. return nil
  470. }
  471. type transportType struct {
  472. smtp.TransportType
  473. }
  474. type protocol struct {
  475. mail.Protocol
  476. }
  477. // todo scanners for protocol & transport
  478. // Scan implements database/sql scanner interface, for parsing smtp.TransportType
  479. func (t *transportType) Scan(value interface{}) error {
  480. if data, ok := value.([]uint8); ok {
  481. v := smtp.ParseTransportType(string(data))
  482. t.TransportType = v
  483. }
  484. return nil
  485. }
  486. // Scan implements database/sql scanner interface, for parsing mail.Protocol
  487. func (p *protocol) Scan(value interface{}) error {
  488. if data, ok := value.([]uint8); ok {
  489. v := mail.ParseProtocolType(string(data))
  490. p.Protocol = v
  491. }
  492. return nil
  493. }