buffer.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. package chunk
  2. import (
  3. "crypto/md5"
  4. "errors"
  5. "fmt"
  6. "hash"
  7. "strings"
  8. "github.com/flashmob/go-guerrilla/mail/mime"
  9. )
  10. type flushEvent func() error
  11. type chunkedBytesBuffer struct {
  12. buf []byte
  13. flushTrigger flushEvent
  14. }
  15. // Flush signals that it's time to write the buffer out to storage
  16. func (c *chunkedBytesBuffer) Flush() error {
  17. if len(c.buf) == 0 {
  18. return nil
  19. }
  20. fmt.Print(string(c.buf))
  21. if c.flushTrigger != nil {
  22. if err := c.flushTrigger(); err != nil {
  23. return err
  24. }
  25. }
  26. c.Reset()
  27. return nil
  28. }
  29. // Reset sets the length back to 0, making it re-usable
  30. func (c *chunkedBytesBuffer) Reset() {
  31. c.buf = c.buf[:0] // set the length back to 0
  32. }
  33. // Write takes a p slice of bytes and writes it to the buffer.
  34. // It will never grow the buffer, flushing it as soon as it's full.
  35. func (c *chunkedBytesBuffer) Write(p []byte) (i int, err error) {
  36. remaining := len(p)
  37. bufCap := cap(c.buf)
  38. for {
  39. free := bufCap - len(c.buf)
  40. if free > remaining {
  41. // enough of room in the buffer
  42. c.buf = append(c.buf, p[i:i+remaining]...)
  43. i += remaining
  44. return
  45. } else {
  46. // fill the buffer to the 'brim' with a slice from p
  47. c.buf = append(c.buf, p[i:i+free]...)
  48. remaining -= free
  49. i += free
  50. err = c.Flush()
  51. if err != nil {
  52. return i, err
  53. }
  54. if remaining == 0 {
  55. return
  56. }
  57. }
  58. }
  59. }
  60. // CapTo caps the internal buffer to specified number of bytes, sets the length back to 0
  61. func (c *chunkedBytesBuffer) CapTo(n int) {
  62. if cap(c.buf) == n {
  63. return
  64. }
  65. c.buf = make([]byte, 0, n)
  66. }
// ChunkedBytesBufferMime decorates chunkedBytesBuffer, specifying what to do when a flush event is triggered:
// each flushed chunk is hashed, recorded in Info against the current MIME part and handed to the database.
type ChunkedBytesBufferMime struct {
	chunkedBytesBuffer
	// current is the MIME part the buffered bytes are attributed to; set via CurrentPart.
	current *mime.Part
	// Info collects the per-part metadata (hashes, sizes, content details) built up by onFlush.
	Info PartsInfo
	// md5 is the hash state onFlush feeds each chunk into to derive its HashKey.
	md5 hash.Hash
	// database receives every flushed chunk through AddChunk; set via SetDatabase.
	database ChunkSaverStorage
}
  75. func NewChunkedBytesBufferMime() *ChunkedBytesBufferMime {
  76. b := new(ChunkedBytesBufferMime)
  77. b.chunkedBytesBuffer.flushTrigger = func() error {
  78. return b.onFlush()
  79. }
  80. b.md5 = md5.New()
  81. b.buf = make([]byte, 0, chunkMaxBytes)
  82. return b
  83. }
// SetDatabase sets the storage that onFlush passes every flushed chunk to via AddChunk.
func (b *ChunkedBytesBufferMime) SetDatabase(database ChunkSaverStorage) {
	b.database = database
}
  87. // onFlush is called whenever the flush event fires.
  88. // - It saves the chunk to disk and adds the chunk's hash to the list.
  89. // - It builds the b.Info.Parts structure
  90. func (b *ChunkedBytesBufferMime) onFlush() error {
  91. b.md5.Write(b.buf)
  92. var chash HashKey
  93. copy(chash[:], b.md5.Sum([]byte{}))
  94. if b.current == nil {
  95. return errors.New("b.current part is nil")
  96. }
  97. if size := len(b.Info.Parts); size > 0 && b.Info.Parts[size-1].PartId == b.current.Node {
  98. // existing part, just append the hash
  99. lastPart := &b.Info.Parts[size-1]
  100. lastPart.ChunkHash = append(lastPart.ChunkHash, chash)
  101. b.fillInfo(lastPart, size-1)
  102. lastPart.Size += uint(len(b.buf))
  103. } else {
  104. // add it as a new part
  105. part := ChunkedPart{
  106. PartId: b.current.Node,
  107. ChunkHash: []HashKey{chash},
  108. ContentBoundary: b.Info.boundary(b.current.ContentBoundary),
  109. Size: uint(len(b.buf)),
  110. }
  111. b.fillInfo(&part, 0)
  112. b.Info.Parts = append(b.Info.Parts, part)
  113. b.Info.Count++
  114. }
  115. if err := b.database.AddChunk(b.buf, chash[:]); err != nil {
  116. return err
  117. }
  118. return nil
  119. }
  120. func (b *ChunkedBytesBufferMime) fillInfo(cp *ChunkedPart, index int) {
  121. if cp.ContentType == "" && b.current.ContentType != nil {
  122. cp.ContentType = b.current.ContentType.String()
  123. }
  124. if cp.Charset == "" && b.current.Charset != "" {
  125. cp.Charset = b.current.Charset
  126. }
  127. if cp.TransferEncoding == "" && b.current.TransferEncoding != "" {
  128. cp.TransferEncoding = b.current.TransferEncoding
  129. }
  130. if cp.ContentDisposition == "" && b.current.ContentDisposition != "" {
  131. cp.ContentDisposition = b.current.ContentDisposition
  132. if strings.Contains(cp.ContentDisposition, "attach") {
  133. b.Info.HasAttach = true
  134. }
  135. }
  136. if cp.ContentType != "" {
  137. if b.Info.TextPart == -1 && strings.Contains(cp.ContentType, "text/plain") {
  138. b.Info.TextPart = index
  139. } else if b.Info.HTMLPart == -1 && strings.Contains(cp.ContentType, "text/html") {
  140. b.Info.HTMLPart = index
  141. }
  142. }
  143. }
// Reset decorates the Reset method of the chunkedBytesBuffer: besides setting
// the buffer length back to 0 it also resets the md5 hash state.
//
// Note that Flush is defined on the embedded chunkedBytesBuffer and calls the
// embedded Reset, so flushing does not go through this method and leaves the
// md5 state as it is.
func (b *ChunkedBytesBufferMime) Reset() {
	b.md5.Reset()
	b.chunkedBytesBuffer.Reset()
}
  149. func (b *ChunkedBytesBufferMime) CurrentPart(cp *mime.Part) {
  150. if b.current == nil {
  151. b.Info = *NewPartsInfo()
  152. b.Info.Parts = make([]ChunkedPart, 0, 3)
  153. b.Info.TextPart = -1
  154. b.Info.HTMLPart = -1
  155. }
  156. b.current = cp
  157. }