buffer.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. package chunk
  2. import (
  3. "crypto/md5"
  4. "errors"
  5. "hash"
  6. "strings"
  7. "github.com/flashmob/go-guerrilla/mail/mime"
  8. )
  9. type flushEvent func() error
  10. type chunkingBuffer struct {
  11. buf []byte
  12. flushTrigger flushEvent
  13. }
  14. // Flush signals that it's time to write the buffer out to storage
  15. func (c *chunkingBuffer) Flush() error {
  16. if len(c.buf) == 0 {
  17. return nil
  18. }
  19. if c.flushTrigger != nil {
  20. if err := c.flushTrigger(); err != nil {
  21. return err
  22. }
  23. }
  24. c.Reset()
  25. return nil
  26. }
  27. // Reset sets the length back to 0, making it re-usable
  28. func (c *chunkingBuffer) Reset() {
  29. c.buf = c.buf[:0] // set the length back to 0
  30. }
  31. // Write takes a p slice of bytes and writes it to the buffer.
  32. // It will never grow the buffer, flushing it as soon as it's full.
  33. func (c *chunkingBuffer) Write(p []byte) (i int, err error) {
  34. remaining := len(p) // number of bytes remaining to write
  35. bufCap := cap(c.buf)
  36. for {
  37. free := bufCap - len(c.buf)
  38. if free > remaining {
  39. // enough of room in the buffer
  40. c.buf = append(c.buf, p[i:i+remaining]...)
  41. i += remaining
  42. return
  43. } else {
  44. // fill the buffer to the 'brim' with a slice from p
  45. c.buf = append(c.buf, p[i:i+free]...)
  46. remaining -= free
  47. i += free
  48. err = c.Flush()
  49. if err != nil {
  50. return i, err
  51. }
  52. if remaining == 0 {
  53. return
  54. }
  55. }
  56. }
  57. }
  58. // CapTo caps the internal buffer to specified number of bytes, sets the length back to 0
  59. func (c *chunkingBuffer) CapTo(n int) {
  60. if cap(c.buf) == n {
  61. return
  62. }
  63. c.buf = make([]byte, 0, n)
  64. }
  65. // ChunkingBufferMime decorates chunkingBuffer, specifying that to do when a flush event is triggered
  66. type ChunkingBufferMime struct {
  67. chunkingBuffer
  68. current *mime.Part
  69. Info PartsInfo
  70. md5 hash.Hash
  71. database Storage
  72. }
  73. func NewChunkedBytesBufferMime() *ChunkingBufferMime {
  74. b := new(ChunkingBufferMime)
  75. b.chunkingBuffer.flushTrigger = func() error {
  76. return b.onFlush()
  77. }
  78. b.md5 = md5.New()
  79. b.buf = make([]byte, 0, chunkMaxBytes)
  80. return b
  81. }
  82. func (b *ChunkingBufferMime) SetDatabase(database Storage) {
  83. b.database = database
  84. }
  85. // onFlush is called whenever the flush event fires.
  86. // - It saves the chunk to disk and adds the chunk's hash to the list.
  87. // - It builds the b.Info.Parts structure
  88. func (b *ChunkingBufferMime) onFlush() error {
  89. b.md5.Write(b.buf)
  90. var chash HashKey
  91. copy(chash[:], b.md5.Sum([]byte{}))
  92. if b.current == nil {
  93. return errors.New("b.current part is nil")
  94. }
  95. if size := len(b.Info.Parts); size > 0 && b.Info.Parts[size-1].PartId == b.current.Node {
  96. // existing part, just append the hash
  97. lastPart := &b.Info.Parts[size-1]
  98. lastPart.ChunkHash = append(lastPart.ChunkHash, chash)
  99. b.fillInfo(lastPart, size-1)
  100. lastPart.Size += uint(len(b.buf))
  101. } else {
  102. // add it as a new part
  103. part := ChunkedPart{
  104. PartId: b.current.Node,
  105. ChunkHash: []HashKey{chash},
  106. ContentBoundary: b.Info.boundary(b.current.ContentBoundary),
  107. Size: uint(len(b.buf)),
  108. }
  109. b.fillInfo(&part, 0)
  110. b.Info.Parts = append(b.Info.Parts, part)
  111. b.Info.Count++
  112. }
  113. if err := b.database.AddChunk(b.buf, chash[:]); err != nil {
  114. return err
  115. }
  116. return nil
  117. }
  118. func (b *ChunkingBufferMime) fillInfo(cp *ChunkedPart, index int) {
  119. if cp.ContentType == "" && b.current.ContentType != nil {
  120. cp.ContentType = b.current.ContentType.String()
  121. }
  122. if cp.Charset == "" && b.current.Charset != "" {
  123. cp.Charset = b.current.Charset
  124. }
  125. if cp.TransferEncoding == "" && b.current.TransferEncoding != "" {
  126. cp.TransferEncoding = b.current.TransferEncoding
  127. }
  128. if cp.ContentDisposition == "" && b.current.ContentDisposition != "" {
  129. cp.ContentDisposition = b.current.ContentDisposition
  130. if strings.Contains(cp.ContentDisposition, "attach") {
  131. b.Info.HasAttach = true
  132. }
  133. }
  134. if cp.ContentType != "" {
  135. if b.Info.TextPart == -1 && strings.Contains(cp.ContentType, "text/plain") {
  136. b.Info.TextPart = index
  137. } else if b.Info.HTMLPart == -1 && strings.Contains(cp.ContentType, "text/html") {
  138. b.Info.HTMLPart = index
  139. }
  140. }
  141. }
  142. // Reset decorates the Reset method of the chunkingBuffer
  143. func (b *ChunkingBufferMime) Reset() {
  144. b.md5.Reset()
  145. b.chunkingBuffer.Reset()
  146. }
  147. func (b *ChunkingBufferMime) CurrentPart(cp *mime.Part) {
  148. if b.current == nil {
  149. b.Info = *NewPartsInfo()
  150. b.Info.Parts = make([]ChunkedPart, 0, 3)
  151. b.Info.TextPart = -1
  152. b.Info.HTMLPart = -1
  153. }
  154. b.current = cp
  155. }