processor.go

package chunk

import (
	"errors"
	"fmt"
	"net"

	"github.com/flashmob/go-guerrilla/backends"
	"github.com/flashmob/go-guerrilla/mail"
	"github.com/flashmob/go-guerrilla/mail/mimeparse"
)
// ----------------------------------------------------------------------------------
// Processor Name: ChunkSaver
// ----------------------------------------------------------------------------------
// Description   : Takes the stream and saves it in chunks. Chunks are split at the
//               : chunk_size config setting, and also at the end of MIME parts and
//               : after a header. This allows for basic de-duplication: we can take a
//               : hash of each chunk, then check the database to see if we have it
//               : already. If so, we don't need to write it to the database again; we
//               : take a reference to the previously saved chunk and only increment
//               : its reference count.
//               : The rationale for putting headers and bodies into separate chunks is
//               : that headers often contain more unique data, while bodies are often
//               : duplicated, especially for messages that are CC'd or forwarded.
// ----------------------------------------------------------------------------------
// Requires      : the "mimeanalyzer" stream processor to be enabled before it
// ----------------------------------------------------------------------------------
// Config Options: chunk_size - maximum chunk size, in bytes
//               : storage_engine - "sql" or "memory", or make your own extension
// --------------:-------------------------------------------------------------------
// Input         : e.MimeParts, of type *mimeparse.Parts, as populated by "mimeanalyzer"
// ----------------------------------------------------------------------------------
// Output        : Messages are saved using the Storage interface.
//               : See store_sql.go and store_memory.go as examples.
// ----------------------------------------------------------------------------------
func init() {
	backends.Streamers["chunksaver"] = func() *backends.StreamDecorator {
		return Chunksaver()
	}
}
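
// A hedged usage sketch: the init() above registers this decorator under the name
// "chunksaver", so it is enabled by listing it in the backend's stream processing
// pipeline after "mimeanalyzer", which it requires. The exact config key naming the
// pipeline is an assumption about the host configuration, not taken from this file:
//
//	"backend_config": {
//	    "stream_save_process": "mimeanalyzer|chunksaver"
//	}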
type Config struct {
	// ChunkMaxBytes controls the maximum buffer size for saving.
	// Default: 16 KB.
	ChunkMaxBytes int `json:"chunk_size,omitempty"`
	// ChunkPrefetchCount specifies how many chunks to pre-fetch when reading
	// from storage. Default: 2, Max: 32.
	ChunkPrefetchCount int `json:"chunk_prefetch_count,omitempty"`
	// StorageEngine specifies which storage engine to use (see the StorageEngines map).
	StorageEngine string `json:"storage_engine,omitempty"`
}
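
// A minimal sketch of the matching JSON config group for this processor; the values
// are illustrative (16384 bytes mirrors the documented 16 KB default), not values
// taken from the project:
//
//	{
//	    "chunk_size": 16384,
//	    "chunk_prefetch_count": 2,
//	    "storage_engine": "memory"
//	}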
//const chunkMaxBytes = 1024 * 16 // 16 KB is the default; change it via the chunk_size config setting
/**
 *
 * A chunk ends either:
 * after x KB, at the end of a part, or at the end of a header.
 *
 * - buffer the first chunk
 * - if we didn't receive the first chunk after x bytes, save normally
 *
 */
func Chunksaver() *backends.StreamDecorator {

	var (
		config      Config
		envelope    *mail.Envelope
		chunkBuffer *ChunkingBufferMime
		msgPos      uint
		database    Storage
		written     int64

		// just some headers from the first mime-part
		subject string
		to      string
		from    string

		progress int // tracks which mime parts have been processed
	)
	sd := &backends.StreamDecorator{}
	sd.Configure = func(cfg backends.ConfigGroup) error {
		err := sd.ExtractConfig(cfg, &config)
		if err != nil {
			return err
		}
		if chunkBuffer == nil {
			chunkBuffer = NewChunkedBytesBufferMime()
		}
		// database could be injected when Decorate is called
		if database == nil {
			// configure storage if none was injected
			if config.StorageEngine == "" {
				return errors.New("storage_engine setting not configured")
			}
			if makerFn, ok := StorageEngines[config.StorageEngine]; ok {
				database = makerFn()
			} else {
				return fmt.Errorf("storage engine does not exist [%s]", config.StorageEngine)
			}
		}
		err = database.Initialize(cfg)
		if err != nil {
			return err
		}
		// cap the chunk buffer to the configured size, or fall back to the default
		if config.ChunkMaxBytes > 0 {
			chunkBuffer.CapTo(config.ChunkMaxBytes)
		} else {
			chunkBuffer.CapTo(chunkMaxBytes)
		}
		return nil
	}
	sd.Shutdown = func() error {
		return database.Shutdown()
	}
	sd.GetEmail = func(emailID uint64) (backends.SeekPartReader, error) {
		if database == nil {
			return nil, errors.New("database is nil")
		}
		email, err := database.GetMessage(emailID)
		if err != nil {
			return nil, fmt.Errorf("email not found: %v", err)
		}
		r, err := NewChunkedReader(database, email, 0)
		if r != nil && config.ChunkPrefetchCount > 0 {
			// override the default with the configured value, capped at chunkPrefetchMax
			r.ChunkPrefetchCount = config.ChunkPrefetchCount
			if r.ChunkPrefetchCount > chunkPrefetchMax {
				r.ChunkPrefetchCount = chunkPrefetchMax
			}
		}
		return r, err
	}
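
	// A hedged usage sketch for GetEmail: assuming the chunked reader returned
	// above also satisfies io.Reader, a stored message can be streamed back out.
	// The names below are illustrative and error handling is elided:
	//
	//	r, err := sd.GetEmail(messageID)
	//	if err != nil {
	//	    return err
	//	}
	//	_, err = io.Copy(os.Stdout, r) // stream the reassembled message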
	sd.Decorate =
		func(sp backends.StreamProcessor, a ...interface{}) backends.StreamProcessor {
			// optional dependency injection (you can pass in your own instance
			// of Storage or ChunkingBufferMime)
			for i := range a {
				if db, ok := a[i].(Storage); ok {
					database = db
				}
				if buff, ok := a[i].(*ChunkingBufferMime); ok {
					chunkBuffer = buff
				}
			}
			if database != nil {
				chunkBuffer.SetDatabase(database)
			}
			var writeTo uint
			var pos int
			sd.Open = func(e *mail.Envelope) error {
				// create a new entry & grab the id
				written = 0
				progress = 0
				var ip IPAddr
				if ret := net.ParseIP(e.RemoteIP); ret != nil {
					ip = IPAddr{net.IPAddr{IP: ret}}
				}
				mid, err := database.OpenMessage(
					e.QueuedId,
					e.MailFrom.String(),
					e.Helo,
					e.RcptTo[0].String(),
					ip,
					e.MailFrom.String(),
					e.Protocol(),
					e.TransportType,
				)
				if err != nil {
					return err
				}
				e.MessageID = mid
				envelope = e
				return nil
			}
			sd.Close = func() (err error) {
				err = chunkBuffer.Flush()
				if err != nil {
					// TODO we could delete the half-saved message here
					return err
				}
				if mimeErr, ok := envelope.MimeError.(*mimeparse.Error); ok {
					chunkBuffer.Info.Err = mimeErr.Unwrap()
				}
				defer chunkBuffer.Reset()
				if envelope.MessageID > 0 {
					err = database.CloseMessage(
						envelope.MessageID,
						written,
						&chunkBuffer.Info,
						subject,
						to,
						from,
					)
					if err != nil {
						return err
					}
				}
				return nil
			}
			// fillVars populates subject, to and from using the headers of the
			// first mime-part, if they haven't been set already
			fillVars := func(parts *mimeparse.Parts, subject, to, from string) (string, string, string) {
				if len(*parts) > 0 {
					if subject == "" {
						if val, ok := (*parts)[0].Headers["Subject"]; ok {
							subject = val[0]
						}
					}
					if to == "" {
						if val, ok := (*parts)[0].Headers["To"]; ok {
							addr, err := mail.NewAddress(val[0])
							if err == nil {
								to = addr.String()
							}
						}
					}
					if from == "" {
						if val, ok := (*parts)[0].Headers["From"]; ok {
							addr, err := mail.NewAddress(val[0])
							if err == nil {
								from = addr.String()
							}
						}
					}
				}
				return subject, to, from
			}
			// end() writes out any unwritten bytes and triggers a buffer flush;
			// it is called at the end of a header or at a part boundary
			end := func(part *mimeparse.Part, offset uint, p []byte, start uint) (int, error) {
				var err error
				var count int
				// write out any unwritten bytes
				writeTo = start - offset
				size := uint(len(p))
				if writeTo > size {
					writeTo = size
				}
				if writeTo > 0 {
					count, err = chunkBuffer.Write(p[pos:writeTo])
					written += int64(count)
					pos += count
					if err != nil {
						return count, err
					}
				}
				err = chunkBuffer.Flush()
				if err != nil {
					return count, err
				}
				chunkBuffer.CurrentPart(part)
				return count, nil
			}
			return backends.StreamProcessWith(func(p []byte) (count int, err error) {
				pos = 0
				if envelope.MimeParts == nil {
					return count, errors.New("no message headers found")
				} else if len(*envelope.MimeParts) > 0 {
					parts := envelope.MimeParts
					subject, to, from = fillVars(parts, subject, to, from)
					offset := msgPos
					chunkBuffer.CurrentPart((*parts)[0])
					for i := progress; i < len(*parts); i++ {
						part := (*parts)[i]
						// break the chunk on a new part
						if part.StartingPos > 0 && part.StartingPos >= msgPos {
							count, err = end(part, offset, p, part.StartingPos)
							if err != nil {
								return count, err
							}
							// end of a part here
							msgPos = part.StartingPos
						}
						// break the chunk on a header
						if part.StartingPosBody > 0 && part.StartingPosBody >= msgPos {
							count, err = end(part, offset, p, part.StartingPosBody)
							if err != nil {
								return count, err
							}
							// end of a header here
							msgPos += uint(count)
						}
						// if on the last part, and there is still data to be written out
						if len(*parts)-1 == i && len(p) > pos {
							count, _ = chunkBuffer.Write(p[pos:])
							written += int64(count)
							pos += count
							msgPos += uint(count)
						}
						// if there's no more data, stop
						if pos >= len(p) {
							break
						}
					}
					if len(*parts) > 2 {
						// skip to the second-to-last part; assume earlier parts
						// have already been processed
						progress = len(*parts) - 2
					}
				}
				return sp.Write(p)
			})
		}
	return sd
}
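
// A hedged sketch of registering a custom storage engine: StorageEngines is assumed
// to map engine names to constructor functions returning a Storage, matching its use
// in Configure above. The type myStore is hypothetical and must implement Storage;
// once registered, it can be selected with `"storage_engine": "mystore"`:
//
//	func init() {
//	    StorageEngines["mystore"] = func() Storage {
//	        return &myStore{}
//	    }
//	}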