s_transformer.go 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337
  1. package backends
  2. import (
  3. "bytes"
  4. "io"
  5. "regexp"
  6. "sync"
  7. "github.com/flashmob/go-guerrilla/chunk/transfer"
  8. "github.com/flashmob/go-guerrilla/mail"
  9. "github.com/flashmob/go-guerrilla/mail/mimeparse"
  10. )
  11. // ----------------------------------------------------------------------------------
  12. // Processor Name: transformer
  13. // ----------------------------------------------------------------------------------
  14. // Description : Transforms from base64 / q-printable to 8bit and converts charset to utf-8
  15. // ----------------------------------------------------------------------------------
  16. // Config Options:
  17. // --------------:-------------------------------------------------------------------
  18. // Input : envelope.MimeParts
  19. // ----------------------------------------------------------------------------------
  20. // Output : 8bit mime message, with charsets decoded to UTF-8
  21. // : Note that this processor changes the body counts. Therefore, it makes
  22. // : a new instance of envelope.MimeParts which is then populated
  23. // : by parsing the new re-written message
  24. // ----------------------------------------------------------------------------------
  25. func init() {
  26. Streamers["transformer"] = func() *StreamDecorator {
  27. return Transformer()
  28. }
  29. }
  30. // Transform stream processor: convert an email to UTF-8
  31. type Transform struct {
  32. sp io.Writer
  33. isBody bool // the next bytes to be sent are body?
  34. buf bytes.Buffer
  35. current *mimeparse.Part
  36. decoder io.Reader
  37. pr *io.PipeReader
  38. pw *io.PipeWriter
  39. partsCachedOriginal *mimeparse.Parts
  40. envelope *mail.Envelope
  41. // we re-parse the output since the counts have changed
  42. // parser implements the io.Writer interface, here output will be sent to it and then forwarded to the next processor
  43. parser *mimeparse.Parser
  44. }
  45. // swap caches the original parts from envelope.MimeParts
  46. // and point them to our parts
  47. func (t *Transform) swap() *mimeparse.Parts {
  48. parts := t.envelope.MimeParts
  49. if parts != nil {
  50. t.partsCachedOriginal = parts
  51. parts = &t.parser.Parts
  52. return parts
  53. }
  54. return nil
  55. }
  56. // unswap points the parts from MimeParts back to the original ones
  57. func (t *Transform) unswap() {
  58. if t.envelope.MimeParts != nil {
  59. t.envelope.MimeParts = t.partsCachedOriginal
  60. }
  61. }
  // regexpCharset matches a charset parameter and captures its value.
