processor.go

package chunk

import (
	"errors"
	"fmt"
	"net"

	"github.com/flashmob/go-guerrilla/backends"
	"github.com/flashmob/go-guerrilla/mail"
	"github.com/flashmob/go-guerrilla/mail/mimeparse"
)

// ----------------------------------------------------------------------------------
// Processor Name: ChunkSaver
// ----------------------------------------------------------------------------------
// Description   : Takes the stream and saves it in chunks. Chunks are split on the
//               : chunk_size config setting, and also at the end of MIME parts,
//               : and after a header. This allows for basic de-duplication: we can take a
//               : hash of each chunk, then check the database to see if we already have it.
//               : If so, we don't need to write it to the database again; we take the
//               : reference of the previously saved chunk and only increment its reference count.
//               : The rationale for putting headers and bodies into separate chunks is
//               : that headers often contain more unique data, while bodies are
//               : often duplicated, especially for messages that are CC'd or forwarded.
// ----------------------------------------------------------------------------------
// Requires      : the "mimeanalyzer" stream processor to be enabled before it
// ----------------------------------------------------------------------------------
// Config Options: chunk_size - maximum chunk size, in bytes
// --------------:-------------------------------------------------------------------
// Input         : e.MimeParts, which is of type *mimeparse.Parts, as populated by "mimeanalyzer"
// ----------------------------------------------------------------------------------
// Output        : Messages are saved using the Storage interface.
//               : See store_sql.go for an example implementation.
// ----------------------------------------------------------------------------------
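//
// The JSON options this processor reads (per the struct tags on the Config type
// below) could be set like so (an illustrative sketch; "sql" stands in for
// whichever engine name was registered in StorageEngines):
//
//	{ "chunk_size": 16384, "storage_engine": "sql" }
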
func init() {
	backends.Streamers["chunksaver"] = func() *backends.StreamDecorator {
		return Chunksaver()
	}
}

type Config struct {
	// ChunkMaxBytes controls the maximum buffer size for saving.
	// 16 KB by default.
	ChunkMaxBytes int    `json:"chunk_size,omitempty"`
	StorageEngine string `json:"storage_engine,omitempty"`
}

const chunkMaxBytes = 1024 * 16 // 16 KB is the default; change it using the chunk_size config setting

/**
 *
 * A chunk ends either:
 * after x KB, or after the end of a part, or after the end of a header
 *
 * - buffer the first chunk
 * - if the first chunk hasn't been received after x bytes, save normally
 *
 */
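// For example (an illustrative sketch, not the exact stored layout), a message
// with two MIME parts would be flushed as separate chunks roughly like this:
//
//	[headers 1][body 1][headers 2][body 2]
//	  chunk 1  chunk 2   chunk 3  chunk 4
//
// with any single chunk additionally being split once it grows past chunk_size bytes.
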
func Chunksaver() *backends.StreamDecorator {

	var (
		config      Config
		envelope    *mail.Envelope
		chunkBuffer *ChunkingBufferMime
		msgPos      uint
		database    Storage
		written     int64

		// just some headers from the first mime-part
		subject string
		to      string
		from    string

		progress int // tracks which mime parts were processed
	)

	sd := &backends.StreamDecorator{}

	sd.Configure = func(cfg backends.ConfigGroup) error {
		err := sd.ExtractConfig(cfg, &config)
		if err != nil {
			return err
		}
		if chunkBuffer == nil {
			chunkBuffer = NewChunkedBytesBufferMime()
		}
		// database could have been injected when Decorate was called
		if database == nil {
			// configure storage if none was injected
			if config.StorageEngine == "" {
				return errors.New("storage_engine setting not configured")
			}
			if makerFn, ok := StorageEngines[config.StorageEngine]; ok {
				database = makerFn()
			} else {
				return fmt.Errorf("storage engine does not exist [%s]", config.StorageEngine)
			}
		}
		err = database.Initialize(cfg)
		if err != nil {
			return err
		}
		// configure the chunk buffer's capacity
		if config.ChunkMaxBytes > 0 {
			chunkBuffer.CapTo(config.ChunkMaxBytes)
		} else {
			chunkBuffer.CapTo(chunkMaxBytes)
		}
		return nil
	}

	sd.Shutdown = func() error {
		err := database.Shutdown()
		return err
	}

	sd.Decorate =
		func(sp backends.StreamProcessor, a ...interface{}) backends.StreamProcessor {
			// optional dependency injection (you can pass in your own instance of Storage or ChunkingBufferMime)
			for i := range a {
				if db, ok := a[i].(Storage); ok {
					database = db
				}
				if buff, ok := a[i].(*ChunkingBufferMime); ok {
					chunkBuffer = buff
				}
			}
			if database != nil {
				chunkBuffer.SetDatabase(database)
			}

			var writeTo uint
			var pos int

			sd.Open = func(e *mail.Envelope) error {
				// create a new entry & grab the id
				written = 0
				progress = 0
				var ip net.IPAddr
				if ret := net.ParseIP(e.RemoteIP); ret != nil {
					ip = net.IPAddr{IP: ret}
				}
				mid, err := database.OpenMessage(
					e.MailFrom.String(),
					e.Helo,
					e.RcptTo[0].String(),
					ip,
					e.MailFrom.String(),
					e.TLS,
					e.TransportType,
				)
				if err != nil {
					return err
				}
				e.MessageID = mid
				envelope = e
				return nil
			}

			sd.Close = func() (err error) {
				err = chunkBuffer.Flush()
				if err != nil {
					// TODO we could delete the half-saved message here
					return err
				}
				defer chunkBuffer.Reset()
				if envelope.MessageID > 0 {
					err = database.CloseMessage(
						envelope.MessageID,
						written,
						&chunkBuffer.Info,
						subject,
						envelope.QueuedId,
						to,
						from,
					)
					if err != nil {
						return err
					}
				}
				return nil
			}

			// fillVars populates subject, to and from using the headers of the
			// first mime-part, for any of them that hasn't been set yet
			fillVars := func(parts *mimeparse.Parts, subject, to, from string) (string, string, string) {
				if len(*parts) > 0 {
					if subject == "" {
						if val, ok := (*parts)[0].Headers["Subject"]; ok {
							subject = val[0]
						}
					}
					if to == "" {
						if val, ok := (*parts)[0].Headers["To"]; ok {
							addr, err := mail.NewAddress(val[0])
							if err == nil {
								to = addr.String()
							}
						}
					}
					if from == "" {
						if val, ok := (*parts)[0].Headers["From"]; ok {
							addr, err := mail.NewAddress(val[0])
							if err == nil {
								from = addr.String()
							}
						}
					}
				}
				return subject, to, from
			}

			// end() triggers a buffer flush, at the end of a header or a part-boundary
			end := func(part *mimeparse.Part, offset uint, p []byte, start uint) (int, error) {
				var err error
				var count int
				// write out any unwritten bytes
				writeTo = start - offset
				size := uint(len(p))
				if writeTo > size {
					writeTo = size
				}
				if writeTo > 0 {
					count, err = chunkBuffer.Write(p[pos:writeTo])
					written += int64(count)
					pos += count
					if err != nil {
						return count, err
					}
				} else {
					count = 0
				}
				err = chunkBuffer.Flush()
				if err != nil {
					return count, err
				}
				chunkBuffer.CurrentPart(part)
				return count, nil
			}

			return backends.StreamProcessWith(func(p []byte) (count int, err error) {
				pos = 0
				if envelope.MimeParts == nil {
					return count, errors.New("no message headers found")
				} else if len(*envelope.MimeParts) > 0 {
					parts := envelope.MimeParts
					subject, to, from = fillVars(parts, subject, to, from)
					offset := msgPos
					chunkBuffer.CurrentPart((*parts)[0])
					for i := progress; i < len(*parts); i++ {
						part := (*parts)[i]
						// break the chunk on a new part
						if part.StartingPos > 0 && part.StartingPos >= msgPos {
							count, err = end(part, offset, p, part.StartingPos)
							if err != nil {
								return count, err
							}
							// end of a part here
							//fmt.Println("->N --end of part ---")
							msgPos = part.StartingPos
						}
						// break the chunk on a header
						if part.StartingPosBody > 0 && part.StartingPosBody >= msgPos {
							count, err = end(part, offset, p, part.StartingPosBody)
							if err != nil {
								return count, err
							}
							// end of a header here
							//fmt.Println("->H --end of header --")
							msgPos += uint(count)
						}
						// if we're on the last part, and there is still data to be written out
						if len(*parts)-1 == i && len(p) > pos {
							count, _ = chunkBuffer.Write(p[pos:])
							written += int64(count)
							pos += count
							msgPos += uint(count)
						}
						// if there's no more data
						if pos >= len(p) {
							break
						}
					}
					if len(*parts) > 2 {
						progress = len(*parts) - 2 // skip to the 2nd-last part; assume previous parts were already processed
					}
				}
				return sp.Write(p)
			})
		}

	return sd
}
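
// A minimal sketch of the optional dependency injection mentioned in Decorate
// above (hypothetical wiring; the processor chain is normally assembled by the
// backends gateway, and `next` stands for the downstream StreamProcessor):
//
//	sd := Chunksaver()
//	var store Storage = ...            // your own Storage implementation
//	buf := NewChunkedBytesBufferMime() // or a pre-configured buffer
//	stream := sd.Decorate(next, store, buf)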