processor.go 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293
  1. package chunk
  2. import (
  3. "errors"
  4. "net"
  5. "github.com/flashmob/go-guerrilla/backends"
  6. "github.com/flashmob/go-guerrilla/mail"
  7. "github.com/flashmob/go-guerrilla/mail/mime"
  8. )
  9. // ----------------------------------------------------------------------------------
  10. // Processor Name: ChunkSaver
  11. // ----------------------------------------------------------------------------------
  12. // Description : Takes the stream and saves it in chunks. Chunks are split on the
  13. // : chunksaver_chunk_size config setting, and also at the end of MIME parts,
  14. // : and after a header. This allows for basic de-duplication: we can take a
  15. // : hash of each chunk, then check the database to see if we have it already.
  16. // : We don't need to write it to the database, but take the reference of the
  17. // : previously saved chunk and only increment the reference count.
  18. // : The rationale to put headers and bodies into separate chunks is
  19. // : due to headers often containing more unique data, while the bodies are
  20. // : often duplicated, especially for messages that are CC'd or forwarded
  21. // ----------------------------------------------------------------------------------
  22. // Requires : "mimeanalyzer" stream processor to be enabled before it
  23. // ----------------------------------------------------------------------------------
  24. // Config Options: chunksaver_chunk_size - maximum chunk size, in bytes
  25. // --------------:-------------------------------------------------------------------
  26. // Input : e.Values["MimeParts"] Which is of type *mime.Parts, as populated by "mimeanalyzer"
  27. // ----------------------------------------------------------------------------------
  28. // Output :
  29. // ----------------------------------------------------------------------------------
  30. func init() {
  31. backends.Streamers["chunksaver"] = func() *backends.StreamDecorator {
  32. return Chunksaver()
  33. }
  34. }
