s_chunksaver.go 34 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260
  1. package backends
  2. // ----------------------------------------------------------------------------------
  3. // Processor Name: ChunkSaver
  4. // ----------------------------------------------------------------------------------
  5. // Description : Takes the stream and saves it in chunks. Chunks are split on the
  6. // : chunksaver_chunk_size config setting, and also at the end of MIME parts,
  7. // : and after a header. This allows for basic de-duplication: we can take a
  8. // : hash of each chunk, then check the database to see if we have it already.
  9. // : We don't need to write it to the database, but take the reference of the
  10. // : previously saved chunk and only increment the reference count.
  11. // : The rationale to put headers and bodies into separate chunks is
  12. // : due to headers often containing more unique data, while the bodies are
  13. // : often duplicated, especially for messages that are CC'd or forwarded
  14. // ----------------------------------------------------------------------------------
  15. // Requires : "mimeanalyzer" stream processor to be enabled before it
  16. // ----------------------------------------------------------------------------------
  17. // Config Options: chunksaver_chunk_size - maximum chunk size, in bytes
  18. // --------------:-------------------------------------------------------------------
  19. // Input : e.Values["MimeParts"] Which is of type *[]*mime.Part, as populated by "mimeanalyzer"
  20. // ----------------------------------------------------------------------------------
  21. // Output :
  22. // ----------------------------------------------------------------------------------
  23. import (
  24. "bytes"
  25. "compress/zlib"
  26. "crypto/md5"
  27. "database/sql"
  28. "encoding/base64"
  29. "encoding/binary"
  30. "encoding/json"
  31. "errors"
  32. "fmt"
  33. "github.com/flashmob/go-guerrilla/mail"
  34. "github.com/flashmob/go-guerrilla/mail/mime"
  35. "hash"
  36. "io"
  37. "io/ioutil"
  38. "net"
  39. "strings"
  40. "sync"
  41. "time"
  42. )
// chunkSaverConfig holds the JSON-decoded configuration for the "chunksaver"
// stream processor.
type chunkSaverConfig struct {
	// ChunkMaxBytes controls the maximum buffer size for saving
	// 16KB default.
	ChunkMaxBytes int `json:"chunksaver_chunk_size,omitempty"`
	// StorageEngine selects the storage backend implementation.
	// NOTE(review): the accepted values are not visible in this chunk —
	// confirm against the Chunksaver decorator that consumes this config.
	StorageEngine string `json:"chunksaver_storage_engine,omitempty"`
	// CompressLevel is the zlib compression level used when storing chunks.
	CompressLevel int `json:"chunksaver_compress_level,omitempty"`
}
  50. func init() {
  51. streamers["chunksaver"] = func() *StreamDecorator {
  52. return Chunksaver()
  53. }
  54. }
// hashByteSize is the size of a chunk hash in bytes (MD5 digest length).
const hashByteSize = 16

// HashKey is a fixed-size chunk hash; being an array, it can be used
// directly as a map key.
type HashKey [hashByteSize]byte
  57. // Pack takes a slice and copies each byte to HashKey internal representation
  58. func (h *HashKey) Pack(b []byte) {
  59. if len(b) < hashByteSize {
  60. return
  61. }
  62. copy(h[:], b[0:hashByteSize])
  63. }
  64. // String implements the Stringer interface from fmt
  65. func (h HashKey) String() string {
  66. return base64.RawStdEncoding.EncodeToString(h[0:hashByteSize])
  67. }
  68. // UnmarshalJSON implements the Unmarshaler interface from encoding/json
  69. func (h *HashKey) UnmarshalJSON(b []byte) error {
  70. dbuf := make([]byte, base64.RawStdEncoding.DecodedLen(len(b[1:len(b)-1])))
  71. _, err := base64.RawStdEncoding.Decode(dbuf, b[1:len(b)-1])
  72. if err != nil {
  73. return err
  74. }
  75. h.Pack(dbuf)
  76. return nil
  77. }
  78. // MarshalJSON implements the Marshaler interface from encoding/json
  79. // The value is marshaled as a raw base64 to save some bytes
  80. // eg. instead of typically using hex, de17038001170380011703ff01170380 would be represented as 3hcDgAEXA4ABFwP/ARcDgA
  81. func (h *HashKey) MarshalJSON() ([]byte, error) {
  82. return []byte(`"` + h.String() + `"`), nil
  83. }
// PartsInfo describes the mime-parts contained in the email
type PartsInfo struct {
	Count uint32 `json:"c"` // number of parts
	TextPart int `json:"tp"` // index of the main text part to display
	HTMLPart int `json:"hp"` // index of the main html part to display (if any)
	HasAttach bool `json:"a"` // is there an attachment?
	Parts []ChunkedPart `json:"p"` // info describing a mime-part
	CBoundaries []string `json:"cbl"` // content boundaries list
	bp sync.Pool // pool of *bytes.Buffer, reused when compressing the JSON form
}
// ChunkedPart contains header information about a mime-part, including keys pointing to where the data is stored at
type ChunkedPart struct {
	PartId string `json:"i"` // mime-part node identifier (from the mime parser)
	Size uint `json:"s"` // total size of the part's data, in bytes
	ChunkHash []HashKey `json:"h"` // sequence of hashes the data is stored at
	ContentType string `json:"t"` // value of the Content-Type header
	Charset string `json:"c"` // charset parameter of the part
	TransferEncoding string `json:"e"` // value of Content-Transfer-Encoding
	ContentDisposition string `json:"d"` // value of Content-Disposition
	ContentBoundary int `json:"cb"` // index to the CBoundaries list in PartsInfo
}
  105. func NewPartsInfo() *PartsInfo {
  106. pi := new(PartsInfo)
  107. pi.bp = sync.Pool{
  108. // if not available, then create a new one
  109. New: func() interface{} {
  110. var b bytes.Buffer
  111. return &b
  112. },
  113. }
  114. return pi
  115. }
  116. // boundary takes a string and returns the index of the string in the info.CBoundaries slice
  117. func (info *PartsInfo) boundary(cb string) int {
  118. for i := range info.CBoundaries {
  119. if info.CBoundaries[i] == cb {
  120. return i
  121. }
  122. }
  123. info.CBoundaries = append(info.CBoundaries, cb)
  124. return len(info.CBoundaries) - 1
  125. }
  126. // UnmarshalJSON unmarshals the JSON and decompresses using zlib
  127. func (info *PartsInfo) UnmarshalJSONZlib(b []byte) error {
  128. r, err := zlib.NewReader(bytes.NewReader(b[1 : len(b)-1]))
  129. if err != nil {
  130. return err
  131. }
  132. all, err := ioutil.ReadAll(r)
  133. if err != nil {
  134. return err
  135. }
  136. err = json.Unmarshal(all, info)
  137. if err != nil {
  138. return err
  139. }
  140. return nil
  141. }
  142. // MarshalJSONZlib marshals and compresses the bytes using zlib
  143. func (info *PartsInfo) MarshalJSONZlib() ([]byte, error) {
  144. buf, err := json.Marshal(info)
  145. if err != nil {
  146. return buf, err
  147. }
  148. // borrow a buffer form the pool
  149. compressed := info.bp.Get().(*bytes.Buffer)
  150. // put back in the pool
  151. defer func() {
  152. compressed.Reset()
  153. info.bp.Put(b)
  154. }()
  155. zlibw, err := zlib.NewWriterLevel(compressed, 9)
  156. if err != nil {
  157. return buf, err
  158. }
  159. if _, err := zlibw.Write(buf); err != nil {
  160. return buf, err
  161. }
  162. if err := zlibw.Close(); err != nil {
  163. return buf, err
  164. }
  165. return []byte(`"` + compressed.String() + `"`), nil
  166. }
