s_chunksaver.go

package backends

// ----------------------------------------------------------------------------------
// Processor Name: ChunkSaver
// ----------------------------------------------------------------------------------
// Description   : Takes the stream and saves it in chunks. Chunks are split on the
//               : chunksaver_chunk_size config setting, and also at the end of MIME parts,
//               : and after a header. This allows for basic de-duplication: we can take a
//               : hash of each chunk, then check the database to see if we have it already.
//               : If we do, we don't need to write it again; we take the reference of the
//               : previously saved chunk and only increment its reference count.
//               : The rationale for putting headers and bodies into separate chunks is
//               : that headers often contain more unique data, while bodies are often
//               : duplicated, especially for messages that are CC'd or forwarded.
// ----------------------------------------------------------------------------------
// Requires      : "mimeanalyzer" stream processor to be enabled before it
// ----------------------------------------------------------------------------------
// Config Options: chunksaver_chunk_size - maximum chunk size, in bytes
//               : chunksaver_storage_engine - "memory" for the in-memory engine; anything else selects the SQL engine
//               : chunksaver_compress_level - zlib compression level used by the memory storage engine
// --------------:-------------------------------------------------------------------
// Input         : e.Values["MimeParts"], which is of type *[]*mime.Part, as populated by "mimeanalyzer"
// ----------------------------------------------------------------------------------
// Output        :
// ----------------------------------------------------------------------------------
import (
	"bytes"
	"compress/zlib"
	"crypto/md5"
	"database/sql"
	"encoding/binary"
	"encoding/json"
	"errors"
	"fmt"
	"hash"
	"io"
	"net"
	"strings"
	"time"

	"github.com/flashmob/go-guerrilla/mail"
	"github.com/flashmob/go-guerrilla/mail/mime"
)
type chunkSaverConfig struct {
	// ChunkMaxBytes controls the maximum buffer size for saving.
	// 16 KB by default.
	ChunkMaxBytes int    `json:"chunksaver_chunk_size"`
	StorageEngine string `json:"chunksaver_storage_engine"`
	CompressLevel int    `json:"chunksaver_compress_level,omitempty"`
}
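// A minimal sketch of how these options might appear in the backend config.
// The keys come from the struct tags above; the values shown are assumptions
// for illustration only, not defaults enforced by this file:
//
//	"chunksaver_chunk_size": 16384,
//	"chunksaver_storage_engine": "memory",
//	"chunksaver_compress_level": 0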
func init() {
	streamers["chunksaver"] = func() *StreamDecorator {
		return Chunksaver()
	}
}
// PartsInfo describes the mime-parts of an email and where their chunks are stored.
type PartsInfo struct {
	Count      uint32        `json:"c"`  // number of parts
	TextPart   int           `json:"tp"` // index (into Parts) of the main text part to display
	HTMLPart   int           `json:"hp"` // index (into Parts) of the main html part to display (if any)
	HasAttach  bool          `json:"a"`
	Parts      []chunkedPart `json:"p"`
	Dictionary []byte        `json:"d"` // zlib dictionary
}

// chunkedPart records a single mime-part and the hashes of the chunks that hold its data.
type chunkedPart struct {
	PartId             string     `json:"i"`
	ChunkHash          [][16]byte `json:"h"` // sequence of hashes the data is stored at
	ContentType        string     `json:"t"`
	Charset            string     `json:"c"`
	TransferEncoding   string     `json:"e"`
	ContentDisposition string     `json:"d"`
}
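// For illustration, PartsInfo is serialized with json.Marshal when the message is
// closed, producing something roughly like the sketch below. The values are made
// up, and the chunk hashes marshal as arrays of 16 numbers:
//
//	{"c":1,"tp":0,"hp":-1,"a":false,
//	 "p":[{"i":"1","h":[[21,108,...]],"t":"text/plain","c":"utf-8","e":"7bit","d":""}],
//	 "d":null}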
type flushEvent func() error

// chunkedBytesBuffer is a fixed-capacity buffer that calls flushTrigger whenever it fills up.
type chunkedBytesBuffer struct {
	buf          []byte
	flushTrigger flushEvent
}

// flush signals that it's time to write the buffer out to storage
func (c *chunkedBytesBuffer) flush() error {
	if len(c.buf) == 0 {
		return nil
	}
	fmt.Print(string(c.buf))
	if c.flushTrigger != nil {
		if err := c.flushTrigger(); err != nil {
			return err
		}
	}
	c.Reset()
	return nil
}

// Reset sets the length back to 0, making it re-usable
func (c *chunkedBytesBuffer) Reset() {
	c.buf = c.buf[:0] // set the length back to 0
}

// Write takes a p slice of bytes and writes it to the buffer.
// It will never grow the buffer, flushing it as soon as it's full.
func (c *chunkedBytesBuffer) Write(p []byte) (i int, err error) {
	remaining := len(p)
	bufCap := cap(c.buf)
	for {
		free := bufCap - len(c.buf)
		if free > remaining {
			// enough room in the buffer
			c.buf = append(c.buf, p[i:i+remaining]...)
			i += remaining
			return
		} else {
			// fill the buffer to the 'brim' with a slice from p
			c.buf = append(c.buf, p[i:i+free]...)
			remaining -= free
			i += free
			err = c.flush()
			if err != nil {
				return i, err
			}
			if remaining == 0 {
				return
			}
		}
	}
}

// capTo caps the internal buffer to the specified number of bytes and sets the length back to 0
func (c *chunkedBytesBuffer) capTo(n int) {
	if cap(c.buf) == n {
		return
	}
	c.buf = make([]byte, 0, n)
}
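// As a rough usage sketch (hypothetical, not part of the pipeline): writing 40 KB
// into a buffer capped at 16 KB fires flushTrigger twice from inside Write and
// leaves 8 KB buffered until the explicit final flush:
//
//	b := &chunkedBytesBuffer{flushTrigger: func() error { return nil }}
//	b.capTo(16 * 1024)
//	_, _ = b.Write(make([]byte, 40*1024))
//	_ = b.flush() // writes out the remaining 8 KB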
// chunkedBytesBufferMime wraps chunkedBytesBuffer, recording which mime-part each
// flushed chunk belongs to and handing the chunk to the storage backend.
type chunkedBytesBufferMime struct {
	chunkedBytesBuffer
	current  *mime.Part
	info     PartsInfo
	md5      hash.Hash
	database ChunkSaverStorage
}

func newChunkedBytesBufferMime() *chunkedBytesBufferMime {
	b := new(chunkedBytesBufferMime)
	b.chunkedBytesBuffer.flushTrigger = func() error {
		return b.onFlush()
	}
	b.md5 = md5.New()
	b.buf = make([]byte, 0, chunkMaxBytes)
	return b
}

func (b *chunkedBytesBufferMime) setDatabase(database ChunkSaverStorage) {
	b.database = database
}

// onFlush is called when the buffer fills up. It hashes the chunk, records the hash
// against the current mime-part, and saves the chunk to storage.
func (b *chunkedBytesBufferMime) onFlush() error {
	b.md5.Write(b.buf)
	var chash [16]byte
	copy(chash[:], b.md5.Sum([]byte{}))
	if b.current != nil {
		if size := len(b.info.Parts); size > 0 && b.info.Parts[size-1].PartId == b.current.Node {
			// existing part, just append the hash
			lastPart := &b.info.Parts[size-1]
			lastPart.ChunkHash = append(lastPart.ChunkHash, chash)
			b.fillInfo(lastPart, size-1)
		} else {
			// add it as a new part
			part := chunkedPart{
				PartId:    b.current.Node,
				ChunkHash: [][16]byte{chash},
			}
			b.fillInfo(&part, len(b.info.Parts)) // index at which the part is about to be appended
			b.info.Parts = append(b.info.Parts, part)
			b.info.Count++
		}
		if err := b.database.AddChunk(b.buf, chash[:]); err != nil {
			return err
		}
	}
	return nil
}

// fillInfo copies content-type details from the current mime-part into cp and records
// which part index holds the main text/plain and text/html bodies.
func (b *chunkedBytesBufferMime) fillInfo(cp *chunkedPart, index int) {
	if cp.ContentType == "" && b.current.ContentType != nil {
		cp.ContentType = b.current.ContentType.String()
	}
	if cp.Charset == "" && b.current.Charset != "" {
		cp.Charset = b.current.Charset
	}
	if cp.TransferEncoding == "" && b.current.TransferEncoding != "" {
		cp.TransferEncoding = b.current.TransferEncoding
	}
	if cp.ContentDisposition == "" && b.current.ContentDisposition != "" {
		cp.ContentDisposition = b.current.ContentDisposition
		if strings.Contains(cp.ContentDisposition, "attach") {
			b.info.HasAttach = true
		}
	}
	if cp.ContentType != "" {
		if b.info.TextPart == -1 && strings.Contains(cp.ContentType, "text/plain") {
			b.info.TextPart = index
		} else if b.info.HTMLPart == -1 && strings.Contains(cp.ContentType, "text/html") {
			b.info.HTMLPart = index
		}
	}
}
func (b *chunkedBytesBufferMime) Reset() {
	b.md5.Reset()
	b.chunkedBytesBuffer.Reset()
}

// currentPart sets the mime-part that subsequent writes belong to, initializing
// the parts info on first use.
func (b *chunkedBytesBufferMime) currentPart(cp *mime.Part) {
	if b.current == nil {
		b.info = PartsInfo{Parts: make([]chunkedPart, 0, 3), TextPart: -1, HTMLPart: -1}
	}
	b.current = cp
}

// ChunkSaverStorage defines an interface to the storage layer (the database)
type ChunkSaverStorage interface {
	OpenMessage(from string, helo string, recipient string, ipAddress net.IPAddr, returnPath string, isTLS bool) (mailID uint64, err error)
	CloseMessage(mailID uint64, size int64, partsInfo *PartsInfo, subject string, deliveryID string, to string, from string) error
	AddChunk(data []byte, hash []byte) error
	GetEmail(mailID uint64) (*ChunkSaverEmail, error)
	GetChunks(hash ...[16]byte) ([]*ChunkSaverChunk, error)
	Initialize(cfg BackendConfig) error
	Shutdown() (err error)
}
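// Any type implementing ChunkSaverStorage can be handed to the chunksaver through the
// decorator's variadic arguments (see the "optional dependency injection" loop in
// Chunksaver below). A rough sketch, assuming a hypothetical myStore type that
// implements the interface and nextProcessor being the next StreamProcessor in the chain:
//
//	store := &myStore{}
//	sd := Chunksaver()
//	_ = sd.Decorate(nextProcessor, store) // store is used instead of the configured engine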
// ChunkSaverEmail is the email metadata returned by the storage layer.
type ChunkSaverEmail struct {
	mailID     uint64
	createdAt  time.Time
	size       int64
	from       string
	to         string
	partsInfo  PartsInfo
	helo       string
	subject    string
	deliveryID string
	recipient  string
	ipv4       net.IPAddr
	ipv6       net.IPAddr
	returnPath string
	isTLS      bool
}

// ChunkSaverChunk is a single stored chunk, exposed as an io.Reader.
type ChunkSaverChunk struct {
	modifiedAt     time.Time
	referenceCount uint
	data           io.Reader
}

type chunkSaverMemoryEmail struct {
	mailID     uint64
	createdAt  time.Time
	size       int64
	from       string
	to         string
	partsInfo  []byte
	helo       string
	subject    string
	deliveryID string
	recipient  string
	ipv4       net.IPAddr
	ipv6       net.IPAddr
	returnPath string
	isTLS      bool
}

type chunkSaverMemoryChunk struct {
	modifiedAt     time.Time
	referenceCount uint
	data           []byte
}

// chunkSaverMemory is an in-memory implementation of ChunkSaverStorage.
type chunkSaverMemory struct {
	chunks        map[[16]byte]*chunkSaverMemoryChunk
	emails        []*chunkSaverMemoryEmail
	nextID        uint64
	IDOffset      uint64
	compressLevel int
}
func (m *chunkSaverMemory) OpenMessage(from string, helo string, recipient string, ipAddress net.IPAddr, returnPath string, isTLS bool) (mailID uint64, err error) {
	var ip4, ip6 net.IPAddr
	if ip := ipAddress.IP.To4(); ip != nil {
		ip4 = ipAddress
	} else {
		ip6 = ipAddress
	}
	email := chunkSaverMemoryEmail{
		mailID:     m.nextID,
		createdAt:  time.Now(),
		from:       from,
		helo:       helo,
		recipient:  recipient,
		ipv4:       ip4,
		ipv6:       ip6,
		returnPath: returnPath,
		isTLS:      isTLS,
	}
	m.emails = append(m.emails, &email)
	m.nextID++
	return email.mailID, nil
}

func (m *chunkSaverMemory) CloseMessage(mailID uint64, size int64, partsInfo *PartsInfo, subject string, deliveryID string, to string, from string) error {
	if mailID-m.IDOffset >= uint64(len(m.emails)) {
		return errors.New("email not found")
	}
	email := m.emails[mailID-m.IDOffset]
	email.size = size
	if info, err := json.Marshal(partsInfo); err != nil {
		return err
	} else {
		email.partsInfo = info
	}
	email.subject = subject
	email.deliveryID = deliveryID
	email.to = to
	email.from = from
	return nil
}
func (m *chunkSaverMemory) AddChunk(data []byte, hash []byte) error {
	var key [16]byte
	if len(hash) != 16 {
		return errors.New("invalid hash")
	}
	copy(key[:], hash[0:16])
	if chunk, ok := m.chunks[key]; ok {
		// chunk is already stored: only update the counter and modification time
		chunk.referenceCount++
		chunk.modifiedAt = time.Now()
	} else {
		// add a new chunk, compressed with zlib
		var compressed bytes.Buffer
		zlibw, err := zlib.NewWriterLevel(&compressed, m.compressLevel)
		if err != nil {
			return err
		}
		if _, err := zlibw.Write(data); err != nil {
			return err
		}
		if err := zlibw.Close(); err != nil {
			return err
		}
		newChunk := chunkSaverMemoryChunk{
			modifiedAt:     time.Now(),
			referenceCount: 1,
			data:           compressed.Bytes(),
		}
		m.chunks[key] = &newChunk
	}
	return nil
}

func (m *chunkSaverMemory) Initialize(cfg BackendConfig) error {
	m.IDOffset = 1
	m.nextID = m.IDOffset
	m.emails = make([]*chunkSaverMemoryEmail, 0, 100)
	m.chunks = make(map[[16]byte]*chunkSaverMemoryChunk, 1000)
	// compressLevel is left as set by the caller; its zero value equals zlib.NoCompression
	return nil
}
func (m *chunkSaverMemory) Shutdown() (err error) {
	m.emails = nil
	m.chunks = nil
	return nil
}

func (m *chunkSaverMemory) GetEmail(mailID uint64) (*ChunkSaverEmail, error) {
	if mailID-m.IDOffset >= uint64(len(m.emails)) {
		return nil, errors.New("mail not found")
	}
	email := m.emails[mailID-m.IDOffset]
	pi := &PartsInfo{}
	if err := json.Unmarshal(email.partsInfo, pi); err != nil {
		return nil, err
	}
	return &ChunkSaverEmail{
		mailID:     email.mailID,
		createdAt:  email.createdAt,
		size:       email.size,
		from:       email.from,
		to:         email.to,
		partsInfo:  *pi,
		helo:       email.helo,
		subject:    email.subject,
		deliveryID: email.deliveryID,
		recipient:  email.recipient,
		ipv4:       email.ipv4,
		ipv6:       email.ipv6,
		returnPath: email.returnPath,
		isTLS:      email.isTLS,
	}, nil
}

func (m *chunkSaverMemory) GetChunks(hash ...[16]byte) ([]*ChunkSaverChunk, error) {
	result := make([]*ChunkSaverChunk, 0, len(hash))
	var key [16]byte
	for i := range hash {
		copy(key[:], hash[i][:16])
		if c, ok := m.chunks[key]; ok {
			zwr, err := zlib.NewReader(bytes.NewReader(c.data))
			if err != nil {
				return nil, err
			}
			result = append(result, &ChunkSaverChunk{
				modifiedAt:     c.modifiedAt,
				referenceCount: c.referenceCount,
				data:           zwr,
			})
		}
	}
	return result, nil
}
type chunkSaverSQLConfig struct {
	EmailTable  string `json:"email_table"`
	ChunkTable  string `json:"chunk_table"`
	Driver      string `json:"sql_driver"`
	DSN         string `json:"sql_dsn"`
	PrimaryHost string `json:"primary_mail_host"`
}
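// A rough sketch of the SQL-engine settings in the backend config. The keys come
// from the struct tags above; the values are placeholders for illustration, not defaults:
//
//	"email_table": "email",
//	"chunk_table": "email_chunk",
//	"sql_driver": "mysql",
//	"sql_dsn": "user:pass@tcp(127.0.0.1:3306)/maildb",
//	"primary_mail_host": "example.com"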
// chunkSaverSQL implements the ChunkSaverStorage interface
type chunkSaverSQL struct {
	config     *chunkSaverSQLConfig
	statements map[string]*sql.Stmt
	db         *sql.DB
}

func (c *chunkSaverSQL) connect() (*sql.DB, error) {
	var err error
	if c.db, err = sql.Open(c.config.Driver, c.config.DSN); err != nil {
		Log().Error("cannot open database: ", err)
		return nil, err
	}
	// do we have permission to access the table?
	_, err = c.db.Query("SELECT mail_id FROM " + c.config.EmailTable + " LIMIT 1")
	if err != nil {
		return nil, err
	}
	return c.db, err
}
func (c *chunkSaverSQL) prepareSql() error {
	if c.statements == nil {
		c.statements = make(map[string]*sql.Stmt)
	}
	// begin inserting an email (before saving chunks)
	if stmt, err := c.db.Prepare(`INSERT INTO ` +
		c.config.EmailTable +
		` (from, helo, recipient, ipv4_addr, ipv6_addr, return_path, is_tls)
		VALUES(?, ?, ?, ?, ?, ?, ?)`); err != nil {
		return err
	} else {
		c.statements["insertEmail"] = stmt
	}
	// insert a chunk of the email's data
	if stmt, err := c.db.Prepare(`INSERT INTO ` +
		c.config.ChunkTable +
		` (data, hash)
		VALUES(?, ?)`); err != nil {
		return err
	} else {
		c.statements["insertChunk"] = stmt
	}
	// finalize the email (the connection closed)
	if stmt, err := c.db.Prepare(`
		UPDATE ` + c.config.EmailTable + `
		SET size = ?, parts_info = ?, subject = ?, delivery_id = ?, to = ?
		WHERE mail_id = ? `); err != nil {
		return err
	} else {
		c.statements["finalizeEmail"] = stmt
	}
	// Check the existence of a chunk (the reference_count col is incremented if it exists)
	// This means we can avoid re-inserting an existing chunk, only update its reference_count
	if stmt, err := c.db.Prepare(`
		UPDATE ` + c.config.ChunkTable + `
		SET reference_count = reference_count+1
		WHERE hash = ? `); err != nil {
		return err
	} else {
		c.statements["chunkReferenceIncr"] = stmt
	}
	// If the reference_count reaches 0 then it means the chunk has been deleted.
	// Chunks are soft-deleted for now, hard-deleted by another sweeper query as they become stale.
	if stmt, err := c.db.Prepare(`
		UPDATE ` + c.config.ChunkTable + `
		SET reference_count = reference_count-1
		WHERE hash = ? AND reference_count > 0`); err != nil {
		return err
	} else {
		c.statements["chunkReferenceDecr"] = stmt
	}
	// fetch an email
	if stmt, err := c.db.Prepare(`
		SELECT *
		FROM ` + c.config.EmailTable + `
		WHERE mail_id = ?`); err != nil {
		return err
	} else {
		c.statements["selectMail"] = stmt
	}
	// fetch a chunk
	if stmt, err := c.db.Prepare(`
		SELECT *
		FROM ` + c.config.ChunkTable + `
		WHERE hash = ?`); err != nil {
		return err
	} else {
		c.statements["selectChunk"] = stmt
	}
	// TODO: sweep old chunks
	// TODO: sweep incomplete emails
	return nil
}
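// Taken together, the statements above expect the email table to have at least the
// columns mail_id, from, helo, recipient, ipv4_addr, ipv6_addr, return_path, is_tls,
// size, parts_info, subject, delivery_id and to, and the chunk table to have hash,
// data and reference_count. Column types are left to the schema and are not defined
// in this file.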
func (c *chunkSaverSQL) OpenMessage(from string, helo string, recipient string, ipAddress net.IPAddr, returnPath string, isTLS bool) (mailID uint64, err error) {
	// if it's ipv4 then we want ipv6 to be 0, and vice-versa
	var ip4 uint32
	ip6 := make([]byte, 16)
	if ip := ipAddress.IP.To4(); ip != nil {
		ip4 = binary.BigEndian.Uint32(ip)
	} else {
		_ = copy(ip6, []byte(ipAddress.IP))
	}
	r, err := c.statements["insertEmail"].Exec(from, helo, recipient, ip4, ip6, returnPath, isTLS)
	if err != nil {
		return 0, err
	}
	id, err := r.LastInsertId()
	if err != nil {
		return 0, err
	}
	return uint64(id), err
}

func (c *chunkSaverSQL) AddChunk(data []byte, hash []byte) error {
	// attempt to increment the reference_count (it means the chunk is already in there)
	r, err := c.statements["chunkReferenceIncr"].Exec(hash)
	if err != nil {
		return err
	}
	affected, err := r.RowsAffected()
	if err != nil {
		return err
	}
	if affected == 0 {
		// chunk isn't in there, let's insert it
		_, err := c.statements["insertChunk"].Exec(data, hash)
		if err != nil {
			return err
		}
	}
	return nil
}
func (c *chunkSaverSQL) CloseMessage(mailID uint64, size int64, partsInfo *PartsInfo, subject string, deliveryID string, to string, from string) error {
	partsInfoJson, err := json.Marshal(partsInfo)
	if err != nil {
		return err
	}
	_, err = c.statements["finalizeEmail"].Exec(size, partsInfoJson, subject, deliveryID, to, mailID)
	if err != nil {
		return err
	}
	return nil
}

// Initialize loads the specific database config, connects to the db, prepares statements
func (c *chunkSaverSQL) Initialize(cfg BackendConfig) error {
	configType := BaseConfig(&chunkSaverSQLConfig{})
	bcfg, err := Svc.ExtractConfig(cfg, configType)
	if err != nil {
		return err
	}
	c.config = bcfg.(*chunkSaverSQLConfig)
	c.db, err = c.connect()
	if err != nil {
		return err
	}
	err = c.prepareSql()
	if err != nil {
		return err
	}
	return nil
}

func (c *chunkSaverSQL) Shutdown() (err error) {
	defer func() {
		closeErr := c.db.Close()
		if closeErr != nil {
			Log().WithError(closeErr).Error("failed to close sql database")
			err = closeErr
		}
	}()
	for i := range c.statements {
		if err = c.statements[i].Close(); err != nil {
			Log().WithError(err).Error("failed to close sql statement")
		}
	}
	return err
}

// GetEmail is not implemented yet; it returns an empty ChunkSaverEmail.
func (c *chunkSaverSQL) GetEmail(mailID uint64) (*ChunkSaverEmail, error) {
	return &ChunkSaverEmail{}, nil
}

// GetChunks is not implemented yet; it returns an empty result.
func (c *chunkSaverSQL) GetChunks(hash ...[16]byte) ([]*ChunkSaverChunk, error) {
	result := make([]*ChunkSaverChunk, 0, len(hash))
	return result, nil
}
// chunkMailReader reads an email back from storage, reassembling it from its chunks.
type chunkMailReader struct {
	db    ChunkSaverStorage
	email *ChunkSaverEmail
	part  int
	i, j  int
}

// NewChunkMailReader loads the email and selects which mime-part Read will return using the part argument.
// If part is -1, Read will read in the entire message.
func NewChunkMailReader(db ChunkSaverStorage, email *ChunkSaverEmail, part int) (*chunkMailReader, error) {
	r := new(chunkMailReader)
	r.db = db
	r.part = part
	if email == nil {
		return nil, errors.New("nil email")
	} else {
		r.email = email
	}
	if err := r.SeekPart(part); err != nil {
		return nil, err
	}
	return r, nil
}

// SeekPart positions the reader at the given mime-part; -1 starts from the first part.
func (r *chunkMailReader) SeekPart(part int) error {
	if parts := len(r.email.partsInfo.Parts); parts == 0 {
		return errors.New("email has no mime parts")
	} else if part > parts {
		return errors.New("no such part available")
	}
	if part < 0 {
		part = 0 // -1 means "the entire message", so start at the first part
	}
	r.i = part
	r.j = 0
	return nil
}

// Read implements io.Reader, returning chunk data for the selected part (or all parts).
func (r *chunkMailReader) Read(p []byte) (n int, err error) {
	var chunks []*ChunkSaverChunk
	if r.part < 90 {
		for ; r.i < len(r.email.partsInfo.Parts); r.i++ {
			chunks, err = r.db.GetChunks(r.email.partsInfo.Parts[r.i].ChunkHash...)
			if err != nil {
				return
			}
			var nRead int
			for r.j < len(chunks) {
				nRead, err = chunks[r.j].data.Read(p)
				if err == io.EOF {
					r.j++ // advance to the next chunk
					err = nil
				}
				if r.j == len(chunks) { // last chunk in a part?
					r.j = 0 // reset chunk index
					r.i++   // advance to the next part
					if r.i == len(r.email.partsInfo.Parts) || r.part > 0 {
						// there are no more parts to return
						err = io.EOF
					}
				}
				// unless there's an error, the next time this function is
				// called, it will read the next chunk
				return nRead, err
			}
		}
		err = io.EOF
	}
	return n, err
}
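// A rough usage sketch for reading a stored message back out, assuming a store that
// implements GetEmail/GetChunks (such as the in-memory engine above) and a known mailID;
// os.Stdout is only illustrative here:
//
//	email, err := store.GetEmail(mailID)
//	if err != nil {
//		// handle error
//	}
//	reader, err := NewChunkMailReader(store, email, -1) // -1 reads the entire message
//	if err != nil {
//		// handle error
//	}
//	_, err = io.Copy(os.Stdout, reader)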
const chunkMaxBytes = 1024 * 16 // 16 KB is the default, change using the chunksaver_chunk_size config setting

/**
 *
 * A chunk ends either:
 * after chunkMaxBytes, or at the end of a mime-part, or at the end of a header
 *
 * - buffer the first chunk
 * - if the first chunk hasn't been received after more than x bytes, save normally
 *
 */
func Chunksaver() *StreamDecorator {

	sd := &StreamDecorator{}
	sd.Decorate =
		func(sp StreamProcessor, a ...interface{}) StreamProcessor {
			var (
				envelope    *mail.Envelope
				chunkBuffer *chunkedBytesBufferMime
				msgPos      uint
				database    ChunkSaverStorage
				written     int64

				// just some headers from the first mime-part
				subject string
				to      string
				from    string
			)
			var config *chunkSaverConfig
			// optional dependency injection
			for i := range a {
				if db, ok := a[i].(ChunkSaverStorage); ok {
					database = db
				}
				if buff, ok := a[i].(*chunkedBytesBufferMime); ok {
					chunkBuffer = buff
				}
			}

			Svc.AddInitializer(InitializeWith(func(backendConfig BackendConfig) error {
				configType := BaseConfig(&chunkSaverConfig{})
				bcfg, err := Svc.ExtractConfig(backendConfig, configType)
				if err != nil {
					return err
				}
				config = bcfg.(*chunkSaverConfig)
				if chunkBuffer == nil {
					chunkBuffer = newChunkedBytesBufferMime()
				}
				// configure storage if none was injected
				if database == nil {
					if config.StorageEngine == "memory" {
						db := new(chunkSaverMemory)
						db.compressLevel = config.CompressLevel
						database = db
					} else {
						db := new(chunkSaverSQL)
						database = db
					}
				}
				err = database.Initialize(backendConfig)
				if err != nil {
					return err
				}
				// configure the chunks buffer
				if config.ChunkMaxBytes > 0 {
					chunkBuffer.capTo(config.ChunkMaxBytes)
				} else {
					chunkBuffer.capTo(chunkMaxBytes)
				}
				chunkBuffer.setDatabase(database)
				return nil
			}))

			Svc.AddShutdowner(ShutdownWith(func() error {
				err := database.Shutdown()
				return err
			}))

			sd.Open = func(e *mail.Envelope) error {
				// create a new entry & grab the id
				written = 0
				var ip net.IPAddr
				if ret := net.ParseIP(e.RemoteIP); ret != nil {
					ip = net.IPAddr{IP: ret}
				}
				mid, err := database.OpenMessage(
					e.MailFrom.String(),
					e.Helo,
					e.RcptTo[0].String(),
					ip,
					e.MailFrom.String(),
					e.TLS)
				if err != nil {
					return err
				}
				e.Values["messageID"] = mid
				envelope = e
				return nil
			}

			sd.Close = func() (err error) {
				err = chunkBuffer.flush()
				if err != nil {
					// TODO we could delete the half-saved message here
					return err
				}
				defer chunkBuffer.Reset()
				if mid, ok := envelope.Values["messageID"].(uint64); ok {
					err = database.CloseMessage(
						mid,
						written,
						&chunkBuffer.info,
						subject,
						envelope.QueuedId,
						to,
						from,
					)
					if err != nil {
						return err
					}
				}
				return nil
			}

			// fillVars picks up Subject, To and From from the headers of the first mime-part,
			// once each becomes available
			fillVars := func(parts *[]*mime.Part, subject, to, from string) (string, string, string) {
				if len(*parts) > 0 {
					if subject == "" {
						if val, ok := (*parts)[0].Headers["Subject"]; ok {
							subject = val[0]
						}
					}
					if to == "" {
						if val, ok := (*parts)[0].Headers["To"]; ok {
							addr, err := mail.NewAddress(val[0])
							if err == nil {
								to = addr.String()
							}
						}
					}
					if from == "" {
						if val, ok := (*parts)[0].Headers["From"]; ok {
							addr, err := mail.NewAddress(val[0])
							if err == nil {
								from = addr.String()
							}
						}
					}
				}
				return subject, to, from
			}

			return StreamProcessWith(func(p []byte) (count int, err error) {
				if envelope.Values == nil {
					return count, errors.New("no message headers found")
				}
				if parts, ok := envelope.Values["MimeParts"].(*[]*mime.Part); ok {
					var (
						pos      int
						progress int
					)
					if len(*parts) > 2 {
						// todo: progress is a bit buggy
						// todo: do not flush empty buffer
						//progress = len(*parts) - 2 // skip to 2nd last part, assume previous parts are already out
					}
					subject, to, from = fillVars(parts, subject, to, from)
					offset := msgPos
					for i := progress; i < len(*parts); i++ {
						part := (*parts)[i]
						chunkBuffer.currentPart(part)
						// break the chunk at the start of a new part
						if part.StartingPos > 0 && part.StartingPos > msgPos {
							count, _ = chunkBuffer.Write(p[pos : part.StartingPos-offset])
							written += int64(count)
							err = chunkBuffer.flush()
							if err != nil {
								return count, err
							}
							fmt.Println("->N")
							pos += count
							msgPos = part.StartingPos
						}
						// break the chunk at the end of a header
						if part.StartingPosBody > 0 && part.StartingPosBody >= msgPos {
							count, _ = chunkBuffer.Write(p[pos : part.StartingPosBody-offset])
							written += int64(count)
							err = chunkBuffer.flush()
							if err != nil {
								return count, err
							}
							fmt.Println("->H")
							pos += count
							msgPos = part.StartingPosBody
						}
						// if on the last part, and there is still data to be written out
						if len(*parts)-1 == i && len(p)-1 > pos {
							count, _ = chunkBuffer.Write(p[pos:])
							written += int64(count)
							pos += count
							msgPos += uint(count)
						}
						// if there's no more data
						if pos >= len(p) {
							break
						}
					}
				}
				return sp.Write(p)
			})
		}
	return sd
}
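// To use this processor, the "Requires" note in the header comment applies:
// "mimeanalyzer" must run before "chunksaver" in the backend's stream-processing
// chain (for example, a save chain along the lines of "mimeanalyzer|chunksaver|..."),
// so that e.Values["MimeParts"] is populated before this processor sees the stream.
// The exact config key for the chain depends on the backend gateway configuration
// and is not defined in this file.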