// Config holds the chunksaver's settings, extracted from the backend
// configuration by the initializer registered in Chunksaver().
type Config struct {
	// ChunkMaxBytes controls the maximum buffer size for saving
	// 16KB default.
	ChunkMaxBytes int `json:"chunksaver_chunk_size,omitempty"`
	// StorageEngine selects the storage backend when none was injected:
	// "memory" uses StoreMemory, any other value falls back to StoreSQL.
	StorageEngine string `json:"chunksaver_storage_engine,omitempty"`
	// CompressLevel is applied to StoreMemory when the "memory" engine
	// is selected. (Not used by StoreSQL — see the initializer.)
	CompressLevel int `json:"chunksaver_compress_level,omitempty"`
}
  42. //const chunkMaxBytes = 1024 * 16 // 16Kb is the default, change using chunksaver_chunk_size config setting
  43. /**
  44. *
45. * A chunk ends either:
  46. * after xKB or after end of a part, or end of header
  47. *
  48. * - buffer first chunk
  49. * - if didn't receive first chunk for more than x bytes, save normally
  50. *
  51. */
  52. func Chunksaver() *backends.StreamDecorator {
  53. sd := &backends.StreamDecorator{}
  54. sd.Decorate =
  55. func(sp backends.StreamProcessor, a ...interface{}) backends.StreamProcessor {
  56. var (
  57. envelope *mail.Envelope
  58. chunkBuffer *ChunkingBufferMime
  59. msgPos uint
  60. database Storage
  61. written int64
  62. // just some headers from the first mime-part
  63. subject string
  64. to string
  65. from string
  66. progress int // tracks which mime parts were processed
  67. )
  68. var config *Config
  69. // optional dependency injection (you can pass your own instance of Storage or ChunkingBufferMime)
  70. for i := range a {
  71. if db, ok := a[i].(Storage); ok {
  72. database = db
  73. }
  74. if buff, ok := a[i].(*ChunkingBufferMime); ok {
  75. chunkBuffer = buff
  76. }
  77. }
  78. backends.Svc.AddInitializer(backends.InitializeWith(func(backendConfig backends.BackendConfig) error {
  79. configType := backends.BaseConfig(&Config{})
  80. bcfg, err := backends.Svc.ExtractConfig(backendConfig, configType)
  81. if err != nil {
  82. return err
  83. }
  84. config = bcfg.(*Config)
  85. if chunkBuffer == nil {
  86. chunkBuffer = NewChunkedBytesBufferMime()
  87. }
  88. // configure storage if none was injected
  89. if database == nil {
  90. if config.StorageEngine == "memory" {
  91. db := new(StoreMemory)
  92. db.CompressLevel = config.CompressLevel
  93. database = db
  94. } else {
  95. db := new(StoreSQL)
  96. database = db
  97. }
  98. }
  99. err = database.Initialize(backendConfig)
  100. if err != nil {
  101. return err
  102. }
  103. // configure the chunks buffer
  104. if config.ChunkMaxBytes > 0 {
  105. chunkBuffer.CapTo(config.ChunkMaxBytes)
  106. } else {
  107. chunkBuffer.CapTo(chunkMaxBytes)
  108. }
  109. chunkBuffer.SetDatabase(database)
  110. return nil
  111. }))
  112. backends.Svc.AddShutdowner(backends.ShutdownWith(func() error {
  113. err := database.Shutdown()
  114. return err
  115. }))
  116. var writeTo uint
  117. var pos int
  118. sd.Open = func(e *mail.Envelope) error {
  119. // create a new entry & grab the id
  120. written = 0
  121. progress = 0
  122. var ip net.IPAddr
  123. if ret := net.ParseIP(e.RemoteIP); ret != nil {
  124. ip = net.IPAddr{IP: ret}
  125. }
  126. mid, err := database.OpenMessage(
  127. e.MailFrom.String(),
  128. e.Helo,
  129. e.RcptTo[0].String(),
  130. ip,
  131. e.MailFrom.String(),
  132. e.TLS,
  133. e.TransportType,
  134. )
  135. if err != nil {
  136. return err
  137. }
  138. e.Values["messageID"] = mid
  139. envelope = e
  140. return nil
  141. }
  142. sd.Close = func() (err error) {
  143. err = chunkBuffer.Flush()
  144. if err != nil {
  145. // TODO we could delete the half saved message here
  146. return err
  147. }
  148. defer chunkBuffer.Reset()
  149. if mid, ok := envelope.Values["messageID"].(uint64); ok {
  150. err = database.CloseMessage(
  151. mid,
  152. written,
  153. &chunkBuffer.Info,
  154. subject,
  155. envelope.QueuedId,
  156. to,
  157. from,
  158. )
  159. if err != nil {
  160. return err
  161. }
  162. }
  163. return nil
  164. }
  165. fillVars := func(parts *mime.Parts, subject, to, from string) (string, string, string) {
  166. if len(*parts) > 0 {
  167. if subject == "" {
  168. if val, ok := (*parts)[0].Headers["Subject"]; ok {
  169. subject = val[0]
  170. }
  171. }
  172. if to == "" {
  173. if val, ok := (*parts)[0].Headers["To"]; ok {
  174. addr, err := mail.NewAddress(val[0])
  175. if err == nil {
  176. to = addr.String()
  177. }
  178. }
  179. }
  180. if from == "" {
  181. if val, ok := (*parts)[0].Headers["From"]; ok {
  182. addr, err := mail.NewAddress(val[0])
  183. if err == nil {
  184. from = addr.String()
  185. }
  186. }
  187. }
  188. }
  189. return subject, to, from
  190. }
  191. // end() triggers a buffer flush, at the end of a header or part-boundary
  192. end := func(part *mime.Part, offset uint, p []byte, start uint) (int, error) {
  193. var err error
  194. var count int
  195. // write out any unwritten bytes
  196. writeTo = start - offset
  197. size := uint(len(p))
  198. if writeTo > size {
  199. writeTo = size
  200. }
  201. if writeTo > 0 {
  202. count, err = chunkBuffer.Write(p[pos:writeTo])
  203. written += int64(count)
  204. pos += count
  205. if err != nil {
  206. return count, err
  207. }
  208. } else {
  209. count = 0
  210. }
  211. err = chunkBuffer.Flush()
  212. if err != nil {
  213. return count, err
  214. }
  215. chunkBuffer.CurrentPart(part)
  216. return count, nil
  217. }
  218. return backends.StreamProcessWith(func(p []byte) (count int, err error) {
  219. pos = 0
  220. if envelope.Values == nil {
  221. return count, errors.New("no message headers found")
  222. }
  223. if parts, ok := envelope.Values["MimeParts"].(*mime.Parts); ok && len(*parts) > 0 {
  224. subject, to, from = fillVars(parts, subject, to, from)
  225. offset := msgPos
  226. chunkBuffer.CurrentPart((*parts)[0])
  227. for i := progress; i < len(*parts); i++ {
  228. part := (*parts)[i]
  229. // break chunk on new part
  230. if part.StartingPos > 0 && part.StartingPos >= msgPos {
  231. count, err = end(part, offset, p, part.StartingPos)
  232. if err != nil {
  233. return count, err
  234. }
  235. // end of a part here
  236. //fmt.Println("->N --end of part ---")
  237. msgPos = part.StartingPos
  238. }
  239. // break chunk on header
  240. if part.StartingPosBody > 0 && part.StartingPosBody >= msgPos {
  241. count, err = end(part, offset, p, part.StartingPosBody)
  242. if err != nil {
  243. return count, err
  244. }
  245. // end of a header here
  246. //fmt.Println("->H --end of header --")
  247. msgPos += uint(count)
  248. }
  249. // if on the latest (last) part, and yet there is still data to be written out
  250. if len(*parts)-1 == i && len(p) > pos {
  251. count, _ = chunkBuffer.Write(p[pos:])
  252. written += int64(count)
  253. pos += count
  254. msgPos += uint(count)
  255. }
  256. // if there's no more data
  257. if pos >= len(p) {
  258. break
  259. }
  260. }
  261. if len(*parts) > 2 {
  262. progress = len(*parts) - 2 // skip to 2nd last part, assume previous parts are already processed
  263. }
  264. }
  265. return sp.Write(p)
  266. })
  267. }
  268. return sd
  269. }