// flushEvent is a callback invoked when the buffer is full, just before it
// is reset; returning an error aborts the write in progress.
type flushEvent func() error

// chunkedBytesBuffer accumulates written bytes in a fixed-capacity buffer,
// firing flushTrigger each time the buffer fills.
type chunkedBytesBuffer struct {
	buf []byte
	flushTrigger flushEvent
}
  172. // flush signals that it's time to write the buffer out to storage
  173. func (c *chunkedBytesBuffer) flush() error {
  174. if len(c.buf) == 0 {
  175. return nil
  176. }
  177. fmt.Print(string(c.buf))
  178. if c.flushTrigger != nil {
  179. if err := c.flushTrigger(); err != nil {
  180. return err
  181. }
  182. }
  183. c.Reset()
  184. return nil
  185. }
// Reset sets the length back to 0, making it re-usable
func (c *chunkedBytesBuffer) Reset() {
	c.buf = c.buf[:0] // set the length back to 0; capacity is kept for reuse
}
  190. // Write takes a p slice of bytes and writes it to the buffer.
  191. // It will never grow the buffer, flushing it as soon as it's full.
  192. func (c *chunkedBytesBuffer) Write(p []byte) (i int, err error) {
  193. remaining := len(p)
  194. bufCap := cap(c.buf)
  195. for {
  196. free := bufCap - len(c.buf)
  197. if free > remaining {
  198. // enough of room in the buffer
  199. c.buf = append(c.buf, p[i:i+remaining]...)
  200. i += remaining
  201. return
  202. } else {
  203. // fill the buffer to the 'brim' with a slice from p
  204. c.buf = append(c.buf, p[i:i+free]...)
  205. remaining -= free
  206. i += free
  207. err = c.flush()
  208. if err != nil {
  209. return i, err
  210. }
  211. if remaining == 0 {
  212. return
  213. }
  214. }
  215. }
  216. }
  217. // capTo caps the internal buffer to specified number of bytes, sets the length back to 0
  218. func (c *chunkedBytesBuffer) capTo(n int) {
  219. if cap(c.buf) == n {
  220. return
  221. }
  222. c.buf = make([]byte, 0, n)
  223. }
// chunkedBytesBufferMime decorates chunkedBytesBuffer, specifying what to do
// when a flush event is triggered: the chunk is hashed, recorded in the
// PartsInfo bookkeeping and persisted to the storage backend.
type chunkedBytesBufferMime struct {
	chunkedBytesBuffer
	current *mime.Part // mime part the bytes currently being written belong to
	info PartsInfo // accumulated description of all parts seen so far
	md5 hash.Hash // hasher used to derive each chunk's storage key
	database ChunkSaverStorage // storage backend chunks are persisted to
}
  232. func newChunkedBytesBufferMime() *chunkedBytesBufferMime {
  233. b := new(chunkedBytesBufferMime)
  234. b.chunkedBytesBuffer.flushTrigger = func() error {
  235. return b.onFlush()
  236. }
  237. b.md5 = md5.New()
  238. b.buf = make([]byte, 0, chunkMaxBytes)
  239. return b
  240. }
// setDatabase injects the storage backend that onFlush persists chunks to.
func (b *chunkedBytesBufferMime) setDatabase(database ChunkSaverStorage) {
	b.database = database
}
  244. // onFlush is called whenever the flush event fires.
  245. // - It saves the chunk to disk and adds the chunk's hash to the list.
  246. // - It builds the b.info.Parts structure
  247. func (b *chunkedBytesBufferMime) onFlush() error {
  248. b.md5.Write(b.buf)
  249. var chash HashKey
  250. copy(chash[:], b.md5.Sum([]byte{}))
  251. if b.current == nil {
  252. return errors.New("b.current part is nil")
  253. }
  254. if size := len(b.info.Parts); size > 0 && b.info.Parts[size-1].PartId == b.current.Node {
  255. // existing part, just append the hash
  256. lastPart := &b.info.Parts[size-1]
  257. lastPart.ChunkHash = append(lastPart.ChunkHash, chash)
  258. b.fillInfo(lastPart, size-1)
  259. lastPart.Size += uint(len(b.buf))
  260. } else {
  261. // add it as a new part
  262. part := ChunkedPart{
  263. PartId: b.current.Node,
  264. ChunkHash: []HashKey{chash},
  265. ContentBoundary: b.info.boundary(b.current.ContentBoundary),
  266. Size: uint(len(b.buf)),
  267. }
  268. b.fillInfo(&part, 0)
  269. b.info.Parts = append(b.info.Parts, part)
  270. b.info.Count++
  271. }
  272. if err := b.database.AddChunk(b.buf, chash[:]); err != nil {
  273. return err
  274. }
  275. return nil
  276. }
  277. func (b *chunkedBytesBufferMime) fillInfo(cp *ChunkedPart, index int) {
  278. if cp.ContentType == "" && b.current.ContentType != nil {
  279. cp.ContentType = b.current.ContentType.String()
  280. }
  281. if cp.Charset == "" && b.current.Charset != "" {
  282. cp.Charset = b.current.Charset
  283. }
  284. if cp.TransferEncoding == "" && b.current.TransferEncoding != "" {
  285. cp.TransferEncoding = b.current.TransferEncoding
  286. }
  287. if cp.ContentDisposition == "" && b.current.ContentDisposition != "" {
  288. cp.ContentDisposition = b.current.ContentDisposition
  289. if strings.Contains(cp.ContentDisposition, "attach") {
  290. b.info.HasAttach = true
  291. }
  292. }
  293. if cp.ContentType != "" {
  294. if b.info.TextPart == -1 && strings.Contains(cp.ContentType, "text/plain") {
  295. b.info.TextPart = index
  296. } else if b.info.HTMLPart == -1 && strings.Contains(cp.ContentType, "text/html") {
  297. b.info.HTMLPart = index
  298. }
  299. }
  300. }
