s_transformer.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247
  1. package backends
  2. import (
  3. "bytes"
  4. "github.com/flashmob/go-guerrilla/chunk/transfer"
  5. "github.com/flashmob/go-guerrilla/mail"
  6. "github.com/flashmob/go-guerrilla/mail/mime"
  7. "io"
  8. "regexp"
  9. "sync"
  10. )
  11. // ----------------------------------------------------------------------------------
  12. // Processor Name: transformer
  13. // ----------------------------------------------------------------------------------
  14. // Description : Transforms from base64 / q-printable to 8bit and converts charset to utf-8
  15. // ----------------------------------------------------------------------------------
  16. // Config Options:
  17. // --------------:-------------------------------------------------------------------
  18. // Input : envelope.Values["MimeParts"]
  19. // ----------------------------------------------------------------------------------
  20. // Output : 8bit mime message, with charsets decoded to UTF-8
  21. // : Note that this processor changes the body counts. Therefore, it makes
  22. // : a new instance of envelope.Values["MimeParts"] which is then populated
  23. // : by parsing the new re-written message
  24. // ----------------------------------------------------------------------------------
  25. func init() {
  26. Streamers["transformer"] = func() *StreamDecorator {
  27. return Transformer()
  28. }
  29. }
  30. type TransformerConfig struct {
  31. // we can add any config here
  32. }
  33. type Transform struct {
  34. sp io.Writer
  35. isBody bool // the next bytes to be sent are body?
  36. buf bytes.Buffer
  37. current *mime.Part
  38. decoder io.Reader
  39. pr *io.PipeReader
  40. pw *io.PipeWriter
  41. partsCachedOriginal *mime.Parts
  42. envelope *mail.Envelope
  43. // we re-parse the output since the counts have changed
  44. // parser implements the io.Writer interface, here output will be sent to it and then forwarded to the next processor
  45. parser *mime.Parser
  46. }
  47. // cache the original parts from envelope.Values
  48. // and point them to our parts
  49. func (t *Transform) swap() *mime.Parts {
  50. if parts, ok := t.envelope.Values["MimeParts"].(*mime.Parts); ok {
  51. t.partsCachedOriginal = parts
  52. parts = &t.parser.Parts
  53. return parts
  54. }
  55. return nil
  56. }
  57. // point the parts from envelope.Values back to the original ones
  58. func (t *Transform) unswap() {
  59. if parts, ok := t.envelope.Values["MimeParts"].(*mime.Parts); ok {
  60. _ = parts
  61. parts = t.partsCachedOriginal
  62. }
  63. }
  64. func (t *Transform) ReWrite(b []byte) (count int, err error) {
  65. if !t.isBody {
  66. // we place the partial header's bytes on a buffer from which we can read one line at a time
  67. // then we match and replace the lines we want
  68. count = len(b)
  69. if i, err := io.Copy(&t.buf, bytes.NewReader(b)); err != nil {
  70. return int(i), err
  71. }
  72. for {
  73. line, rErr := t.buf.ReadBytes('\n')
  74. if rErr == nil {
  75. if bytes.Contains(line, []byte("Content-Transfer-Encoding: base64")) {
  76. line = bytes.Replace(line, []byte("base64"), []byte("8bit"), 1)
  77. t.current.TransferEncoding = "8bit"
  78. t.current.Charset = "utf8"
  79. } else if bytes.Contains(line, []byte("charset=")) {
  80. rx := regexp.MustCompile("charset=\".+?\"")
  81. line = rx.ReplaceAll(line, []byte("charset=\"utf8\""))
  82. }
  83. _, err = io.Copy(t.parser, bytes.NewReader(line))
  84. if err != nil {
  85. return
  86. }
  87. } else {
  88. break
  89. }
  90. }
  91. } else {
  92. // do body decode here
  93. t.pr, t.pw = io.Pipe()
  94. if t.decoder == nil {
  95. t.buf.Reset()
  96. // the decoder will be reading from an underlying pipe
  97. t.decoder, err = transfer.NewBodyDecoder(t.pr, transfer.Base64, "iso-8859-1")
  98. }
  99. wg := sync.WaitGroup{}
  100. wg.Add(1)
  101. go func() {
  102. // stream our slice to the pipe
  103. defer wg.Done()
  104. _, pRrr := io.Copy(t.pw, bytes.NewReader(b))
  105. if pRrr != nil {
  106. _ = t.pw.CloseWithError(err)
  107. return
  108. }
  109. _ = t.pw.Close()
  110. }()
  111. // do the decoding
  112. var i int64
  113. i, err = io.Copy(t.parser, t.decoder)
  114. // wait for the pipe to finish
  115. _ = i
  116. wg.Wait()
  117. _ = t.pr.Close()
  118. count = len(b)
  119. }
  120. return count, err
  121. }
  122. func (t *Transform) Reset() {
  123. t.decoder = nil
  124. }
  125. func Transformer() *StreamDecorator {
  126. var conf *TransformerConfig
  127. Svc.AddInitializer(InitializeWith(func(backendConfig BackendConfig) error {
  128. configType := BaseConfig(&HeaderConfig{})
  129. bcfg, err := Svc.ExtractConfig(backendConfig, configType)
  130. if err != nil {
  131. return err
  132. }
  133. conf = bcfg.(*TransformerConfig)
  134. _ = conf
  135. return nil
  136. }))
  137. var msgPos uint
  138. var progress int
  139. reWriter := Transform{}
  140. sd := &StreamDecorator{}
  141. sd.Decorate =
  142. func(sp StreamProcessor, a ...interface{}) StreamProcessor {
  143. var envelope *mail.Envelope
  144. if reWriter.sp == nil {
  145. reWriter.sp = sp
  146. }
  147. sd.Open = func(e *mail.Envelope) error {
  148. envelope = e
  149. _ = envelope
  150. if reWriter.parser == nil {
  151. reWriter.parser = mime.NewMimeParserWriter(sp)
  152. reWriter.parser.Open()
  153. }
  154. reWriter.envelope = envelope
  155. return nil
  156. }
  157. return StreamProcessWith(func(p []byte) (count int, err error) {
  158. var total int
  159. if parts, ok := envelope.Values["MimeParts"].(*mime.Parts); ok && len(*parts) > 0 {
  160. // we are going to change envelope.Values["MimeParts"] to our own copy with our own counts
  161. envelope.Values["MimeParts"] = reWriter.swap()
  162. defer reWriter.unswap()
  163. var pos int
  164. offset := msgPos
  165. reWriter.current = (*parts)[0]
  166. for i := progress; i < len(*parts); i++ {
  167. part := (*parts)[i]
  168. // break chunk on new part
  169. if part.StartingPos > 0 && part.StartingPos > msgPos {
  170. reWriter.isBody = false
  171. count, err = reWriter.ReWrite(p[pos : part.StartingPos-offset])
  172. total += count
  173. if err != nil {
  174. break
  175. }
  176. reWriter.current = part
  177. pos += count
  178. msgPos = part.StartingPos
  179. }
  180. // break chunk on header (found the body)
  181. if part.StartingPosBody > 0 && part.StartingPosBody >= msgPos {
  182. count, err = reWriter.ReWrite(p[pos : part.StartingPosBody-offset])
  183. total += count
  184. if err != nil {
  185. break
  186. }
  187. _, _ = reWriter.parser.Write([]byte{'\n'}) // send an end of header to the parser
  188. reWriter.isBody = true
  189. reWriter.current = part
  190. pos += count
  191. msgPos = part.StartingPosBody
  192. }
  193. // if on the latest (last) part, and yet there is still data to be written out
  194. if len(*parts)-1 == i && len(p)-1 > pos {
  195. count, err = reWriter.ReWrite(p[pos:])
  196. total += count
  197. if err != nil {
  198. break
  199. }
  200. pos += count
  201. msgPos += uint(count)
  202. }
  203. // if there's no more data
  204. if pos >= len(p) {
  205. break
  206. }
  207. }
  208. if len(*parts) > 2 {
  209. progress = len(*parts) - 2 // skip to 2nd last part, assume previous parts are already processed
  210. }
  211. }
  212. // note that in this case, ReWrite method will output the stream to further processors down the line
  213. // here we just return back with the result
  214. return total, err
  215. })
  216. }
  217. return sd
  218. }