s_headers_parser.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. package backends
  2. import (
  3. "bufio"
  4. "bytes"
  5. "io"
  6. "net/textproto"
  7. "github.com/flashmob/go-guerrilla/mail"
  8. )
  9. func init() {
  10. streamers["headersparser"] = func() *StreamDecorator {
  11. return StreamHeadersParser()
  12. }
  13. }
  14. const stateHeaderScanning = 0
  15. const stateHeaderNotScanning = 1
  16. const headerMaxBytes = 1024 * 4
  17. func StreamHeadersParser() *StreamDecorator {
  18. sd := &StreamDecorator{}
  19. sd.p =
  20. func(sp StreamProcessor) StreamProcessor {
  21. // buf buffers the header
  22. var buf bytes.Buffer
  23. var state byte
  24. var lastByte byte
  25. var total int64
  26. var envelope *mail.Envelope
  27. parse := func() error {
  28. var err error
  29. // use a TeeReader to split the write to both sp and headerReader
  30. r := bufio.NewReader(io.TeeReader(&buf, sp))
  31. headerReader := textproto.NewReader(r)
  32. envelope.Header, err = headerReader.ReadMIMEHeader()
  33. if err != nil {
  34. if subject, ok := envelope.Header["Subject"]; ok {
  35. envelope.Subject = mail.MimeHeaderDecode(subject[0])
  36. }
  37. }
  38. return err
  39. }
  40. sd.Open = func(e *mail.Envelope) error {
  41. buf.Reset()
  42. state = 0
  43. lastByte = 0
  44. total = 0
  45. envelope = e
  46. return nil
  47. }
  48. sd.Close = func() error {
  49. // If header wasn't detected
  50. // pump out whatever is in the buffer to the underlying writer
  51. if state == stateHeaderScanning {
  52. _, err := io.Copy(sp, &buf)
  53. return err
  54. }
  55. return nil
  56. }
  57. return StreamProcessWith(func(p []byte) (int, error) {
  58. switch state {
  59. case stateHeaderScanning:
  60. // detect end of header \n\n
  61. headerEnd := bytes.Index(p, []byte{'\n', '\n'})
  62. if headerEnd == -1 && (lastByte == '\n' && p[0] == '\n') {
  63. headerEnd = 0
  64. }
  65. var remainder []byte // remainder are the non-header bytes after the \n\n
  66. if headerEnd > -1 {
  67. if len(p) > headerEnd {
  68. remainder = p[headerEnd:]
  69. }
  70. p = p[:headerEnd]
  71. }
  72. // read in the header to a temp buffer
  73. n, err := io.Copy(&buf, bytes.NewReader(p))
  74. lastByte = p[n-1] // remember the last byte read
  75. if headerEnd > -1 {
  76. // header found, parse it
  77. parseErr := parse()
  78. if parseErr != nil {
  79. Log().WithError(parseErr).Error("cannot parse headers")
  80. }
  81. // flush the remainder to the underlying writer
  82. if remainder != nil {
  83. n1, _ := sp.Write(remainder)
  84. n = n + int64(n1)
  85. }
  86. state = stateHeaderNotScanning
  87. } else {
  88. total += n
  89. // give up if we didn't detect the header after x bytes
  90. if total > headerMaxBytes {
  91. state = stateHeaderNotScanning
  92. n, err = io.Copy(sp, &buf)
  93. return int(n), err
  94. }
  95. }
  96. return int(n), err
  97. case stateHeaderNotScanning:
  98. // just forward everything to the next writer without buffering
  99. return sp.Write(p)
  100. }
  101. return sp.Write(p)
  102. })
  103. }
  104. return sd
  105. }