// Reset decorates the Reset method of the chunkedBytesBuffer, clearing the
// hash state along with the buffer.
// NOTE(review): this override only runs when Reset is called through a
// *chunkedBytesBufferMime receiver; the embedded buffer's flush() calls its
// own Reset and does not reach this method (embedding is not virtual
// dispatch) — confirm callers rely on the intended one.
func (b *chunkedBytesBufferMime) Reset() {
	b.md5.Reset()
	b.chunkedBytesBuffer.Reset()
}
  306. func (b *chunkedBytesBufferMime) currentPart(cp *mime.Part) {
  307. if b.current == nil {
  308. b.info = *NewPartsInfo()
  309. b.info.Parts = make([]ChunkedPart, 0, 3)
  310. b.info.TextPart = -1
  311. b.info.HTMLPart = -1
  312. }
  313. b.current = cp
  314. }
// ChunkSaverStorage defines an interface to the storage layer (the database)
type ChunkSaverStorage interface {
	// OpenMessage is used to begin saving an email. An email id is returned and used to call CloseMessage later
	OpenMessage(from string, helo string, recipient string, ipAddress net.IPAddr, returnPath string, isTLS bool) (mailID uint64, err error)
	// CloseMessage finalizes the writing of an email. Additional data collected while parsing the email is saved
	CloseMessage(mailID uint64, size int64, partsInfo *PartsInfo, subject string, deliveryID string, to string, from string) error
	// AddChunk saves a chunk of bytes to a given hash key
	AddChunk(data []byte, hash []byte) error
	// GetEmail returns an email that's been saved
	GetEmail(mailID uint64) (*ChunkSaverEmail, error)
	// GetChunks loads in the specified chunks of bytes from storage
	GetChunks(hash ...HashKey) ([]*ChunkSaverChunk, error)
	// Initialize is called when the backend is started
	Initialize(cfg BackendConfig) error
	// Shutdown is called when the backend gets shutdown.
	Shutdown() (err error)
}
// ChunkSaverEmail represents an email
type ChunkSaverEmail struct {
	mailID uint64 // storage-assigned identifier of the email
	createdAt time.Time // when the email record was opened
	size int64 // total size of the message, in bytes
	from string // from stores the email address found in the "From" header field
	to string // to stores the email address found in the "To" header field
	partsInfo PartsInfo // description of the email's mime parts and chunk hashes
	helo string // helo message given by the client when the message was transmitted
	subject string // subject stores the value from the first "Subject" header field
	deliveryID string // identifier assigned for this delivery
	recipient string // recipient is the email address that the server received from the RCPT TO command
	ipv4 net.IPAddr // set to a value if client connected via ipv4
	ipv6 net.IPAddr // set to a value if client connected via ipv6
	returnPath string // returnPath is the email address that the server received from the MAIL FROM command
	isTLS bool // isTLS is true when TLS was used to connect
}
// ChunkSaverChunk is a de-duplicated chunk of email data as returned from
// storage, with its data exposed as a stream.
type ChunkSaverChunk struct {
	modifiedAt time.Time // last time the chunk was stored or re-referenced
	referenceCount uint // referenceCount counts how many emails reference this chunk
	data io.Reader // reader over the (decompressed) chunk contents
}
// chunkSaverMemoryEmail is the in-memory storage engine's record of an email;
// partsInfo is kept in its compressed, marshaled form.
type chunkSaverMemoryEmail struct {
	mailID uint64
	createdAt time.Time
	size int64
	from string
	to string
	partsInfo []byte // zlib-compressed JSON, as produced by PartsInfo.MarshalJSONZlib
	helo string
	subject string
	deliveryID string
	recipient string
	ipv4 net.IPAddr
	ipv6 net.IPAddr
	returnPath string
	isTLS bool
}

