envelope.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298
  1. package mail
  import (
  	"bufio"
  	"bytes"
  	"crypto/md5"
  	"errors"
  	"fmt"
  	"io"
  	"mime"
  	"net/mail"
  	"net/textproto"
  	"strconv"
  	"strings"
  	"sync"
  	"time"
  )
  // Dec is the decoder MimeHeaderDecode uses for RFC 2047 encoded-words found
  // in MIME headers.
  //
  // It is exported so that a different decoder can be installed, e.g. one
  // backed by GNU iconv. The original note says this is done by importing the
  // "mail/inconv" package (presumably mail/iconv -- confirm the import path).
  // Another option is a decoder built on
  // https://godoc.org/golang.org/x/text/encoding
  var Dec mime.WordDecoder

  // init installs the standard-library decoder: a zero-value mime.WordDecoder,
  // i.e. one without a custom CharsetReader.
  func init() {
  	var stdlibDecoder mime.WordDecoder
  	Dec = stdlibDecoder
  }
  // maxHeaderChunk is the size, in bytes, of the leading part of the message
  // data that ParseHeaders searches for the end of the header section:
  // 4 KiB plus one byte.
  const maxHeaderChunk = 4<<10 + 1
  // Address holds the parts of an email address of the form `<user@host>`.
  type Address struct {
  	// User is the local part.
  	User string
  	// Host is the domain.
  	Host string
  	// ADL is the at-domain list, if one was matched.
  	ADL []string
  	// PathParams contains any ESMTP parameters that were matched.
  	PathParams [][]string
  	// NullPath is true if <> was received.
  	NullPath bool
  }

  // String returns the address formatted as "User@Host".
  func (ep *Address) String() string {
  	return fmt.Sprintf("%s@%s", ep.User, ep.Host)
  }

  // IsEmpty reports whether both User and Host are empty strings.
  func (ep *Address) IsEmpty() bool {
  	return ep.User == "" && ep.Host == ""
  }

  // ap is the shared net/mail parser used by NewAddress.
  var ap = mail.AddressParser{}

  // NewAddress parses an RFC 5322 address such as
  // "Gogh Fir <gf@example.com>" or "gf@example.com" and returns its local part
  // and domain as an Address. Any display name is discarded.
  //
  // It returns the parser's error when str cannot be parsed, and an
  // "invalid address" error when the parsed address has no local part.
  func NewAddress(str string) (Address, error) {
  	a, err := ap.Parse(str)
  	if err != nil {
  		return Address{}, err
  	}
  	// Split on the LAST '@': net/mail returns a quoted local part unquoted,
  	// so `"a@b"@example.com` comes back as `a@b@example.com`, and splitting
  	// on the first '@' would move part of the local part into the host.
  	pos := strings.LastIndex(a.Address, "@")
  	if pos <= 0 {
  		return Address{}, errors.New("invalid address")
  	}
  	return Address{
  		User: a.Address[:pos],
  		Host: a.Address[pos+1:],
  	}, nil
  }
  64. // Email represents a single SMTP message.
  65. type Envelope struct {
  66. // Remote IP address
  67. RemoteIP string
  68. // Message sent in EHLO command
  69. Helo string
  70. // Sender
  71. MailFrom Address
  72. // Recipients
  73. RcptTo []Address
  74. // Data stores the header and message body
  75. Data bytes.Buffer
  76. // Subject stores the subject of the email, extracted and decoded after calling ParseHeaders()
  77. Subject string
  78. // TLS is true if the email was received using a TLS connection
  79. TLS bool
  80. // Header stores the results from ParseHeaders()
  81. Header textproto.MIMEHeader
  82. // Values hold the values generated when processing the envelope by the backend
  83. Values map[string]interface{}
  84. // Hashes of each email on the rcpt
  85. Hashes []string
  86. // additional delivery header that may be added
  87. DeliveryHeader string
  88. // Email(s) will be queued with this id
  89. QueuedId string
  90. // When locked, it means that the envelope is being processed by the backend
  91. sync.Mutex
  92. }
  93. func NewEnvelope(remoteAddr string, clientID uint64) *Envelope {
  94. return &Envelope{
  95. RemoteIP: remoteAddr,
  96. Values: make(map[string]interface{}),
  97. QueuedId: queuedID(clientID),
  98. }
  99. }
  // queuedID builds the queue id for an envelope: the hex-encoded MD5 digest
  // (32 characters) of the current Unix time in seconds followed by clientID,
  // both written as decimal text.
  func queuedID(clientID uint64) string {
  	// Format the numbers explicitly. A plain string(n) conversion treats an
  	// integer as a Unicode code point, which turns any Unix timestamp into
  	// U+FFFD and so removes the time from the id altogether.
  	seed := strconv.FormatInt(time.Now().Unix(), 10) + strconv.FormatUint(clientID, 10)
  	return fmt.Sprintf("%x", md5.Sum([]byte(seed)))
  }
  103. // ParseHeaders parses the headers into Header field of the Envelope struct.
  104. // Data buffer must be full before calling.
  105. // It assumes that at most 30kb of email data can be a header
  106. // Decoding of encoding to UTF is only done on the Subject, where the result is assigned to the Subject field
  107. func (e *Envelope) ParseHeaders() error {
  108. var err error
  109. if e.Header != nil {
  110. return errors.New("headers already parsed")
  111. }
  112. buf := e.Data.Bytes()
  113. // find where the header ends, assuming that over 30 kb would be max
  114. if len(buf) > maxHeaderChunk {
  115. buf = buf[:maxHeaderChunk]
  116. }
  117. headerEnd := bytes.Index(buf, []byte{'\n', '\n'}) // the first two new-lines chars are the End Of Header
  118. if headerEnd > -1 {
  119. header := buf[0:headerEnd]
  120. headerReader := textproto.NewReader(bufio.NewReader(bytes.NewBuffer(header)))
  121. e.Header, err = headerReader.ReadMIMEHeader()
  122. if err != nil {
  123. // decode the subject
  124. if subject, ok := e.Header["Subject"]; ok {
  125. e.Subject = MimeHeaderDecode(subject[0])
  126. }
  127. }
  128. } else {
  129. err = errors.New("header not found")
  130. }
  131. return err
  132. }
  133. // Len returns the number of bytes that would be in the reader returned by NewReader()
  134. func (e *Envelope) Len() int {
  135. return len(e.DeliveryHeader) + e.Data.Len()
  136. }
  137. // Returns a new reader for reading the email contents, including the delivery headers
  138. func (e *Envelope) NewReader() io.Reader {
  139. return io.MultiReader(
  140. strings.NewReader(e.DeliveryHeader),
  141. bytes.NewReader(e.Data.Bytes()),
  142. )
  143. }
  144. // String converts the email to string.
  145. // Typically, you would want to use the compressor guerrilla.Processor for more efficiency, or use NewReader
  146. func (e *Envelope) String() string {
  147. return e.DeliveryHeader + e.Data.String()
  148. }
  149. // ResetTransaction is called when the transaction is reset (keeping the connection open)
  150. func (e *Envelope) ResetTransaction() {
  151. // ensure not processing by the backend, will only get lock if finished, otherwise block
  152. e.Lock()
  153. // got the lock, it means processing finished
  154. e.Unlock()
  155. e.MailFrom = Address{}
  156. e.RcptTo = []Address{}
  157. // reset the data buffer, keep it allocated
  158. e.Data.Reset()
  159. // todo: these are probably good candidates for buffers / use sync.Pool (after profiling)
  160. e.Subject = ""
  161. e.Header = nil
  162. e.Hashes = make([]string, 0)
  163. e.DeliveryHeader = ""
  164. e.Values = make(map[string]interface{})
  165. }
  166. // Seed is called when used with a new connection, once it's accepted
  167. func (e *Envelope) Reseed(RemoteIP string, clientID uint64) {
  168. e.RemoteIP = RemoteIP
  169. e.QueuedId = queuedID(clientID)
  170. e.Helo = ""
  171. e.TLS = false
  172. }
  173. // PushRcpt adds a recipient email address to the envelope
  174. func (e *Envelope) PushRcpt(addr Address) {
  175. e.RcptTo = append(e.RcptTo, addr)
  176. }
  177. // Pop removes the last email address that was pushed to the envelope
  178. func (e *Envelope) PopRcpt() Address {
  179. ret := e.RcptTo[len(e.RcptTo)-1]
  180. e.RcptTo = e.RcptTo[:len(e.RcptTo)-1]
  181. return ret
  182. }
  183. // Converts 7 bit encoded mime header strings to UTF-8
  184. func MimeHeaderDecode(str string) string {
  185. state := 0
  186. var buf bytes.Buffer
  187. var out []byte
  188. for i := 0; i < len(str); i++ {
  189. switch state {
  190. case 0:
  191. if str[i] == '=' {
  192. buf.WriteByte(str[i])
  193. state = 1
  194. } else {
  195. out = append(out, str[i])
  196. }
  197. case 1:
  198. if str[i] == '?' {
  199. buf.WriteByte(str[i])
  200. state = 2
  201. } else {
  202. out = append(out, str[i])
  203. buf.Reset()
  204. state = 0
  205. }
  206. case 2:
  207. if str[i] == ' ' {
  208. d, err := Dec.Decode(buf.String())
  209. if err == nil {
  210. out = append(out, []byte(d)...)
  211. } else {
  212. out = append(out, buf.Bytes()...)
  213. }
  214. out = append(out, ' ')
  215. buf.Reset()
  216. state = 0
  217. } else {
  218. buf.WriteByte(str[i])
  219. }
  220. }
  221. }
  222. if buf.Len() > 0 {
  223. d, err := Dec.Decode(buf.String())
  224. if err == nil {
  225. out = append(out, []byte(d)...)
  226. } else {
  227. out = append(out, buf.Bytes()...)
  228. }
  229. }
  230. return string(out)
  231. }
  232. // Envelopes have their own pool
  233. type Pool struct {
  234. // envelopes that are ready to be borrowed
  235. pool chan *Envelope
  236. // semaphore to control number of maximum borrowed envelopes
  237. sem chan bool
  238. }
  239. func NewPool(poolSize int) *Pool {
  240. return &Pool{
  241. pool: make(chan *Envelope, poolSize),
  242. sem: make(chan bool, poolSize),
  243. }
  244. }
  245. func (p *Pool) Borrow(remoteAddr string, clientID uint64) *Envelope {
  246. var e *Envelope
  247. p.sem <- true // block the envelope until more room
  248. select {
  249. case e = <-p.pool:
  250. e.Reseed(remoteAddr, clientID)
  251. default:
  252. e = NewEnvelope(remoteAddr, clientID)
  253. }
  254. return e
  255. }
  256. // Return returns an envelope back to the envelope pool
  257. // Make sure that envelope finished processing before calling this
  258. func (p *Pool) Return(e *Envelope) {
  259. select {
  260. case p.pool <- e:
  261. //placed envelope back in pool
  262. default:
  263. // pool is full, discard it
  264. }
  265. // take a value off the semaphore to make room for more envelopes
  266. <-p.sem
  267. }