processor.go

package chunk

import (
	"errors"
	"fmt"
	"net"

	"github.com/flashmob/go-guerrilla/backends"
	"github.com/flashmob/go-guerrilla/mail"
	"github.com/flashmob/go-guerrilla/mail/mimeparse"
)

// ----------------------------------------------------------------------------------
// Processor Name: ChunkSaver
// ----------------------------------------------------------------------------------
// Description   : Takes the stream and saves it in chunks. Chunks are split on the
//               : chunk_size config setting, and also at the end of MIME parts
//               : and after a header. This allows for basic de-duplication: we can
//               : take a hash of each chunk, then check the database to see if we
//               : have it already. If we do, we don't need to write it to the
//               : database again; we take a reference to the previously saved chunk
//               : and only increment its reference count.
//               : The rationale for putting headers and bodies into separate chunks
//               : is that headers often contain more unique data, while bodies are
//               : often duplicated, especially for messages that are CC'd or forwarded.
// ----------------------------------------------------------------------------------
// Requires      : the "mimeanalyzer" stream processor to be enabled before it
// ----------------------------------------------------------------------------------
// Config Options: chunk_size - maximum chunk size, in bytes
//               : chunk_prefetch_count - how many chunks to pre-fetch when reading
//               : storage_engine - "sql" or "memory", or make your own extension
// --------------:-------------------------------------------------------------------
// Input         : e.MimeParts, of type *mimeparse.Parts, as populated by "mimeanalyzer"
// ----------------------------------------------------------------------------------
// Output        : Messages are saved using the Storage interface.
//               : See store_sql.go and store_memory.go as examples.
// ----------------------------------------------------------------------------------
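// Example config (illustrative; the values shown are placeholders, but the
// JSON keys match the Config struct below):
//
//	"chunksaver": {
//		"chunk_size": 16384,
//		"chunk_prefetch_count": 2,
//		"storage_engine": "memory"
//	}
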
func init() {
	backends.Streamers["chunksaver"] = func() *backends.StreamDecorator {
		return Chunksaver()
	}
}

type Config struct {
	// ChunkMaxBytes controls the maximum size of a chunk buffer when saving,
	// in bytes. The default is 16KB.
	ChunkMaxBytes int `json:"chunk_size,omitempty"`
	// ChunkPrefetchCount specifies how many chunks to pre-fetch when reading from
	// storage. Prefetching can reduce the number of round trips to storage.
	ChunkPrefetchCount int `json:"chunk_prefetch_count,omitempty"`
	// StorageEngine specifies which storage engine to use (see the StorageEngines map)
	StorageEngine string `json:"storage_engine,omitempty"`
}

// 16KB is the default; change it using the chunk_size config setting
const chunkMaxBytes = 1024 * 16

/**
 * A chunk ends either:
 * after xKB, or after the end of a part, or after the end of a header
 *
 * - buffer the first chunk
 * - if we didn't receive the first chunk for more than x bytes, save normally
 *
 */
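
// For illustration (an assumed layout, not from the source): with the default
// 16KB chunk_size, a message with two MIME parts would be chunked roughly as
//
//	[root headers] [body up to part 1] [part 1 headers] [part 1 body]
//	[part 2 headers] [part 2 body, split every 16KB if it is larger]
//
// Each chunk can be hashed and looked up in storage, so an already-saved body
// (e.g. a CC'd copy) is stored once and only re-referenced.
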
func Chunksaver() *backends.StreamDecorator {
	var (
		config      Config
		envelope    *mail.Envelope
		chunkBuffer *ChunkingBufferMime
		msgPos      uint
		database    Storage
		written     int64
		// just some headers from the first mime-part
		subject  string
		to       string
		from     string
		progress int // tracks which mime parts have been processed
	)
	sd := &backends.StreamDecorator{}
	sd.Configure = func(cfg backends.ConfigGroup) error {
		err := sd.ExtractConfig(cfg, &config)
		if err != nil {
			return err
		}
		if chunkBuffer == nil {
			chunkBuffer = NewChunkedBytesBufferMime()
		}
		// the database could also be injected when Decorate is called
		if database == nil {
			// configure storage if none was injected
			if config.StorageEngine == "" {
				return errors.New("storage_engine setting not configured")
			}
			if makerFn, ok := StorageEngines[config.StorageEngine]; ok {
				database = makerFn()
			} else {
				return fmt.Errorf("storage engine does not exist [%s]", config.StorageEngine)
			}
		}
		err = database.Initialize(cfg)
		if err != nil {
			return err
		}
		// configure the chunk buffer's capacity
		if config.ChunkMaxBytes > 0 {
			chunkBuffer.CapTo(config.ChunkMaxBytes)
		} else {
			chunkBuffer.CapTo(chunkMaxBytes)
		}
		return nil
	}
	sd.Shutdown = func() error {
		return database.Shutdown()
	}
	sd.GetEmail = func(emailID uint64) (backends.SeekPartReader, error) {
		if database == nil {
			return nil, errors.New("database is nil")
		}
		email, err := database.GetEmail(emailID)
		if err != nil {
			return nil, errors.New("email not found")
		}
		r, err := NewChunkedReader(database, email, 0)
		if err != nil {
			// avoid dereferencing a nil reader below
			return nil, err
		}
		r.ChunkPrefetchCount = config.ChunkPrefetchCount
		return r, nil
	}
	sd.Decorate = func(sp backends.StreamProcessor, a ...interface{}) backends.StreamProcessor {
		// optional dependency injection
		// (you can pass in your own instance of Storage or *ChunkingBufferMime)
		for i := range a {
			if db, ok := a[i].(Storage); ok {
				database = db
			}
			if buff, ok := a[i].(*ChunkingBufferMime); ok {
				chunkBuffer = buff
			}
		}
		if database != nil {
			chunkBuffer.SetDatabase(database)
		}
		var writeTo uint
		var pos int
		sd.Open = func(e *mail.Envelope) error {
			// create a new entry & grab the id
			written = 0
			progress = 0
			var ip IPAddr
			if ret := net.ParseIP(e.RemoteIP); ret != nil {
				ip = IPAddr{net.IPAddr{IP: ret}}
			}
			// RcptTo is indexed below, so make sure it's non-empty
			if len(e.RcptTo) == 0 {
				return errors.New("envelope has no recipients")
			}
			mid, err := database.OpenMessage(
				e.QueuedId,
				e.MailFrom.String(),
				e.Helo,
				e.RcptTo[0].String(),
				ip,
				e.MailFrom.String(),
				e.Protocol(),
				e.TransportType,
			)
			if err != nil {
				return err
			}
			e.MessageID = mid
			envelope = e
			return nil
		}
		sd.Close = func() (err error) {
			err = chunkBuffer.Flush()
			if err != nil {
				// TODO we could delete the half-saved message here
				return err
			}
			if mimeErr, ok := envelope.MimeError.(*mimeparse.Error); ok {
				chunkBuffer.Info.Err = mimeErr.Unwrap()
			}
			defer chunkBuffer.Reset()
			if envelope.MessageID > 0 {
				err = database.CloseMessage(
					envelope.MessageID,
					written,
					&chunkBuffer.Info,
					subject,
					to,
					from,
				)
				if err != nil {
					return err
				}
			}
			return nil
		}
		// fillVars fills in the subject, to and from values from the headers
		// of the first mime-part, unless they were already set
		fillVars := func(parts *mimeparse.Parts, subject, to, from string) (string, string, string) {
			if len(*parts) > 0 {
				if subject == "" {
					if val, ok := (*parts)[0].Headers["Subject"]; ok {
						subject = val[0]
					}
				}
				if to == "" {
					if val, ok := (*parts)[0].Headers["To"]; ok {
						addr, err := mail.NewAddress(val[0])
						if err == nil {
							to = addr.String()
						}
					}
				}
				if from == "" {
					if val, ok := (*parts)[0].Headers["From"]; ok {
						addr, err := mail.NewAddress(val[0])
						if err == nil {
							from = addr.String()
						}
					}
				}
			}
			return subject, to, from
		}
		// end() triggers a buffer flush, at the end of a header or part-boundary
		end := func(part *mimeparse.Part, offset uint, p []byte, start uint) (int, error) {
			var err error
			var count int
			// write out any unwritten bytes before the boundary
			writeTo = start - offset
			size := uint(len(p))
			if writeTo > size {
				writeTo = size
			}
			if writeTo > 0 {
				count, err = chunkBuffer.Write(p[pos:writeTo])
				written += int64(count)
				pos += count
				if err != nil {
					return count, err
				}
			}
			err = chunkBuffer.Flush()
			if err != nil {
				return count, err
			}
			chunkBuffer.CurrentPart(part)
			return count, nil
		}
		return backends.StreamProcessWith(func(p []byte) (count int, err error) {
			pos = 0
			if envelope.MimeParts == nil {
				return count, errors.New("no message headers found")
			} else if len(*envelope.MimeParts) > 0 {
				parts := envelope.MimeParts
				subject, to, from = fillVars(parts, subject, to, from)
				offset := msgPos
				chunkBuffer.CurrentPart((*parts)[0])
				for i := progress; i < len(*parts); i++ {
					part := (*parts)[i]
					// break the chunk on a new part
					if part.StartingPos > 0 && part.StartingPos >= msgPos {
						count, err = end(part, offset, p, part.StartingPos)
						if err != nil {
							return count, err
						}
						// end of a part here
						msgPos = part.StartingPos
					}
					// break the chunk on a header
					if part.StartingPosBody > 0 && part.StartingPosBody >= msgPos {
						count, err = end(part, offset, p, part.StartingPosBody)
						if err != nil {
							return count, err
						}
						// end of a header here
						msgPos += uint(count)
					}
					// if on the last part, and there is still data to be written out
					if len(*parts)-1 == i && len(p) > pos {
						count, err = chunkBuffer.Write(p[pos:])
						if err != nil {
							return count, err
						}
						written += int64(count)
						pos += count
						msgPos += uint(count)
					}
					// if there's no more data
					if pos >= len(p) {
						break
					}
				}
				if len(*parts) > 2 {
					// skip to the 2nd-last part; assume earlier parts were already processed
					progress = len(*parts) - 2
				}
			}
			return sp.Write(p)
		})
	}
	return sd
}
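
// Usage sketch (illustrative, not part of the original file). Decorate scans
// its variadic arguments by type, so a custom Storage or *ChunkingBufferMime
// can be injected; "next" is a placeholder for the downstream StreamProcessor:
//
//	sd := Chunksaver()
//	store := StorageEngines["memory"]() // assumes a "memory" engine is registered
//	proc := sd.Decorate(next, store)    // picked up via the type assertion above
//
// A saved message can later be read back with sd.GetEmail(emailID), which
// returns a backends.SeekPartReader backed by NewChunkedReader.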