// chunkSaverMemoryChunk is the in-memory storage engine's record of a single
// de-duplicated, zlib-compressed chunk.
type chunkSaverMemoryChunk struct {
	modifiedAt time.Time
	referenceCount uint // how many emails reference this chunk
	data []byte // zlib-compressed chunk bytes
}
// chunkSaverMemory is an in-memory implementation of ChunkSaverStorage,
// useful for testing; emails are indexed by (mailID - IDOffset).
type chunkSaverMemory struct {
	chunks map[HashKey]*chunkSaverMemoryChunk // chunk store, keyed by content hash
	emails []*chunkSaverMemoryEmail // email store, indexed by id minus IDOffset
	nextID uint64 // id assigned to the next opened email
	IDOffset uint64 // value of the first id ever assigned
	compressLevel int // zlib level used when compressing chunks
}
  382. // OpenMessage implements the ChunkSaverStorage interface
  383. func (m *chunkSaverMemory) OpenMessage(from string, helo string, recipient string, ipAddress net.IPAddr, returnPath string, isTLS bool) (mailID uint64, err error) {
  384. var ip4, ip6 net.IPAddr
  385. if ip := ipAddress.IP.To4(); ip != nil {
  386. ip4 = ipAddress
  387. } else {
  388. ip6 = ipAddress
  389. }
  390. email := chunkSaverMemoryEmail{
  391. mailID: m.nextID,
  392. createdAt: time.Now(),
  393. from: from,
  394. helo: helo,
  395. recipient: recipient,
  396. ipv4: ip4,
  397. ipv6: ip6,
  398. returnPath: returnPath,
  399. isTLS: isTLS,
  400. }
  401. m.emails = append(m.emails, &email)
  402. m.nextID++
  403. return email.mailID, nil
  404. }
  405. // CloseMessage implements the ChunkSaverStorage interface
  406. func (m *chunkSaverMemory) CloseMessage(mailID uint64, size int64, partsInfo *PartsInfo, subject string, deliveryID string, to string, from string) error {
  407. if email := m.emails[mailID-m.IDOffset]; email == nil {
  408. return errors.New("email not found")
  409. } else {
  410. email.size = size
  411. if info, err := partsInfo.MarshalJSONZlib(); err != nil {
  412. return err
  413. } else {
  414. email.partsInfo = info
  415. }
  416. email.subject = subject
  417. email.deliveryID = deliveryID
  418. email.to = to
  419. email.from = from
  420. email.size = size
  421. }
  422. return nil
  423. }
  424. // AddChunk implements the ChunkSaverStorage interface
  425. func (m *chunkSaverMemory) AddChunk(data []byte, hash []byte) error {
  426. var key HashKey
  427. if len(hash) != hashByteSize {
  428. return errors.New("invalid hash")
  429. }
  430. key.Pack(hash)
  431. var compressed bytes.Buffer
  432. zlibw, err := zlib.NewWriterLevel(&compressed, m.compressLevel)
  433. if err != nil {
  434. return err
  435. }
  436. if chunk, ok := m.chunks[key]; ok {
  437. // only update the counters and update time
  438. chunk.referenceCount++
  439. chunk.modifiedAt = time.Now()
  440. } else {
  441. if _, err := zlibw.Write(data); err != nil {
  442. return err
  443. }
  444. if err := zlibw.Close(); err != nil {
  445. return err
  446. }
  447. // add a new chunk
  448. newChunk := chunkSaverMemoryChunk{
  449. modifiedAt: time.Now(),
  450. referenceCount: 1,
  451. data: compressed.Bytes(),
  452. }
  453. m.chunks[key] = &newChunk
  454. }
  455. return nil
  456. }
  457. // Initialize implements the ChunkSaverStorage interface
  458. func (m *chunkSaverMemory) Initialize(cfg BackendConfig) error {
  459. m.IDOffset = 1
  460. m.nextID = m.IDOffset
  461. m.emails = make([]*chunkSaverMemoryEmail, 0, 100)
  462. m.chunks = make(map[HashKey]*chunkSaverMemoryChunk, 1000)
  463. m.compressLevel = zlib.NoCompression
  464. return nil
  465. }
// Shutdown implements the ChunkSaverStorage interface.
// It drops the in-memory stores so they become garbage-collectable.
func (m *chunkSaverMemory) Shutdown() (err error) {
	m.emails = nil
	m.chunks = nil
	return nil
}
  472. // GetEmail implements the ChunkSaverStorage interface
  473. func (m *chunkSaverMemory) GetEmail(mailID uint64) (*ChunkSaverEmail, error) {
  474. if size := uint64(len(m.emails)) - m.IDOffset; size > mailID-m.IDOffset {
  475. return nil, errors.New("mail not found")
  476. }
  477. email := m.emails[mailID-m.IDOffset]
  478. pi := NewPartsInfo()
  479. if err := pi.UnmarshalJSONZlib(email.partsInfo); err != nil {
  480. return nil, err
  481. }
  482. return &ChunkSaverEmail{
  483. mailID: email.mailID,
  484. createdAt: email.createdAt,
  485. size: email.size,
  486. from: email.from,
  487. to: email.to,
  488. partsInfo: *pi,
  489. helo: email.helo,
  490. subject: email.subject,
  491. deliveryID: email.deliveryID,
  492. recipient: email.recipient,
  493. ipv4: email.ipv4,
  494. ipv6: email.ipv6,
  495. returnPath: email.returnPath,
  496. isTLS: email.isTLS,
  497. }, nil
  498. }
  499. // GetChunk implements the ChunkSaverStorage interface
  500. func (m *chunkSaverMemory) GetChunks(hash ...HashKey) ([]*ChunkSaverChunk, error) {
  501. result := make([]*ChunkSaverChunk, 0, len(hash))
  502. var key HashKey
  503. for i := range hash {
  504. key = hash[i]
  505. if c, ok := m.chunks[key]; ok {
  506. zwr, err := zlib.NewReader(bytes.NewReader(c.data))
  507. if err != nil {
  508. return nil, err
  509. }
  510. result = append(result, &ChunkSaverChunk{
  511. modifiedAt: c.modifiedAt,
  512. referenceCount: c.referenceCount,
  513. data: zwr,
  514. })
  515. }
  516. }
  517. return result, nil
  518. }
