envelope.go 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293
  1. package mail
  2. import (
  3. "bufio"
  4. "bytes"
  5. "crypto/md5"
  6. "errors"
  7. "fmt"
  8. "io"
  9. "mime"
  10. "net/mail"
  11. "net/textproto"
  12. "strings"
  13. "sync"
  14. "time"
  15. )
  // Dec is the RFC 2047 encoded-word decoder that MimeHeaderDecode uses to
// decode MIME headers.
// It is exported so that a different decoder can be installed in its place,
// e.g. one backed by GNU iconv (the original note names a "mail/inconv"
// package, presumably "iconv" -- confirm the import path). Another option
// would be https://godoc.org/golang.org/x/text/encoding
var Dec mime.WordDecoder

func init() {
	// Start out with the standard library decoder; iconv support is only
	// available when another package installs its own decoder.
	var std mime.WordDecoder
	Dec = std
}
  // maxHeaderChunk is the number of leading bytes of a message that
// ParseHeaders scans for the end of the header: 3 KiB plus one byte.
const maxHeaderChunk = 3*1024 + 1
  // Address holds the two halves of an email address of the form `<user@host>`.
type Address struct {
	// User is the part in front of the "@".
	User string
	// Host is the part after the "@".
	Host string
}
  32. func (ep *Address) String() string {
  33. return fmt.Sprintf("%s@%s", ep.User, ep.Host)
  34. }
  35. func (ep *Address) IsEmpty() bool {
  36. return ep.User == "" && ep.Host == ""
  37. }
  38. var ap = mail.AddressParser{}
  39. // NewAddress takes a string of an RFC 5322 address of the
  40. // form "Gogh Fir <[email protected]>" or "[email protected]".
  41. func NewAddress(str string) (Address, error) {
  42. a, err := ap.Parse(str)
  43. if err != nil {
  44. return Address{}, err
  45. }
  46. pos := strings.Index(a.Address, "@")
  47. if pos > 0 {
  48. return Address{
  49. User: a.Address[0:pos],
  50. Host: a.Address[pos+1:],
  51. },
  52. nil
  53. }
  54. return Address{}, errors.New("invalid address")
  55. }
  56. // Email represents a single SMTP message.
  57. type Envelope struct {
  58. // Remote IP address
  59. RemoteIP string
  60. // Message sent in EHLO command
  61. Helo string
  62. // Sender
  63. MailFrom Address
  64. // Recipients
  65. RcptTo []Address
  66. // Data stores the header and message body
  67. Data bytes.Buffer
  68. // Subject stores the subject of the email, extracted and decoded after calling ParseHeaders()
  69. Subject string
  70. // TLS is true if the email was received using a TLS connection
  71. TLS bool
  72. // Header stores the results from ParseHeaders()
  73. Header textproto.MIMEHeader
  74. // Values hold the values generated when processing the envelope by the backend
  75. Values map[string]interface{}
  76. // Hashes of each email on the rcpt
  77. Hashes []string
  78. // additional delivery header that may be added
  79. DeliveryHeader string
  80. // Email(s) will be queued with this id
  81. QueuedId string
  82. // When locked, it means that the envelope is being processed by the backend
  83. sync.Mutex
  84. }
  85. func NewEnvelope(remoteAddr string, clientID uint64) *Envelope {
  86. return &Envelope{
  87. RemoteIP: remoteAddr,
  88. Values: make(map[string]interface{}),
  89. QueuedId: queuedID(clientID),
  90. }
  91. }
  // queuedID derives the id under which an envelope's email(s) are queued: the