//
// (?i) makes the match case-insensitive. The value is limited to characters
// that are not a double quote, a semicolon or whitespace, so the match stops
// at the end of the charset token: a closing quote, any parameters that follow
// it (for example "; format=flowed") and the line ending are left untouched
// when the match is replaced.
var regexpCharset = regexp.MustCompile(`(?i)charset="?([^";\s]+)"?`)
  64. func (t *Transform) ReWrite(b []byte, last bool) (count int, err error) {
  65. defer func() {
  66. count = len(b)
  67. }()
  68. if !t.isBody {
  69. // Header re-write, how it works
  70. // we place the partial header's bytes on a buffer from which we can read one line at a time
  71. // then we match and replace the lines we want, output replaced live.
  72. // The following re-writes are mde:
  73. // - base64 => 8bit
  74. // - supported non-utf8 charset => utf8
  75. if i, err := io.Copy(&t.buf, bytes.NewReader(b)); err != nil {
  76. return int(i), err
  77. }
  78. var charsetProcessed bool
  79. charsetFrom := ""
  80. for {
  81. line, rErr := t.buf.ReadBytes('\n')
  82. if rErr == nil {
  83. if !charsetProcessed {
  84. // is charsetFrom supported?
  85. exists := t.current.Headers.Get("content-type")
  86. if exists != "" {
  87. charsetProcessed = true
  88. charsetFrom = t.current.ContentType.Charset()
  89. if !mail.SupportsCharset(charsetFrom) {
  90. charsetFrom = ""
  91. }
  92. }
  93. }
  94. if bytes.Contains(line, []byte("Content-Transfer-Encoding: base64")) {
  95. line = bytes.Replace(line, []byte("base64"), []byte("8bit"), 1)
  96. } else if bytes.Contains(line, []byte("charset")) {
  97. if match := regexpCharset.FindSubmatch(line); match != nil && len(match) > 0 {
  98. // test if the encoding is supported
  99. if charsetFrom != "" {
  100. // it's supported, we can change it to utf8
  101. line = regexpCharset.ReplaceAll(line, []byte("charset=utf8"))
  102. }
  103. }
  104. }
  105. _, err = io.Copy(t.parser, bytes.NewReader(line))
  106. if err != nil {
  107. return
  108. }
  109. if line[0] == '\n' {
  110. // end of header
  111. break
  112. }
  113. } else {
  114. return
  115. }
  116. }
  117. } else {
  118. if ct := t.current.ContentType.Supertype(); ct == "multipart" || ct == "message" {
  119. _, err = io.Copy(t.parser, bytes.NewReader(b))
  120. return
  121. }
  122. // Body Decode, how it works:
  123. // First, the decoder is setup, depending on the source encoding type.
  124. // Next, since the decoder is an io.Reader, we need to use a pipe to connect it.
  125. // Subsequent calls write to the pipe in a goroutine and the parent-thread copies the result to the output stream
  126. // The routine stops feeding the decoder data before EndingPosBody, and not decoding anything after, but still
  127. // outputting the un-decoded remainder.
  128. // The decoder is destroyed at the end of the body (when last == true)
  129. t.pr, t.pw = io.Pipe()
  130. if t.decoder == nil {
  131. t.buf.Reset()
  132. // the decoder will be reading from an underlying pipe
  133. charsetFrom := t.current.ContentType.Charset()
  134. if charsetFrom == "" {
  135. charsetFrom = mail.MostCommonCharset
  136. }
  137. if mail.SupportsCharset(charsetFrom) {
  138. t.decoder, err = transfer.NewBodyDecoder(t.pr, transfer.ParseEncoding(t.current.TransferEncoding), charsetFrom)
  139. if err != nil {
  140. return
  141. }
  142. t.current.Charset = "utf8"
  143. t.current.TransferEncoding = "8bit"
  144. }
  145. }
  146. wg := sync.WaitGroup{}
  147. wg.Add(1)
  148. // out is the slice that will be decoded
  149. var out []byte
  150. // remainder will not be decoded. Typically, this contains the boundary maker, and we want to preserve it
  151. var remainder []byte
  152. if t.current.EndingPosBody > 0 {
  153. size := t.current.EndingPosBody - t.current.StartingPosBody - 1 // -1 since we do not want \n
  154. out = b[:size]
  155. remainder = b[size:]
  156. } else {
  157. // use the entire slice
  158. out = b
  159. }
  160. go func() {
  161. // stream our slice to the pipe
  162. defer wg.Done()
  163. _, pRrr := io.Copy(t.pw, bytes.NewReader(out))
  164. if pRrr != nil {
  165. _ = t.pw.CloseWithError(err)
  166. return
  167. }
  168. _ = t.pw.Close()
  169. }()
  170. // do the decoding
  171. var i int64
  172. i, err = io.Copy(t.parser, t.decoder)
  173. // wait for the pipe to finish
  174. wg.Wait()
  175. _ = t.pr.Close()
  176. if last {
  177. t.decoder = nil
  178. }
  179. count += int(i)
  180. if err != nil {
  181. return
  182. }
  183. // flush any remainder
  184. if len(remainder) > 0 {
  185. i, err = io.Copy(t.parser, bytes.NewReader(remainder))
  186. count += int(i)
  187. if err != nil {
  188. return
  189. }
  190. }
  191. }
  192. return count, err
  193. }
  194. func (t *Transform) Reset() {
  195. t.decoder = nil
  196. }
  197. func Transformer() *StreamDecorator {
  198. var (
  199. msgPos uint
  200. progress int
  201. )
  202. reWriter := Transform{}
  203. sd := &StreamDecorator{}
  204. sd.Decorate =
  205. func(sp StreamProcessor, a ...interface{}) StreamProcessor {
  206. var (
  207. envelope *mail.Envelope
  208. // total is the total number of bytes written
  209. total int64
  210. // pos tracks the current position of the output slice
  211. pos int
  212. // written is the number of bytes written out in this call
  213. written int
  214. )
  215. if reWriter.sp == nil {
  216. reWriter.sp = sp
  217. }
  218. sd.Open = func(e *mail.Envelope) error {
  219. envelope = e
  220. if reWriter.parser == nil {
  221. reWriter.parser = mimeparse.NewMimeParserWriter(sp)
  222. reWriter.parser.Open()
  223. }
  224. reWriter.envelope = envelope
  225. return nil
  226. }
  227. sd.Close = func() error {
  228. total = 0
  229. return reWriter.parser.Close()
  230. }
  231. end := func(part *mimeparse.Part, offset uint, p []byte, start uint) (int, error) {
  232. var err error
  233. var count int
  234. count, err = reWriter.ReWrite(p[pos:start-offset], true)
  235. written += count
  236. if err != nil {
  237. return count, err
  238. }
  239. reWriter.current = part
  240. pos += count
  241. return count, nil
  242. }
  243. return StreamProcessWith(func(p []byte) (count int, err error) {
  244. pos = 0
  245. written = 0
  246. parts := envelope.MimeParts
  247. if parts != nil && len(*parts) > 0 {
  248. // we are going to change envelope.MimeParts to our own copy with our own counts
  249. envelope.MimeParts = reWriter.swap()
  250. defer func() {
  251. reWriter.unswap()
  252. total += int64(written)
  253. }()
  254. offset := msgPos
  255. reWriter.current = (*parts)[0]
  256. for i := progress; i < len(*parts); i++ {
  257. part := (*parts)[i]
  258. // break chunk on new part
  259. if part.StartingPos > 0 && part.StartingPos >= msgPos {
  260. count, err = end(part, offset, p, part.StartingPos)
  261. if err != nil {
  262. break
  263. }
  264. msgPos = part.StartingPos
  265. reWriter.isBody = false
  266. }
  267. // break chunk on header (found the body)
  268. if part.StartingPosBody > 0 && part.StartingPosBody >= msgPos {
  269. count, err = end(part, offset, p, part.StartingPosBody)
  270. if err != nil {
  271. break
  272. }
  273. reWriter.isBody = true
  274. msgPos += uint(count)
  275. }
  276. // if on the latest (last) part, and yet there is still data to be written out
  277. if len(*parts)-1 == i && len(p)-1 > pos {
  278. count, err = reWriter.ReWrite(p[pos:], false)
  279. written += count
  280. if err != nil {
  281. break
  282. }
  283. pos += count
  284. msgPos += uint(count)
  285. }
  286. // if there's no more data
  287. if pos >= len(p) {
  288. break
  289. }
  290. }
  291. if len(*parts) > 2 {
  292. progress = len(*parts) - 2 // skip to 2nd last part, assume previous parts are already processed
  293. }
  294. }
  295. // note that in this case, ReWrite method will output the stream to further processors down the line
  296. // here we just return back with the result
  297. return written, err
  298. })
  299. }
  300. return sd
  301. }