// chunkSaverSQLConfig holds the JSON-decoded settings for the sql-backed
// chunk storage engine.
type chunkSaverSQLConfig struct {
	EmailTable string `json:"chunksaver_email_table,omitempty"` // table emails are stored in
	ChunkTable string `json:"chunksaver_chunk_table,omitempty"` // table chunks are stored in
	Driver string `json:"chunksaver_sql_driver,omitempty"` // database/sql driver name
	DSN string `json:"chunksaver_sql_dsn,omitempty"` // data source name for sql.Open
	PrimaryHost string `json:"chunksaver_primary_mail_host,omitempty"`
}
// chunkSaverSQL implements the ChunkSaverStorage interface
type chunkSaverSQL struct {
	config *chunkSaverSQLConfig
	statements map[string]*sql.Stmt // prepared statements, keyed by name (see prepareSql)
	db *sql.DB
}
  532. func (c *chunkSaverSQL) connect() (*sql.DB, error) {
  533. var err error
  534. if c.db, err = sql.Open(c.config.Driver, c.config.DSN); err != nil {
  535. Log().Error("cannot open database: ", err)
  536. return nil, err
  537. }
  538. // do we have permission to access the table?
  539. _, err = c.db.Query("SELECT mail_id FROM " + c.config.EmailTable + " LIMIT 1")
  540. if err != nil {
  541. return nil, err
  542. }
  543. return c.db, err
  544. }
  545. func (c *chunkSaverSQL) prepareSql() error {
  546. if c.statements == nil {
  547. c.statements = make(map[string]*sql.Stmt)
  548. }
  549. if stmt, err := c.db.Prepare(`INSERT INTO ` +
  550. c.config.EmailTable +
  551. ` (from, helo, recipient, ipv4_addr, ipv6_addr, return_path, is_tls)
  552. VALUES(?, ?, ?, ?, ?, ?, ?)`); err != nil {
  553. return err
  554. } else {
  555. c.statements["insertEmail"] = stmt
  556. }
  557. // begin inserting an email (before saving chunks)
  558. if stmt, err := c.db.Prepare(`INSERT INTO ` +
  559. c.config.ChunkTable +
  560. ` (data, hash)
  561. VALUES(?, ?)`); err != nil {
  562. return err
  563. } else {
  564. c.statements["insertChunk"] = stmt
  565. }
  566. // finalize the email (the connection closed)
  567. if stmt, err := c.db.Prepare(`
  568. UPDATE ` + c.config.EmailTable + `
  569. SET size=?, parts_info = ?, subject, delivery_id = ?, to = ?
  570. WHERE mail_id = ? `); err != nil {
  571. return err
  572. } else {
  573. c.statements["finalizeEmail"] = stmt
  574. }
  575. // Check the existence of a chunk (the reference_count col is incremented if it exists)
  576. // This means we can avoid re-inserting an existing chunk, only update its reference_count
  577. if stmt, err := c.db.Prepare(`
  578. UPDATE ` + c.config.ChunkTable + `
  579. SET reference_count=reference_count+1
  580. WHERE hash = ? `); err != nil {
  581. return err
  582. } else {
  583. c.statements["chunkReferenceIncr"] = stmt
  584. }
  585. // If the reference_count is 0 then it means the chunk has been deleted
  586. // Chunks are soft-deleted for now, hard-deleted by another sweeper query as they become stale.
  587. if stmt, err := c.db.Prepare(`
  588. UPDATE ` + c.config.ChunkTable + `
  589. SET reference_count=reference_count-1
  590. WHERE hash = ? AND reference_count > 0`); err != nil {
  591. return err
  592. } else {
  593. c.statements["chunkReferenceDecr"] = stmt
  594. }
  595. // fetch an email
  596. if stmt, err := c.db.Prepare(`
  597. SELECT *
  598. from ` + c.config.EmailTable + `
  599. where mail_id=?`); err != nil {
  600. return err
  601. } else {
  602. c.statements["selectMail"] = stmt
  603. }
  604. // fetch a chunk
  605. if stmt, err := c.db.Prepare(`
  606. SELECT *
  607. from ` + c.config.ChunkTable + `
  608. where hash=?`); err != nil {
  609. return err
  610. } else {
  611. c.statements["selectChunk"] = stmt
  612. }
  613. // TODO sweep old chunks
  614. // TODO sweep incomplete emails
  615. return nil
  616. }
  617. // OpenMessage implements the ChunkSaverStorage interface
  618. func (c *chunkSaverSQL) OpenMessage(from string, helo string, recipient string, ipAddress net.IPAddr, returnPath string, isTLS bool) (mailID uint64, err error) {
  619. // if it's ipv4 then we want ipv6 to be 0, and vice-versa
  620. var ip4 uint32
  621. ip6 := make([]byte, 16)
  622. if ip := ipAddress.IP.To4(); ip != nil {
  623. ip4 = binary.BigEndian.Uint32(ip)
  624. } else {
  625. _ = copy(ip6, []byte(ipAddress.IP))
  626. }
  627. r, err := c.statements["insertEmail"].Exec(from, helo, recipient, ip4, ip6, returnPath, isTLS)
  628. if err != nil {
  629. return 0, err
  630. }
  631. id, err := r.LastInsertId()
  632. if err != nil {
  633. return 0, err
  634. }
  635. return uint64(id), err
  636. }
  637. // AddChunk implements the ChunkSaverStorage interface
  638. func (c *chunkSaverSQL) AddChunk(data []byte, hash []byte) error {
  639. // attempt to increment the reference_count (it means the chunk is already in there)
  640. r, err := c.statements["chunkReferenceIncr"].Exec(hash)
  641. if err != nil {
  642. return err
  643. }
  644. affected, err := r.RowsAffected()
  645. if err != nil {
  646. return err
  647. }
  648. if affected == 0 {
  649. // chunk isn't in there, let's insert it
  650. _, err := c.statements["insertChunk"].Exec(data, hash)
  651. if err != nil {
  652. return err
  653. }
  654. }
  655. return nil
  656. }
  657. // CloseMessage implements the ChunkSaverStorage interface
  658. func (c *chunkSaverSQL) CloseMessage(mailID uint64, size int64, partsInfo *PartsInfo, subject string, deliveryID string, to string, from string) error {
  659. partsInfoJson, err := json.Marshal(partsInfo)
  660. if err != nil {
  661. return err
  662. }
  663. _, err = c.statements["finalizeEmail"].Exec(size, partsInfoJson, subject, deliveryID, to, mailID)
  664. if err != nil {
  665. return err
  666. }
  667. return nil
  668. }
  669. // Initialize loads the specific database config, connects to the db, prepares statements
  670. func (c *chunkSaverSQL) Initialize(cfg BackendConfig) error {
  671. configType := BaseConfig(&chunkSaverSQLConfig{})
  672. bcfg, err := Svc.ExtractConfig(cfg, configType)
  673. if err != nil {
  674. return err
  675. }
  676. c.config = bcfg.(*chunkSaverSQLConfig)
  677. c.db, err = c.connect()
  678. if err != nil {
  679. return err
  680. }
  681. err = c.prepareSql()
  682. if err != nil {
  683. return err
  684. }
  685. return nil
  686. }
  687. // Shutdown implements the ChunkSaverStorage interface
  688. func (c *chunkSaverSQL) Shutdown() (err error) {
  689. defer func() {
  690. closeErr := c.db.Close()
  691. if closeErr != err {
  692. Log().WithError(err).Error("failed to close sql database")
  693. err = closeErr
  694. }
  695. }()
  696. for i := range c.statements {
  697. if err = c.statements[i].Close(); err != nil {
  698. Log().WithError(err).Error("failed to close sql statement")
  699. }
  700. }
  701. return err
  702. }