// hex-encoded MD5 digest (32 characters) of the current Unix time in seconds
// and clientID.
//
// Both numbers are written out in decimal. Converting an integer with
// string(n) yields the UTF-8 encoding of code point n instead, and every
// value outside the valid Unicode range -- which includes any present-day
// Unix timestamp -- collapses to the same replacement character, so the id
// would not depend on the time at all and large client ids would collide.
func queuedID(clientID uint64) string {
	seed := fmt.Sprintf("%d:%d", time.Now().Unix(), clientID)
	return fmt.Sprintf("%x", md5.Sum([]byte(seed)))
}
  95. // ParseHeaders parses the headers into Header field of the Envelope struct.
  96. // Data buffer must be full before calling.
  97. // It assumes that at most 30kb of email data can be a header
  98. // Decoding of encoding to UTF is only done on the Subject, where the result is assigned to the Subject field
  99. func (e *Envelope) ParseHeaders() error {
  100. var err error
  101. if e.Header != nil {
  102. return errors.New("headers already parsed")
  103. }
  104. buf := bytes.NewBuffer(e.Data.Bytes())
  105. // find where the header ends, assuming that over 30 kb would be max
  106. max := maxHeaderChunk
  107. if buf.Len() < max {
  108. max = buf.Len()
  109. }
  110. // read in the chunk which we'll scan for the header
  111. chunk := make([]byte, max)
  112. buf.Read(chunk)
  113. headerEnd := strings.Index(string(chunk), "\n\n") // the first two new-lines chars are the End Of Header
  114. if headerEnd > -1 {
  115. header := chunk[0:headerEnd]
  116. headerReader := textproto.NewReader(bufio.NewReader(bytes.NewBuffer(header)))
  117. e.Header, err = headerReader.ReadMIMEHeader()
  118. if err != nil {
  119. // decode the subject
  120. if subject, ok := e.Header["Subject"]; ok {
  121. e.Subject = MimeHeaderDecode(subject[0])
  122. }
  123. }
  124. } else {
  125. err = errors.New("header not found")
  126. }
  127. return err
  128. }
  129. // Len returns the number of bytes that would be in the reader returned by NewReader()
  130. func (e *Envelope) Len() int {
  131. return len(e.DeliveryHeader) + e.Data.Len()
  132. }
  133. // Returns a new reader for reading the email contents, including the delivery headers
  134. func (e *Envelope) NewReader() io.Reader {
  135. return io.MultiReader(
  136. strings.NewReader(e.DeliveryHeader),
  137. bytes.NewReader(e.Data.Bytes()),
  138. )
  139. }
  140. // String converts the email to string.
  141. // Typically, you would want to use the compressor guerrilla.Processor for more efficiency, or use NewReader
  142. func (e *Envelope) String() string {
  143. return e.DeliveryHeader + e.Data.String()
  144. }
  145. // ResetTransaction is called when the transaction is reset (keeping the connection open)
  146. func (e *Envelope) ResetTransaction() {
  147. // ensure not processing by the backend, will only get lock if finished, otherwise block
  148. e.Lock()
  149. // got the lock, it means processing finished
  150. e.Unlock()
  151. e.MailFrom = Address{}
  152. e.RcptTo = []Address{}
  153. // reset the data buffer, keep it allocated
  154. e.Data.Reset()
  155. // todo: these are probably good candidates for buffers / use sync.Pool (after profiling)
  156. e.Subject = ""
  157. e.Header = nil
  158. e.Hashes = make([]string, 0)
  159. e.DeliveryHeader = ""
  160. e.Values = make(map[string]interface{})
  161. }
  162. // Seed is called when used with a new connection, once it's accepted
  163. func (e *Envelope) Reseed(RemoteIP string, clientID uint64) {
  164. e.RemoteIP = RemoteIP
  165. e.QueuedId = queuedID(clientID)
  166. e.Helo = ""
  167. e.TLS = false
  168. }
  169. // PushRcpt adds a recipient email address to the envelope
  170. func (e *Envelope) PushRcpt(addr Address) {
  171. e.RcptTo = append(e.RcptTo, addr)
  172. }
  173. // Pop removes the last email address that was pushed to the envelope
  174. func (e *Envelope) PopRcpt() Address {
  175. ret := e.RcptTo[len(e.RcptTo)-1]
  176. e.RcptTo = e.RcptTo[:len(e.RcptTo)-1]
  177. return ret
  178. }
  179. // Converts 7 bit encoded mime header strings to UTF-8
  180. func MimeHeaderDecode(str string) string {
  181. state := 0
  182. var buf bytes.Buffer
  183. var out []byte
  184. for i := 0; i < len(str); i++ {
  185. switch state {
  186. case 0:
  187. if str[i] == '=' {
  188. buf.WriteByte(str[i])
  189. state = 1
  190. } else {
  191. out = append(out, str[i])
  192. }
  193. case 1:
  194. if str[i] == '?' {
  195. buf.WriteByte(str[i])
  196. state = 2
  197. } else {
  198. out = append(out, str[i])
  199. buf.Reset()
  200. state = 0
  201. }
  202. case 2:
  203. if str[i] == ' ' {
  204. d, err := Dec.Decode(buf.String())
  205. if err == nil {
  206. out = append(out, []byte(d)...)
  207. } else {
  208. out = append(out, buf.Bytes()...)
  209. }
  210. out = append(out, ' ')
  211. buf.Reset()
  212. state = 0
  213. } else {
  214. buf.WriteByte(str[i])
  215. }
  216. }
  217. }
  218. if buf.Len() > 0 {
  219. d, err := Dec.Decode(buf.String())
  220. if err == nil {
  221. out = append(out, []byte(d)...)
  222. } else {
  223. out = append(out, buf.Bytes()...)
  224. }
  225. }
  226. return string(out)
  227. }
  228. // Envelopes have their own pool
  229. type Pool struct {
  230. // envelopes that are ready to be borrowed
  231. pool chan *Envelope
  232. // semaphore to control number of maximum borrowed envelopes
  233. sem chan bool
  234. }
  235. func NewPool(poolSize int) *Pool {
  236. return &Pool{
  237. pool: make(chan *Envelope, poolSize),
  238. sem: make(chan bool, poolSize),
  239. }
  240. }
  241. func (p *Pool) Borrow(remoteAddr string, clientID uint64) *Envelope {
  242. var e *Envelope
  243. p.sem <- true // block the envelope until more room
  244. select {
  245. case e = <-p.pool:
  246. e.Reseed(remoteAddr, clientID)
  247. default:
  248. e = NewEnvelope(remoteAddr, clientID)
  249. }
  250. return e
  251. }
  252. // Return returns an envelope back to the envelope pool
  253. // Make sure that envelope finished processing before calling this
  254. func (p *Pool) Return(e *Envelope) {
  255. select {
  256. case p.pool <- e:
  257. //placed envelope back in pool
  258. default:
  259. // pool is full, discard it
  260. }
  261. // take a value off the semaphore to make room for more envelopes
  262. <-p.sem
  263. }