buffer.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. package chunk
  2. import (
  3. "crypto/md5"
  4. "errors"
  5. "hash"
  6. "strings"
  7. "github.com/flashmob/go-guerrilla/mail/mimeparse"
  8. )
  9. type flushEvent func() error
  10. type chunkingBuffer struct {
  11. buf []byte
  12. flushTrigger flushEvent
  13. }
  14. // Flush signals that it's time to write the buffer out to storage
  15. func (c *chunkingBuffer) Flush() error {
  16. if len(c.buf) == 0 {
  17. return nil
  18. }
  19. if c.flushTrigger != nil {
  20. if err := c.flushTrigger(); err != nil {
  21. return err
  22. }
  23. }
  24. c.Reset()
  25. return nil
  26. }
  27. // Reset sets the length back to 0, making it re-usable
  28. func (c *chunkingBuffer) Reset() {
  29. c.buf = c.buf[:0] // set the length back to 0
  30. }
  31. // Write takes a p slice of bytes and writes it to the buffer.
  32. // It will never grow the buffer, flushing it as soon as it's full.
  33. func (c *chunkingBuffer) Write(p []byte) (i int, err error) {
  34. remaining := len(p) // number of bytes remaining to write
  35. bufCap := cap(c.buf)
  36. for {
  37. free := bufCap - len(c.buf)
  38. if free > remaining {
  39. // enough of room in the buffer
  40. c.buf = append(c.buf, p[i:i+remaining]...)
  41. i += remaining
  42. return
  43. } else {
  44. // fill the buffer to the 'brim' with a slice from p
  45. c.buf = append(c.buf, p[i:i+free]...)
  46. remaining -= free
  47. i += free
  48. err = c.Flush()
  49. if err != nil {
  50. return i, err
  51. }
  52. if remaining == 0 {
  53. return
  54. }
  55. }
  56. }
  57. }
  58. // CapTo caps the internal buffer to specified number of bytes, sets the length back to 0
  59. func (c *chunkingBuffer) CapTo(n int) {
  60. if cap(c.buf) == n {
  61. return
  62. }
  63. c.buf = make([]byte, 0, n)
  64. }
  65. // ChunkingBufferMime decorates chunkingBuffer, defining what to do when a flush event is triggered
  66. // in other words,
  67. type ChunkingBufferMime struct {
  68. chunkingBuffer
  69. current *mimeparse.Part
  70. Info PartsInfo
  71. md5 hash.Hash
  72. database Storage
  73. }
  74. func NewChunkedBytesBufferMime() *ChunkingBufferMime {
  75. b := new(ChunkingBufferMime)
  76. b.chunkingBuffer.flushTrigger = func() error {
  77. return b.onFlush()
  78. }
  79. b.md5 = md5.New()
  80. b.buf = make([]byte, 0, chunkMaxBytes)
  81. return b
  82. }
  83. func (b *ChunkingBufferMime) SetDatabase(database Storage) {
  84. b.database = database
  85. }
  86. // onFlush is called whenever the flush event fires.
  87. // - It saves the chunk to disk and adds the chunk's hash to the list.
  88. // - It builds the b.Info.Parts structure
  89. func (b *ChunkingBufferMime) onFlush() error {
  90. b.md5.Write(b.buf)
  91. var chash HashKey
  92. copy(chash[:], b.md5.Sum([]byte{}))
  93. if b.current == nil {
  94. return errors.New("b.current part is nil")
  95. }
  96. if size := len(b.Info.Parts); size > 0 && b.Info.Parts[size-1].PartId == b.current.Node {
  97. // existing part, just append the hash
  98. lastPart := &b.Info.Parts[size-1]
  99. lastPart.ChunkHash = append(lastPart.ChunkHash, chash)
  100. b.fillInfo(lastPart, size-1)
  101. lastPart.Size += uint(len(b.buf))
  102. } else {
  103. // add it as a new part
  104. part := ChunkedPart{
  105. PartId: b.current.Node,
  106. ChunkHash: []HashKey{chash},
  107. ContentBoundary: b.Info.boundary(b.current.ContentBoundary),
  108. Size: uint(len(b.buf)),
  109. }
  110. b.fillInfo(&part, 0)
  111. b.Info.Parts = append(b.Info.Parts, part)
  112. b.Info.Count++
  113. }
  114. if err := b.database.AddChunk(b.buf, chash[:]); err != nil {
  115. return err
  116. }
  117. return nil
  118. }
  119. func (b *ChunkingBufferMime) fillInfo(cp *ChunkedPart, index int) {
  120. if cp.ContentType == "" && b.current.ContentType != nil {
  121. cp.ContentType = b.current.ContentType.String()
  122. }
  123. if cp.Charset == "" && b.current.Charset != "" {
  124. cp.Charset = b.current.Charset
  125. }
  126. if cp.TransferEncoding == "" && b.current.TransferEncoding != "" {
  127. cp.TransferEncoding = b.current.TransferEncoding
  128. }
  129. if cp.ContentDisposition == "" && b.current.ContentDisposition != "" {
  130. cp.ContentDisposition = b.current.ContentDisposition
  131. if strings.Contains(cp.ContentDisposition, "attach") {
  132. b.Info.HasAttach = true
  133. }
  134. }
  135. if cp.ContentType != "" {
  136. if b.Info.TextPart == -1 && strings.Contains(cp.ContentType, "text/plain") {
  137. b.Info.TextPart = index
  138. } else if b.Info.HTMLPart == -1 && strings.Contains(cp.ContentType, "text/html") {
  139. b.Info.HTMLPart = index
  140. }
  141. }
  142. }
  143. // Reset decorates the Reset method of the chunkingBuffer
  144. func (b *ChunkingBufferMime) Reset() {
  145. b.md5.Reset()
  146. b.chunkingBuffer.Reset()
  147. }
  148. // CurrentPart sets the current mime part that's being buffered
  149. func (b *ChunkingBufferMime) CurrentPart(cp *mimeparse.Part) {
  150. if b.current == nil {
  151. b.Info = *NewPartsInfo()
  152. b.Info.Parts = make([]ChunkedPart, 0, 3)
  153. b.Info.TextPart = -1
  154. b.Info.HTMLPart = -1
  155. }
  156. b.current = cp
  157. }