// GetEmail implements the ChunkSaverStorage interface.
// TODO(review): not implemented for the sql engine — it returns an empty
// ChunkSaverEmail instead of querying the prepared "selectMail" statement.
func (c *chunkSaverSQL) GetEmail(mailID uint64) (*ChunkSaverEmail, error) {
	return &ChunkSaverEmail{}, nil
}
// GetChunks implements the ChunkSaverStorage interface.
// TODO(review): not implemented for the sql engine — it returns an empty
// slice instead of querying the prepared "selectChunk" statement.
func (c *chunkSaverSQL) GetChunks(hash ...HashKey) ([]*ChunkSaverChunk, error) {
	result := make([]*ChunkSaverChunk, 0, len(hash))
	return result, nil
}
// chunkMailReader reads back a stored email from chunk storage, either a
// single mime part or the whole message.
type chunkMailReader struct {
	db ChunkSaverStorage
	email *ChunkSaverEmail
	// part requests a part. If 0, all the parts are read sequentially
	part int
	i, j int // i: current part index; j: current chunk index within the part
	cache cachedChunks // read-ahead cache of chunks fetched from db
}
  720. // NewChunkMailReader loads the email and selects which mime-part Read will read, starting from 1
  721. // if part is 0, Read will read in the entire message. 1 selects the first part, 2 2nd, and so on..
  722. func NewChunkMailReader(db ChunkSaverStorage, email *ChunkSaverEmail, part int) (*chunkMailReader, error) {
  723. r := new(chunkMailReader)
  724. r.db = db
  725. r.part = part
  726. if email == nil {
  727. return nil, errors.New("nil email")
  728. } else {
  729. r.email = email
  730. }
  731. if err := r.SeekPart(part); err != nil {
  732. return nil, err
  733. }
  734. r.cache = cachedChunks{
  735. db: db,
  736. }
  737. return r, nil
  738. }
  739. // SeekPart resets the reader. The part argument chooses which part Read will read in
  740. // If part is 0, Read will return the entire message
  741. func (r *chunkMailReader) SeekPart(part int) error {
  742. if parts := len(r.email.partsInfo.Parts); parts == 0 {
  743. return errors.New("email has mime parts missing")
  744. } else if part > parts {
  745. return errors.New("no such part available")
  746. }
  747. r.i = part
  748. r.j = 0
  749. return nil
  750. }
// cachedChunks is a lazy, read-ahead cache of chunks fetched from storage;
// un-fetched positions hold nil placeholders and hashIndex maps each
// position back to its hash so it can be loaded on demand.
type cachedChunks struct {
	chunks []*ChunkSaverChunk
	hashIndex map[int]HashKey
	db ChunkSaverStorage
}

// chunkCachePreload is how many chunks are fetched ahead of the one requested.
const chunkCachePreload = 2
  757. // warm allocates the chunk cache, and gets the first few and stores them in the cache
  758. func (c *cachedChunks) warm(hashes ...HashKey) (int, error) {
  759. if c.hashIndex == nil {
  760. c.hashIndex = make(map[int]HashKey, len(hashes))
  761. }
  762. if c.chunks == nil {
  763. c.chunks = make([]*ChunkSaverChunk, 0, 100)
  764. }
  765. if len(c.chunks) > 0 {
  766. // already been filled
  767. return len(c.chunks), nil
  768. }
  769. // let's pre-load some hashes.
  770. preload := chunkCachePreload
  771. if len(hashes) < preload {
  772. preload = len(hashes)
  773. }
  774. if chunks, err := c.db.GetChunks(hashes[0:preload]...); err != nil {
  775. return 0, err
  776. } else {
  777. for i := range hashes {
  778. c.hashIndex[i] = hashes[i]
  779. if i < preload {
  780. c.chunks = append(c.chunks, chunks[i])
  781. } else {
  782. // don't pre-load
  783. c.chunks = append(c.chunks, nil) // nil will be a placeholder for our chunk
  784. }
  785. }
  786. }
  787. return len(c.chunks), nil
  788. }
  789. // get returns a chunk. If the chunk doesn't exist, it gets it and pre-loads the next few
  790. // also removes the previous chunks that now have become stale
  791. func (c *cachedChunks) get(i int) (*ChunkSaverChunk, error) {
  792. if i > len(c.chunks) {
  793. return nil, errors.New("not enough chunks")
  794. }
  795. if c.chunks[i] != nil {
  796. // cache hit!
  797. return c.chunks[i], nil
  798. } else {
  799. var toGet []HashKey
  800. if key, ok := c.hashIndex[i]; ok {
  801. toGet = append(toGet, key)
  802. } else {
  803. return nil, errors.New(fmt.Sprintf("hash for key [%s] not found", key))
  804. }
  805. // make a list of chunks to load (extra ones to be pre-loaded)
  806. for to := i + 1; to < len(c.chunks) || to > chunkCachePreload+i; to++ {
  807. if key, ok := c.hashIndex[to]; ok {
  808. toGet = append(toGet, key)
  809. }
  810. }
  811. if chunks, err := c.db.GetChunks(toGet...); err != nil {
  812. return nil, err
  813. } else {
  814. // cache the pre-loaded chunks
  815. for j := i; j < len(c.chunks); j++ {
  816. c.chunks[j] = chunks[j-i]
  817. c.hashIndex[j] = toGet[j-i]
  818. }
  819. // remove any old ones (walk back)
  820. for j := i; j > -1; j-- {
  821. if c.chunks[j] != nil {
  822. c.chunks[j] = nil
  823. } else {
  824. break
  825. }
  826. }
  827. // return the chunk asked for
  828. return chunks[0], nil
  829. }
  830. }
  831. }
// purgeChunks remove any chunks before i
// TODO: not implemented yet — currently a no-op (stale eviction is done
// inline by get() instead).
func (c *cachedChunks) purgeChunks(i int) {
}
  835. func (c *cachedChunks) empty() {
  836. for i := range c.chunks {
  837. c.chunks[i] = nil
  838. }
  839. c.chunks = c.chunks[:] // set len to 0
  840. for key := range c.hashIndex {
  841. delete(c.hashIndex, key)
  842. }
  843. }
