processor.go

package chunk

import (
	"errors"
	"fmt"
	"net"

	"github.com/flashmob/go-guerrilla/backends"
	"github.com/flashmob/go-guerrilla/mail"
	"github.com/flashmob/go-guerrilla/mail/mime"
)

// ----------------------------------------------------------------------------------
// Processor Name: ChunkSaver
// ----------------------------------------------------------------------------------
// Description   : Takes the stream and saves it in chunks. Chunks are split at the
//               : chunksaver_chunk_size config setting, at the end of MIME parts,
//               : and after headers. This allows for basic de-duplication: we can
//               : take a hash of each chunk and check the database to see if we
//               : already have it. If we do, we don't write the chunk again; we
//               : reference the previously saved chunk and only increment its
//               : reference count. The rationale for putting headers and bodies
//               : into separate chunks is that headers usually contain more unique
//               : data, while bodies are often duplicated, especially for messages
//               : that are CC'd or forwarded.
// ----------------------------------------------------------------------------------
// Requires      : "mimeanalyzer" stream processor to be enabled before it
// ----------------------------------------------------------------------------------
// Config Options: chunksaver_chunk_size - maximum chunk size, in bytes
// --------------:-------------------------------------------------------------------
// Input         : e.Values["MimeParts"], of type *[]*mime.Part, as populated by "mimeanalyzer"
// ----------------------------------------------------------------------------------
// Output        :
// ----------------------------------------------------------------------------------
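
// The two helpers below are an illustrative sketch only and are not called by
// the processor: they restate the de-duplication idea from the description in
// miniature. A real storage engine would use a stable digest (e.g. sha256)
// and a database lookup; here a dependency-free FNV-1a hash and plain maps
// stand in for a ChunkSaverStorage implementation.

// chunkKey hashes a chunk with 64-bit FNV-1a so that identical chunks map to
// the same key.
func chunkKey(b []byte) uint64 {
	const (
		fnvOffset uint64 = 14695981039346656037
		fnvPrime  uint64 = 1099511628211
	)
	h := fnvOffset
	for _, c := range b {
		h ^= uint64(c)
		h *= fnvPrime
	}
	return h
}

// saveOrReference stores a chunk only on its first sighting; on every
// sighting it increments the chunk's reference count. The store and refs maps
// are hypothetical stand-ins for database tables.
func saveOrReference(store map[uint64][]byte, refs map[uint64]int, chunk []byte) uint64 {
	key := chunkKey(chunk)
	if _, seen := store[key]; !seen {
		store[key] = append([]byte(nil), chunk...) // first sighting: persist the chunk
	}
	refs[key]++ // every sighting: bump the reference count
	return key
}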

func init() {
	backends.Streamers["chunksaver"] = func() *backends.StreamDecorator {
		return Chunksaver()
	}
}

type ChunkSaverConfig struct {
	// ChunkMaxBytes controls the maximum buffer size for saving.
	// 16KB default.
	ChunkMaxBytes int    `json:"chunksaver_chunk_size,omitempty"`
	StorageEngine string `json:"chunksaver_storage_engine,omitempty"`
	CompressLevel int    `json:"chunksaver_compress_level,omitempty"`
}
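
// An illustrative backend config fragment for the options above. The keys
// come from the json tags on ChunkSaverConfig; the values are examples only:
//
//	{
//	    "chunksaver_chunk_size": 16384,
//	    "chunksaver_storage_engine": "memory",
//	    "chunksaver_compress_level": 9
//	}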

// 16KB is the default; change it using the chunksaver_chunk_size config setting
const chunkMaxBytes = 1024 * 16

/*

A chunk ends either:
after xKB, or at the end of a MIME part, or at the end of a header

- buffer the first chunk
- if the first chunk hasn't been received after more than x bytes, save normally

*/
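
// chunkShouldFlush restates the rule above as a predicate (illustrative only;
// the processor below implements the rule inline against the incoming MIME
// parts and the buffer's configured cap).
func chunkShouldFlush(buffered, maxBytes int, partEnded, headerEnded bool) bool {
	return buffered >= maxBytes || partEnded || headerEnded
}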

func Chunksaver() *backends.StreamDecorator {

	sd := &backends.StreamDecorator{}
	sd.Decorate =
		func(sp backends.StreamProcessor, a ...interface{}) backends.StreamProcessor {

			var (
				envelope    *mail.Envelope
				chunkBuffer *ChunkedBytesBufferMime
				msgPos      uint
				database    ChunkSaverStorage
				written     int64

				// just some headers from the first mime-part
				subject string
				to      string
				from    string

				progress int // tracks which mime parts were processed
			)

			var config *ChunkSaverConfig
			// optional dependency injection
			for i := range a {
				if db, ok := a[i].(ChunkSaverStorage); ok {
					database = db
				}
				if buff, ok := a[i].(*ChunkedBytesBufferMime); ok {
					chunkBuffer = buff
				}
			}
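
			// Hypothetical usage: a caller (such as a test) could inject both
			// dependencies when decorating the next processor in the chain,
			// where next is any backends.StreamProcessor:
			//
			//	sd := Chunksaver()
			//	sp := sd.Decorate(next, new(ChunkSaverMemory), NewChunkedBytesBufferMime())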

			backends.Svc.AddInitializer(backends.InitializeWith(func(backendConfig backends.BackendConfig) error {
				configType := backends.BaseConfig(&ChunkSaverConfig{})
				bcfg, err := backends.Svc.ExtractConfig(backendConfig, configType)
				if err != nil {
					return err
				}
				config = bcfg.(*ChunkSaverConfig)
				if chunkBuffer == nil {
					chunkBuffer = NewChunkedBytesBufferMime()
				}
				// configure storage if none was injected
				if database == nil {
					if config.StorageEngine == "memory" {
						db := new(ChunkSaverMemory)
						db.CompressLevel = config.CompressLevel
						database = db
					} else {
						db := new(ChunkSaverSQL)
						database = db
					}
				}
				err = database.Initialize(backendConfig)
				if err != nil {
					return err
				}
				// configure the chunks buffer
				if config.ChunkMaxBytes > 0 {
					chunkBuffer.CapTo(config.ChunkMaxBytes)
				} else {
					chunkBuffer.CapTo(chunkMaxBytes)
				}
				chunkBuffer.SetDatabase(database)
				return nil
			}))

			backends.Svc.AddShutdowner(backends.ShutdownWith(func() error {
				err := database.Shutdown()
				return err
			}))

			sd.Open = func(e *mail.Envelope) error {
				// create a new entry & grab the id
				written = 0
				progress = 0
				var ip net.IPAddr
				if ret := net.ParseIP(e.RemoteIP); ret != nil {
					ip = net.IPAddr{IP: ret}
				}
				mid, err := database.OpenMessage(
					e.MailFrom.String(),
					e.Helo,
					e.RcptTo[0].String(),
					ip,
					e.MailFrom.String(),
					e.TLS,
				)
				if err != nil {
					return err
				}
				e.Values["messageID"] = mid
				envelope = e
				return nil
			}

			sd.Close = func() (err error) {
				err = chunkBuffer.Flush()
				if err != nil {
					// TODO we could delete the half-saved message here
					return err
				}
				defer chunkBuffer.Reset()
				if mid, ok := envelope.Values["messageID"].(uint64); ok {
					err = database.CloseMessage(
						mid,
						written,
						&chunkBuffer.Info,
						subject,
						envelope.QueuedId,
						to,
						from,
					)
					if err != nil {
						return err
					}
				}
				return nil
			}

			// fillVars fills in subject, to and from using the first mime part's
			// headers, keeping any values that were already set
			fillVars := func(parts *[]*mime.Part, subject, to, from string) (string, string, string) {
				if len(*parts) > 0 {
					if subject == "" {
						if val, ok := (*parts)[0].Headers["Subject"]; ok {
							subject = val[0]
						}
					}
					if to == "" {
						if val, ok := (*parts)[0].Headers["To"]; ok {
							addr, err := mail.NewAddress(val[0])
							if err == nil {
								to = addr.String()
							}
						}
					}
					if from == "" {
						if val, ok := (*parts)[0].Headers["From"]; ok {
							addr, err := mail.NewAddress(val[0])
							if err == nil {
								from = addr.String()
							}
						}
					}
				}
				return subject, to, from
			}

			return backends.StreamProcessWith(func(p []byte) (count int, err error) {
				if envelope.Values == nil {
					return count, errors.New("no message headers found")
				}
				if parts, ok := envelope.Values["MimeParts"].(*[]*mime.Part); ok && len(*parts) > 0 {
					// msgPos is our absolute position in the message stream, offset
					// is the absolute position at which this call began, and pos
					// indexes into p, so: absolute position = offset + pos
					var pos int
					subject, to, from = fillVars(parts, subject, to, from)
					offset := msgPos
					chunkBuffer.CurrentPart((*parts)[0])
					for i := progress; i < len(*parts); i++ {
						part := (*parts)[i]
						// break the chunk on a new part
						if part.StartingPos > 0 && part.StartingPos > msgPos {
							count, _ = chunkBuffer.Write(p[pos : part.StartingPos-offset])
							written += int64(count)
							err = chunkBuffer.Flush()
							if err != nil {
								return count, err
							}
							chunkBuffer.CurrentPart(part)
							fmt.Println("->N") // debug trace: flushed at a part boundary
							pos += count
							msgPos = part.StartingPos
						}
						// break the chunk on a header
						if part.StartingPosBody > 0 && part.StartingPosBody >= msgPos {
							count, _ = chunkBuffer.Write(p[pos : part.StartingPosBody-offset])
							written += int64(count)
							err = chunkBuffer.Flush()
							if err != nil {
								return count, err
							}
							chunkBuffer.CurrentPart(part)
							fmt.Println("->H") // debug trace: flushed at the end of a header
							pos += count
							msgPos = part.StartingPosBody
						}
						// if on the last part, and there is still data to be written out
						if len(*parts)-1 == i && len(p) > pos {
							count, _ = chunkBuffer.Write(p[pos:])
							written += int64(count)
							pos += count
							msgPos += uint(count)
						}
						// if there's no more data
						if pos >= len(p) {
							break
						}
					}
					if len(*parts) > 2 {
						// skip to the 2nd-last part; assume earlier parts were already processed
						progress = len(*parts) - 2
					}
				}
				return sp.Write(p)
			})
		}
	return sd
}
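
// Note on wiring (illustrative): because this processor reads
// e.Values["MimeParts"], the "mimeanalyzer" streamer must appear before
// "chunksaver" in the backend's stream processing chain. The exact config key
// depends on the go-guerrilla version; the chain itself would look something
// like:
//
//	"mimeanalyzer|chunksaver"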