// Read implements the io.Reader interface
// It streams the selected mime part(s) one chunk at a time. Reader position
// (r.i = part index, r.j = chunk index) survives across calls, so each call
// resumes where the previous one left off and typically returns after a
// single chunk read.
func (r *chunkMailReader) Read(p []byte) (n int, err error) {
	var length int
	for ; r.i < len(r.email.partsInfo.Parts); r.i++ {
		// warm is a no-op if the cache is already filled for this part
		length, err = r.cache.warm(r.email.partsInfo.Parts[r.i].ChunkHash...)
		if err != nil {
			return
		}
		var nRead int
		for r.j < length {
			// NOTE: `err` here shadows the named return; the inner returns
			// below pass it out explicitly
			chunk, err := r.cache.get(r.j)
			if err != nil {
				return nRead, err
			}
			nRead, err = chunk.data.Read(p)
			if err == io.EOF {
				r.j++ // advance to the next chunk
				err = nil
			}
			if r.j == length { // last chunk in a part?
				r.j = 0 // reset chunk index
				r.i++ // advance to the next part
				// a single selected part (r.part > 0) ends here too
				if r.i == len(r.email.partsInfo.Parts) || r.part > 0 {
					// there are no more parts to return
					err = io.EOF
					r.cache.empty()
				}
			}
			// unless there's an error, the next time this function will be
			// called, it will read the next chunk
			return nRead, err
		}
	}
	// no parts left to read
	err = io.EOF
	return n, err
}
// chunkPartDecoder decodes base64 and q-printable, then converts the charset to utf-8
// (decoding itself is still a TODO in Read; see decoderState* for the scanner states).
type chunkPartDecoder struct {
	*chunkMailReader // underlying reader yielding the raw (still encoded) part data
	buf   []byte     // scratch buffer holding the latest read from the underlying reader
	state int        // header-scanning state; one of the decoderState* constants
}
  886. func NewChunkPartDecoder(db ChunkSaverStorage, email *ChunkSaverEmail, part int) (*chunkPartDecoder, error) {
  887. r, err := NewChunkMailReader(db, email, part)
  888. if err != nil {
  889. return nil, err
  890. }
  891. decoder := new(chunkPartDecoder)
  892. decoder.chunkMailReader = r
  893. return decoder, nil
  894. }
// chunkSaverNL is the newline byte used to find the header/body boundary.
const chunkSaverNL = '\n'

// states for chunkPartDecoder.Read's header scanner
const (
	decoderStateFindHeader int = iota // scanning for the blank line ("\n\n") that ends the header
	decoderStateMatchNL               // previous buffer ended in '\n'; check if the next starts with one
	decoderStateDecode                // header found; copy (eventually decode) the body out
)
// Read implements the io.Reader interface.
// It scans the underlying chunkMailReader output for the end of the mime
// header (a blank "\n\n" line, possibly split across two reads) and then
// copies the body bytes into p.
// NOTE(review): the actual decoding (q-printable, base64, charset) is still
// a TODO below; body bytes are currently passed through unmodified.
func (r *chunkPartDecoder) Read(p []byte) (n int, err error) {
	var part *ChunkedPart
	//if cap(p) != cap(r.buf) {
	// NOTE(review): scratch buffer is re-allocated on every call; the
	// commented-out cap check above suggests it was meant to be reused —
	// confirm before relying on this.
	r.buf = make([]byte, len(p), cap(p))
	var start, buffered int
	part = &r.email.partsInfo.Parts[r.part]
	_ = part // kept for the decoding TODO; unused for now
	buffered, err = r.chunkMailReader.Read(r.buf)
	if buffered == 0 {
		return
	}
	for {
		switch r.state {
		case decoderStateFindHeader:
			// finding the start of the header
			if start = bytes.Index(r.buf, []byte{chunkSaverNL, chunkSaverNL}); start != -1 {
				start += 2 // skip the \n\n
				r.state = decoderStateDecode // found the header
				continue // continue scanning
			} else if r.buf[len(r.buf)-1] == chunkSaverNL {
				// the last char is a \n so next call to Read will check if it starts with a matching \n
				r.state = decoderStateMatchNL
			}
		case decoderStateMatchNL:
			if r.buf[0] == '\n' {
				// found the header
				start = 1
				r.state = decoderStateDecode
				continue
			} else {
				r.state = decoderStateFindHeader
				continue
			}
		case decoderStateDecode:
			if start < len(r.buf) {
				// todo decode here (q-printable, base64, charset)
				// NOTE(review): copies into p from offset 0 (p[:], not p[n:]);
				// verify intent if this branch can run more than once per call.
				n += copy(p[:], r.buf[start:buffered])
			}
			return
		}
		// header boundary not found in this buffer; pull in more data
		buffered, err = r.chunkMailReader.Read(r.buf)
		if buffered == 0 {
			return
		}
	}
}
// chunkMaxBytes is the default cap for the chunk buffer when the config does not set one.
const chunkMaxBytes = 1024 * 16 // 16Kb is the default, change using chunksaver_chunk_size config setting
/**
 *
 * A chunk ends either:
 * after xKB or after the end of a part, or the end of a header
 *
 * - buffer the first chunk
 * - if the first chunk hasn't been received after x bytes, save normally
 *
 */
// Chunksaver returns a stream decorator that persists a message to a
// ChunkSaverStorage as it streams through, splitting it into chunks on
// part/header boundaries (and on the configured size cap via chunkBuffer).
// Storage and buffer may be dependency-injected through Decorate's varargs;
// otherwise they are built from config at initialization time.
func Chunksaver() *StreamDecorator {
	sd := &StreamDecorator{}
	sd.Decorate =
		func(sp StreamProcessor, a ...interface{}) StreamProcessor {
			// per-connection state shared by the Open/Close/Write closures below
			var (
				envelope    *mail.Envelope
				chunkBuffer *chunkedBytesBufferMime
				msgPos      uint
				database    ChunkSaverStorage
				written     int64
				// just some headers from the first mime-part
				subject string
				to      string
				from    string
				progress int // tracks which mime parts were processed
			)
			var config *chunkSaverConfig
			// optional dependency injection
			for i := range a {
				if db, ok := a[i].(ChunkSaverStorage); ok {
					database = db
				}
				if buff, ok := a[i].(*chunkedBytesBufferMime); ok {
					chunkBuffer = buff
				}
			}
			Svc.AddInitializer(InitializeWith(func(backendConfig BackendConfig) error {
				configType := BaseConfig(&chunkSaverConfig{})
				bcfg, err := Svc.ExtractConfig(backendConfig, configType)
				if err != nil {
					return err
				}
				config = bcfg.(*chunkSaverConfig)
				if chunkBuffer == nil {
					chunkBuffer = newChunkedBytesBufferMime()
				}
				// configure storage if none was injected
				if database == nil {
					if config.StorageEngine == "memory" {
						db := new(chunkSaverMemory)
						db.compressLevel = config.CompressLevel
						database = db
					} else {
						// default storage engine is SQL
						db := new(chunkSaverSQL)
						database = db
					}
				}
				err = database.Initialize(backendConfig)
				if err != nil {
					return err
				}
				// configure the chunks buffer
				if config.ChunkMaxBytes > 0 {
					chunkBuffer.capTo(config.ChunkMaxBytes)
				} else {
					chunkBuffer.capTo(chunkMaxBytes)
				}
				chunkBuffer.setDatabase(database)
				return nil
			}))
			Svc.AddShutdowner(ShutdownWith(func() error {
				err := database.Shutdown()
				return err
			}))
			// Open begins a new message: resets counters and registers the
			// message with storage, stashing the returned id on the envelope.
			sd.Open = func(e *mail.Envelope) error {
				// create a new entry & grab the id
				written = 0
				progress = 0
				var ip net.IPAddr
				if ret := net.ParseIP(e.RemoteIP); ret != nil {
					ip = net.IPAddr{IP: ret}
				}
				// NOTE(review): e.MailFrom.String() is passed for two different
				// arguments here — confirm against OpenMessage's signature that
				// the second occurrence shouldn't be something else (e.g. rDNS).
				mid, err := database.OpenMessage(
					e.MailFrom.String(),
					e.Helo,
					e.RcptTo[0].String(),
					ip,
					e.MailFrom.String(),
					e.TLS)
				if err != nil {
					return err
				}
				e.Values["messageID"] = mid
				envelope = e
				return nil
			}
			// Close flushes any buffered tail and finalizes the message record.
			sd.Close = func() (err error) {
				err = chunkBuffer.flush()
				if err != nil {
					// TODO we could delete the half saved message here
					return err
				}
				defer chunkBuffer.Reset()
				if mid, ok := envelope.Values["messageID"].(uint64); ok {
					err = database.CloseMessage(
						mid,
						written,
						&chunkBuffer.info,
						subject,
						envelope.QueuedId,
						to,
						from,
					)
					if err != nil {
						return err
					}
				}
				return nil
			}
			// fillVars lazily captures Subject/To/From from the first mime part's
			// headers; already-set values are left alone.
			fillVars := func(parts *[]*mime.Part, subject, to, from string) (string, string, string) {
				if len(*parts) > 0 {
					if subject == "" {
						if val, ok := (*parts)[0].Headers["Subject"]; ok {
							subject = val[0]
						}
					}
					if to == "" {
						if val, ok := (*parts)[0].Headers["To"]; ok {
							addr, err := mail.NewAddress(val[0])
							// parse failures are ignored; value stays empty
							if err == nil {
								to = addr.String()
							}
						}
					}
					if from == "" {
						if val, ok := (*parts)[0].Headers["From"]; ok {
							addr, err := mail.NewAddress(val[0])
							if err == nil {
								from = addr.String()
							}
						}
					}
				}
				return subject, to, from
			}
			// The stream processor: writes p into the chunk buffer, flushing at
			// part and header boundaries discovered by the upstream mime parser,
			// then forwards p unchanged downstream.
			return StreamProcessWith(func(p []byte) (count int, err error) {
				if envelope.Values == nil {
					return count, errors.New("no message headers found")
				}
				if parts, ok := envelope.Values["MimeParts"].(*[]*mime.Part); ok && len(*parts) > 0 {
					var pos int // how far into p we have consumed
					subject, to, from = fillVars(parts, subject, to, from)
					// offset converts absolute message positions to offsets within p
					offset := msgPos
					chunkBuffer.currentPart((*parts)[0])
					for i := progress; i < len(*parts); i++ {
						part := (*parts)[i]
						// break chunk on new part
						if part.StartingPos > 0 && part.StartingPos > msgPos {
							count, _ = chunkBuffer.Write(p[pos : part.StartingPos-offset])
							written += int64(count)
							err = chunkBuffer.flush()
							if err != nil {
								return count, err
							}
							chunkBuffer.currentPart(part)
							// NOTE(review): debug print — consider removing or
							// routing through the project logger
							fmt.Println("->N")
							pos += count
							msgPos = part.StartingPos
						}
						// break chunk on header
						if part.StartingPosBody > 0 && part.StartingPosBody >= msgPos {
							count, _ = chunkBuffer.Write(p[pos : part.StartingPosBody-offset])
							written += int64(count)
							err = chunkBuffer.flush()
							if err != nil {
								return count, err
							}
							chunkBuffer.currentPart(part)
							// NOTE(review): debug print — see above
							fmt.Println("->H")
							pos += count
							msgPos = part.StartingPosBody
						}
						// if on the latest (last) part, and yet there is still data to be written out
						// NOTE(review): `len(p)-1 > pos` skips the write when exactly
						// one byte remains (pos == len(p)-1) — confirm that byte is
						// not lost across calls.
						if len(*parts)-1 == i && len(p)-1 > pos {
							count, _ = chunkBuffer.Write(p[pos:])
							written += int64(count)
							pos += count
							msgPos += uint(count)
						}
						// if there's no more data
						if pos >= len(p) {
							break
						}
					}
					if len(*parts) > 2 {
						progress = len(*parts) - 2 // skip to 2nd last part, assume previous parts are already processed
					}
				}
				// always forward the raw bytes downstream untouched
				return sp.Write(p)
			})
		}
	return sd
}