Browse Source

- default stream buffer size of 4096
- decorated the smtp dot-reader to also parse the header while reading the body as a stream
- ParseHeaders() only parses the subject. Headers are now parsed by the stream based mime parser
- The new mime-parsing smtp dot-reader has been placed in reader.go
- several tests updated to reflect the refactoring

flashmob 6 years ago
parent
commit
190c862e91
10 changed files with 220 additions and 78 deletions
  1. 3 3
      api_test.go
  2. 5 1
      backends/gateway.go
  3. 14 2
      backends/gateway_test.go
  4. 2 4
      backends/s_mime.go
  5. 2 3
      client.go
  6. 16 25
      mail/envelope.go
  7. 17 3
      mail/envelope_test.go
  8. 79 35
      mail/mime/mime.go
  9. 76 0
      mail/reader.go
  10. 6 2
      server.go

+ 3 - 3
api_test.go

@@ -779,7 +779,7 @@ func TestStreamProcessor(t *testing.T) {
 
 
 }
 }
 
 
-var mime = `MIME-Version: 1.0
+var mime0 = `MIME-Version: 1.0
 X-Mailer: MailBee.NET 8.0.4.428
 X-Mailer: MailBee.NET 8.0.4.428
 Subject: test 
 Subject: test 
  subject
  subject
@@ -1058,9 +1058,9 @@ func TestStreamMimeProcessor(t *testing.T) {
 	}()
 	}()
 
 
 	// change \n to \r\n
 	// change \n to \r\n
-	mime = strings.Replace(mime2, "\n", "\r\n", -1)
+	mime0 = strings.Replace(mime2, "\n", "\r\n", -1)
 	// lets have a talk with the server
 	// lets have a talk with the server
-	if err := talkToServer("127.0.0.1:2525", mime); err != nil {
+	if err := talkToServer("127.0.0.1:2525", mime0); err != nil {
 		t.Error(err)
 		t.Error(err)
 	}
 	}
 
 

+ 5 - 1
backends/gateway.go

@@ -127,6 +127,10 @@ const (
 	// default timeout for validating rcpt to, if 'gw_val_rcpt_timeout' not present in config
 	// default timeout for validating rcpt to, if 'gw_val_rcpt_timeout' not present in config
 	validateRcptTimeout = time.Second * 5
 	validateRcptTimeout = time.Second * 5
 	defaultProcessor    = "Debugger"
 	defaultProcessor    = "Debugger"
+
+	// streamBufferSize sets the size of the buffer for the streaming processors,
+	// can be configured using `stream_buffer_size`
+	streamBufferSize = 4096
 )
 )
 
 
 func (s backendState) String() string {
 func (s backendState) String() string {
@@ -473,7 +477,7 @@ func (gw *BackendGateway) Initialize(cfg BackendConfig) error {
 		gw.conveyor = make(chan *workerMsg, workersSize)
 		gw.conveyor = make(chan *workerMsg, workersSize)
 	}
 	}
 
 
-	size := 4096 // 4096
+	size := streamBufferSize
 	if gw.gwConfig.StreamBufferSize > 0 {
 	if gw.gwConfig.StreamBufferSize > 0 {
 		size = gw.gwConfig.StreamBufferSize
 		size = gw.gwConfig.StreamBufferSize
 	}
 	}

+ 14 - 2
backends/gateway_test.go

@@ -1,9 +1,12 @@
 package backends
 package backends
 
 
 import (
 import (
+	"bufio"
+	"bytes"
 	"fmt"
 	"fmt"
 	"github.com/flashmob/go-guerrilla/log"
 	"github.com/flashmob/go-guerrilla/log"
 	"github.com/flashmob/go-guerrilla/mail"
 	"github.com/flashmob/go-guerrilla/mail"
+	"io"
 	"strings"
 	"strings"
 	"testing"
 	"testing"
 	"time"
 	"time"
@@ -71,7 +74,7 @@ func TestStartProcessStop(t *testing.T) {
 		t.Fail()
 		t.Fail()
 	}
 	}
 	if gateway.State != BackendStateRunning {
 	if gateway.State != BackendStateRunning {
-		t.Error("gateway.State is not in rinning state, got ", gateway.State)
+		t.Error("gateway.State is not in running state, got ", gateway.State)
 	}
 	}
 	// can we place an envelope on the conveyor channel?
 	// can we place an envelope on the conveyor channel?
 
 
@@ -83,7 +86,16 @@ func TestStartProcessStop(t *testing.T) {
 		TLS:      true,
 		TLS:      true,
 	}
 	}
 	e.PushRcpt(mail.Address{User: "test", Host: "example.com"})
 	e.PushRcpt(mail.Address{User: "test", Host: "example.com"})
-	e.Data.WriteString("Subject:Test\n\nThis is a test.")
+	//e.Data.WriteString("Subject:Test\n\nThis is a test.")
+	in := "Subject: Test\n\nThis is a test.\n.\n"
+	mdr := mail.NewMimeDotReader(bufio.NewReader(bytes.NewBufferString(in)), 1)
+	i, err := io.Copy(&e.Data, mdr)
+	if err != nil && err != io.EOF {
+		t.Error(err, "cannot copy buffer", i, err)
+	}
+	if p := mdr.Parts(); p != nil && len(p) > 0 {
+		e.Header = p[0].Headers
+	}
 	notify := make(chan *notifyMsg)
 	notify := make(chan *notifyMsg)
 
 
 	gateway.conveyor <- &workerMsg{e, notify, TaskSaveMail, nil}
 	gateway.conveyor <- &workerMsg{e, notify, TaskSaveMail, nil}

+ 2 - 4
backends/s_mime.go

@@ -65,8 +65,8 @@ func StreamMimeAnalyzer() *StreamDecorator {
 				}()
 				}()
 
 
 				if parseErr == nil {
 				if parseErr == nil {
-					err := parser.Close()
-					return err
+					_ = parser.Close()
+					return nil
 				} else {
 				} else {
 					return parseErr
 					return parseErr
 				}
 				}
@@ -74,9 +74,7 @@ func StreamMimeAnalyzer() *StreamDecorator {
 
 
 			return StreamProcessWith(func(p []byte) (int, error) {
 			return StreamProcessWith(func(p []byte) (int, error) {
 				_ = envelope
 				_ = envelope
-				if len(envelope.Header) > 0 {
 
 
-				}
 				if _, ok := envelope.Values["MimeParts"]; !ok {
 				if _, ok := envelope.Values["MimeParts"]; !ok {
 					envelope.Values["MimeParts"] = &parser.Parts
 					envelope.Values["MimeParts"] = &parser.Parts
 				}
 				}

+ 2 - 3
client.go

@@ -11,7 +11,6 @@ import (
 	"github.com/flashmob/go-guerrilla/mail/rfc5321"
 	"github.com/flashmob/go-guerrilla/mail/rfc5321"
 	"github.com/flashmob/go-guerrilla/response"
 	"github.com/flashmob/go-guerrilla/response"
 	"net"
 	"net"
-	"net/textproto"
 	"sync"
 	"sync"
 	"time"
 	"time"
 )
 )
@@ -47,7 +46,7 @@ type client struct {
 	conn       net.Conn
 	conn       net.Conn
 	bufin      *smtpBufferedReader
 	bufin      *smtpBufferedReader
 	bufout     *bufio.Writer
 	bufout     *bufio.Writer
-	smtpReader *textproto.Reader
+	smtpReader *mail.MimeDotReader
 	ar         *adjustableLimitedReader
 	ar         *adjustableLimitedReader
 	// guards access to conn
 	// guards access to conn
 	connGuard sync.Mutex
 	connGuard sync.Mutex
@@ -70,7 +69,7 @@ func NewClient(conn net.Conn, clientID uint64, logger log.Logger, envelope *mail
 	}
 	}
 
 
 	// used for reading the DATA state
 	// used for reading the DATA state
-	c.smtpReader = textproto.NewReader(c.bufin.Reader)
+	c.smtpReader = mail.NewMimeDotReader(c.bufin.Reader, 1)
 	return c
 	return c
 }
 }
 
 

+ 16 - 25
mail/envelope.go

@@ -1,7 +1,6 @@
 package mail
 package mail
 
 
 import (
 import (
-	"bufio"
 	"bytes"
 	"bytes"
 	"crypto/md5"
 	"crypto/md5"
 	"errors"
 	"errors"
@@ -13,6 +12,8 @@ import (
 	"strings"
 	"strings"
 	"sync"
 	"sync"
 	"time"
 	"time"
+
+	mimelib "github.com/flashmob/go-guerrilla/mail/mime"
 )
 )
 
 
 // A WordDecoder decodes MIME headers containing RFC 2047 encoded-words.
 // A WordDecoder decodes MIME headers containing RFC 2047 encoded-words.
@@ -27,8 +28,6 @@ func init() {
 	Dec = mime.WordDecoder{}
 	Dec = mime.WordDecoder{}
 }
 }
 
 
-const maxHeaderChunk = 1 + (4 << 10) // 4KB
-
 // Address encodes an email address of the form `<user@host>`
 // Address encodes an email address of the form `<user@host>`
 type Address struct {
 type Address struct {
 	// User is local part
 	// User is local part
@@ -113,36 +112,28 @@ func queuedID(clientID uint64) string {
 	return fmt.Sprintf("%x", md5.Sum([]byte(string(time.Now().Unix())+string(clientID))))
 	return fmt.Sprintf("%x", md5.Sum([]byte(string(time.Now().Unix())+string(clientID))))
 }
 }
 
 
+func (e *Envelope) setHeaders(p []*mimelib.Part) {
+	if p != nil && len(p) > 0 {
+		e.Header = p[0].Headers
+	}
+}
+
 // ParseHeaders parses the headers into Header field of the Envelope struct.
 // ParseHeaders parses the headers into Header field of the Envelope struct.
 // Data buffer must be full before calling.
 // Data buffer must be full before calling.
 // It assumes that at most 30kb of email data can be a header
 // It assumes that at most 30kb of email data can be a header
 // Decoding of encoding to UTF is only done on the Subject, where the result is assigned to the Subject field
 // Decoding of encoding to UTF is only done on the Subject, where the result is assigned to the Subject field
 func (e *Envelope) ParseHeaders() error {
 func (e *Envelope) ParseHeaders() error {
-	var err error
-	if e.Header != nil {
-		return errors.New("headers already parsed")
+	if e.Header == nil {
+		return errors.New("headers not parsed")
 	}
 	}
-	buf := e.Data.Bytes()
-	// find where the header ends, assuming that over 30 kb would be max
-	if len(buf) > maxHeaderChunk {
-		buf = buf[:maxHeaderChunk]
+	if len(e.Header) == 0 {
+		return errors.New("header not found")
 	}
 	}
-
-	headerEnd := bytes.Index(buf, []byte{'\n', '\n'}) // the first two new-lines chars are the End Of Header
-	if headerEnd > -1 {
-		header := buf[0:headerEnd]
-		headerReader := textproto.NewReader(bufio.NewReader(bytes.NewBuffer(header)))
-		e.Header, err = headerReader.ReadMIMEHeader()
-		if err != nil {
-			// decode the subject
-			if subject, ok := e.Header["Subject"]; ok {
-				e.Subject = MimeHeaderDecode(subject[0])
-			}
-		}
-	} else {
-		err = errors.New("header not found")
+	// decode the subject
+	if subject, ok := e.Header["Subject"]; ok {
+		e.Subject = MimeHeaderDecode(subject[0])
 	}
 	}
-	return err
+	return nil
 }
 }
 
 
 // Len returns the number of bytes that would be in the reader returned by NewReader()
 // Len returns the number of bytes that would be in the reader returned by NewReader()

+ 17 - 3
mail/envelope_test.go

@@ -1,6 +1,8 @@
 package mail
 package mail
 
 
 import (
 import (
+	"bufio"
+	"bytes"
 	"io"
 	"io"
 	"io/ioutil"
 	"io/ioutil"
 	"strings"
 	"strings"
@@ -52,7 +54,18 @@ func TestEnvelope(t *testing.T) {
 	if to.String() != "[email protected]" {
 	if to.String() != "[email protected]" {
 		t.Error("to does not equal [email protected], it was:", to.String())
 		t.Error("to does not equal [email protected], it was:", to.String())
 	}
 	}
-	e.Data.WriteString("Subject: Test\n\nThis is a test nbnb nbnb hgghgh nnnbnb nbnbnb nbnbn.")
+	// we feed the input through the NewMimeDotReader, it will parse the headers while reading the input
+	// the input has a single line header and ends with a line with a single .
+	in := "Subject: =?utf-8?B?55So5oi34oCcRXBpZGVtaW9sb2d5IGluIG51cnNpbmcgYW5kIGg=?=\n\nThis is a test nbnb nbnb hgghgh nnnbnb nbnbnb nbnbn.\n.\n"
+	mdr := NewMimeDotReader(bufio.NewReader(bytes.NewBufferString(in)), 1)
+	i, err := io.Copy(&e.Data, mdr)
+	if err != nil && err != io.EOF {
+		t.Error(err, "cannot copy buffer", i, err)
+	}
+	// pass the parsed headers to the envelope
+	if p := mdr.Parts(); p != nil && len(p) > 0 {
+		e.Header = p[0].Headers
+	}
 
 
 	addHead := "Delivered-To: " + to.String() + "\n"
 	addHead := "Delivered-To: " + to.String() + "\n"
 	addHead += "Received: from " + e.Helo + " (" + e.Helo + "  [" + e.RemoteIP + "])\n"
 	addHead += "Received: from " + e.Helo + " (" + e.Helo + "  [" + e.RemoteIP + "])\n"
@@ -64,12 +77,13 @@ func TestEnvelope(t *testing.T) {
 	if len(data) != e.Len() {
 	if len(data) != e.Len() {
 		t.Error("e.Len() is incorrect, it shown ", e.Len(), " but we wanted ", len(data))
 		t.Error("e.Len() is incorrect, it shown ", e.Len(), " but we wanted ", len(data))
 	}
 	}
+
 	if err := e.ParseHeaders(); err != nil && err != io.EOF {
 	if err := e.ParseHeaders(); err != nil && err != io.EOF {
 		t.Error("cannot parse headers:", err)
 		t.Error("cannot parse headers:", err)
 		return
 		return
 	}
 	}
-	if e.Subject != "Test" {
-		t.Error("Subject expecting: Test, got:", e.Subject)
+	if e.Subject != "用户“Epidemiology in nursing and h" {
+		t.Error("Subject expecting: 用户“Epidemiology in nursing and h, got:", e.Subject)
 	}
 	}
 
 
 }
 }

+ 79 - 35
mail/mime/mime.go

@@ -20,50 +20,61 @@ import (
 )
 )
 
 
 const (
 const (
-	maxBoundaryLen       = 70 + 10
-	doubleDash           = "--"
-	startPos             = -1
-	headerErrorThreshold = 4
-)
+	// maxBoundaryLen limits the length of the content-boundary.
+	// Technically the limit is 79, but here we are more liberal
+	maxBoundaryLen = 70 + 10
 
 
-type boundaryEnd struct {
-	cb string
-}
+	// doubleDash is the prefix for a content-boundary string. It is also added
+	// as a postfix to a content-boundary string to signal the end of content parts.
+	doubleDash = "--"
 
 
-func (e boundaryEnd) Error() string {
-	return e.cb
-}
+	// startPos is the value assigned to the pos property when the buffer is set.
+	// It is -1 because that keeps the peek() implementation simpler
+	startPos = -1
+
+	// headerErrorThreshold how many errors in the header
+	headerErrorThreshold = 4
+
+	// MaxNodes limits the number of items in the Parts array. Effectively limiting
+	// the number of nested calls the parser may make.
+	MaxNodes = 512
+)
 
 
 var NotMime = errors.New("not Mime")
 var NotMime = errors.New("not Mime")
+var MaxNodesErr = errors.New("too many mime part nodes")
 
 
 type captureBuffer struct {
 type captureBuffer struct {
 	bytes.Buffer
 	bytes.Buffer
-	upper bool
+	upper bool // flag used by acceptHeaderName(), if true, the next accepted chr will be uppercase'd
 }
 }
 
 
 type Parser struct {
 type Parser struct {
 
 
 	// related to the state of the parser
 	// related to the state of the parser
 
 
-	buf                   []byte
-	pos                   int
-	peekOffset            int
-	ch                    byte
-	gotNewSlice, consumed chan bool
-	accept                captureBuffer
-	boundaryMatched       int
-	count                 uint
-	result                chan parserMsg
-	mux                   sync.Mutex
+	buf                   []byte         // input buffer
+	pos                   int            // position in the input buffer
+	peekOffset            int            // peek() ignores \r so we must keep count of how many \r were ignored
+	ch                    byte           // value of byte at current pos in buf[]. At EOF, ch == 0
+	gotNewSlice, consumed chan bool      // flags that control the synchronisation of reads
+	accept                captureBuffer  // input is captured in to this buffer to build strings
+	boundaryMatched       int            // an offset. Used in cases where the boundary string is split over multiple buffers
+	count                 uint           // counts how many times Parse() was called
+	result                chan parserMsg // used to pass the parsing result back to the main goroutine
+	mux                   sync.Mutex     // ensure calls to Parse() and Close() are synchronized
 
 
 	// Parts is the mime parts tree. The parser builds the parts as it consumes the input
 	// Parts is the mime parts tree. The parser builds the parts as it consumes the input
 	// In order to represent the tree in an array, we use Parts.Node to store the name of
 	// In order to represent the tree in an array, we use Parts.Node to store the name of
 	// each node. The name of the node is the *path* of the node. The root node is always
 	// each node. The name of the node is the *path* of the node. The root node is always
 	// "1". The child would be "1.1", the next sibling would be "1.2", while the child of
 	// "1". The child would be "1.1", the next sibling would be "1.2", while the child of
 	// "1.2" would be "1.2.1"
 	// "1.2" would be "1.2.1"
-	Parts           []*Part
-	msgPos          uint
-	lastBoundaryPos uint
+	Parts []*Part
+
+	msgPos uint // global position in the message
+
+	lastBoundaryPos uint // the last msgPos where a boundary was detected
+
+	maxNodes int // the desired number of maximum nodes the parser is limited to
 }
 }
 
 
 type Part struct {
 type Part struct {
@@ -254,7 +265,9 @@ func (p *Parser) skip(nBytes int) {
 		p.pos += remainder - 1
 		p.pos += remainder - 1
 		p.msgPos += uint(remainder - 1)
 		p.msgPos += uint(remainder - 1)
 		p.next()
 		p.next()
-		if nBytes < 1 {
+		if p.ch == 0 {
+			return
+		} else if nBytes < 1 {
 			return
 			return
 		}
 		}
 	}
 	}
@@ -376,7 +389,7 @@ func (p *Parser) transportPadding() (err error) {
 	}
 	}
 }
 }
 
 
-// acceptHeaderName build the header name in the buffer while ensuring that
+// acceptHeaderName builds the header name in the buffer while ensuring
 // that the case is normalized. Ie. Content-type is written as Content-Type
 // that the case is normalized. Ie. Content-type is written as Content-Type
 func (p *Parser) acceptHeaderName() {
 func (p *Parser) acceptHeaderName() {
 	if p.accept.upper && p.ch >= 'a' && p.ch <= 'z' {
 	if p.accept.upper && p.ch >= 'a' && p.ch <= 'z' {
@@ -457,6 +470,9 @@ func (p *Parser) header(mh *Part) (err error) {
 					switch {
 					switch {
 					case contentType.parameters[i].name == "boundary":
 					case contentType.parameters[i].name == "boundary":
 						mh.ContentBoundary = contentType.parameters[i].value
 						mh.ContentBoundary = contentType.parameters[i].value
+						if len(mh.ContentBoundary) >= maxBoundaryLen {
+							return errors.New("boundary exceeded max length")
+						}
 					case contentType.parameters[i].name == "charset":
 					case contentType.parameters[i].name == "charset":
 						mh.Charset = contentType.parameters[i].value
 						mh.Charset = contentType.parameters[i].value
 					case contentType.parameters[i].name == "name":
 					case contentType.parameters[i].name == "name":
@@ -746,15 +762,36 @@ func (p *Parser) parameter() (attribute, value string, err error) {
 // mime scans the mime content and builds the mime-part tree in
 // mime scans the mime content and builds the mime-part tree in
 // p.Parts on-the-fly, as more bytes get fed in.
 // p.Parts on-the-fly, as more bytes get fed in.
 func (p *Parser) mime(part *Part, cb string) (err error) {
 func (p *Parser) mime(part *Part, cb string) (err error) {
-
+	if len(p.Parts) >= p.maxNodes {
+		for {
+			// skip until the end of the stream (we've stopped parsing due to max nodes)
+			p.skip(len(p.buf) + 1)
+			if p.ch == 0 {
+				break
+			}
+		}
+		if p.maxNodes == 1 {
+			// in this case, only one header item, so assume the end of message is
+			// the ending position of the header
+			p.Parts[0].EndingPos = p.msgPos
+			p.Parts[0].EndingPosBody = p.msgPos
+		} else {
+			err = MaxNodesErr
+		}
+		return
+	}
 	count := 1
 	count := 1
 	root := part == nil
 	root := part == nil
 	if root {
 	if root {
 		part = newPart()
 		part = newPart()
 		p.addPart(part, "1")
 		p.addPart(part, "1")
 		defer func() {
 		defer func() {
-			if part != nil {
+			if err != MaxNodesErr {
 				part.EndingPosBody = p.lastBoundaryPos
 				part.EndingPosBody = p.lastBoundaryPos
+			} else {
+				// remove the unfinished node (edge case)
+				var parts []*Part
+				p.Parts = append(parts, p.Parts[:p.maxNodes]...)
 			}
 			}
 		}()
 		}()
 	}
 	}
@@ -851,6 +888,7 @@ func (p *Parser) reset() {
 	p.ch = 0
 	p.ch = 0
 }
 }
 
 
+// Open prepares the parser for accepting input
 func (p *Parser) Open() {
 func (p *Parser) Open() {
 	p.Parts = make([]*Part, 0)
 	p.Parts = make([]*Part, 0)
 }
 }
@@ -913,11 +951,6 @@ func (p *Parser) Parse(buf []byte) error {
 		go func() {
 		go func() {
 			p.next()
 			p.next()
 			err := p.mime(nil, "")
 			err := p.mime(nil, "")
-			if _, ok := err.(boundaryEnd); ok {
-				err = nil
-			}
-			fmt.Println("mine() ret", err)
-
 			p.result <- parserMsg{err}
 			p.result <- parserMsg{err}
 		}()
 		}()
 	} else {
 	} else {
@@ -936,17 +969,28 @@ func (p *Parser) Parse(buf []byte) error {
 	}
 	}
 }
 }
 
 
+// ParseError returns true if the type of error was a parse error
+// Returns false if it was an io.EOF or the email was not mime, or exceeded maximum nodes
 func (p *Parser) ParseError(err error) bool {
 func (p *Parser) ParseError(err error) bool {
-	if err != nil && err != io.EOF && err != NotMime {
+	if err != nil && err != io.EOF && err != NotMime && err != MaxNodesErr {
 		return true
 		return true
 	}
 	}
 	return false
 	return false
 }
 }
 
 
+// NewMimeParser returns a mime parser. See MaxNodes for how many nodes it's limited to
 func NewMimeParser() *Parser {
 func NewMimeParser() *Parser {
 	p := new(Parser)
 	p := new(Parser)
 	p.consumed = make(chan bool)
 	p.consumed = make(chan bool)
 	p.gotNewSlice = make(chan bool)
 	p.gotNewSlice = make(chan bool)
 	p.result = make(chan parserMsg, 1)
 	p.result = make(chan parserMsg, 1)
+	p.maxNodes = MaxNodes
+	return p
+}
+
+// NewMimeParserLimited returns a mime parser with a custom MaxNodes value
+func NewMimeParserLimited(maxNodes int) *Parser {
+	p := NewMimeParser()
+	p.maxNodes = maxNodes
 	return p
 	return p
 }
 }

+ 76 - 0
mail/reader.go

@@ -0,0 +1,76 @@
+package mail
+
+import (
+	"bufio"
+	"io"
+	"net/textproto"
+
+	"github.com/flashmob/go-guerrilla/mail/mime"
+)
+
+// MimeDotReader parses the mime structure while reading using the underlying reader
+type MimeDotReader struct {
+	R       io.Reader
+	p       *mime.Parser
+	mimeErr error
+}
+
+// Read parses the mime structure while reading. Results are immediately available in
+// the data-structure returned from Parts() after each read.
+func (r *MimeDotReader) Read(p []byte) (n int, err error) {
+	n, err = r.R.Read(p)
+	if n > 0 {
+		if r.mimeErr == nil {
+			r.mimeErr = r.p.Parse(p)
+		}
+	}
+	if err != nil {
+		if r.mimeErr == nil {
+			r.mimeErr = r.p.Close()
+		}
+		return
+	}
+	return
+}
+
+// Close closes the underlying reader if it's a ReadCloser and closes the mime parser
+func (r MimeDotReader) Close() (err error) {
+	if rc, t := r.R.(io.ReadCloser); t {
+		err = rc.Close()
+	}
+	// parser already closed?
+	if r.mimeErr != nil {
+		return r.mimeErr
+	}
+	// close the parser, only care about parse errors
+	if pErr := r.p.Close(); r.p.ParseError(pErr) {
+		err = pErr
+	}
+	return
+}
+
+// Parts returns the mime-header parts built by the parser
+func (r *MimeDotReader) Parts() []*mime.Part {
+	return r.p.Parts
+}
+
+// DotReader returns the underlying io.Reader (which is a dotReader from textproto)
+// useful for reading from directly if mime parsing is not desirable.
+func (r *MimeDotReader) DotReader() io.Reader {
+	return r.R
+}
+
+// NewMimeDotReader returns a pointer to a new MimeDotReader
+// br is the underlying reader it will read from
+// maxNodes limits the number of nodes that can be added to the mime tree before the mime-parser aborts
+func NewMimeDotReader(br *bufio.Reader, maxNodes int) *MimeDotReader {
+	r := new(MimeDotReader)
+	r.R = textproto.NewReader(br).DotReader()
+	if maxNodes > 0 {
+		r.p = mime.NewMimeParserLimited(maxNodes)
+	} else {
+		r.p = mime.NewMimeParser()
+	}
+	r.p.Open()
+	return r
+}

+ 6 - 2
server.go

@@ -553,10 +553,14 @@ func (s *server) handleClient(client *client) {
 				res, err = be.ProcessStream(client.smtpReader.DotReader(), client.Envelope)
 				res, err = be.ProcessStream(client.smtpReader.DotReader(), client.Envelope)
 
 
 			} else {
 			} else {
-				// or buffer the entire message
-				n, err = client.Data.ReadFrom(client.smtpReader.DotReader())
+				// or buffer the entire message (parse headers & mime structure as we go along)
+				n, err = client.Data.ReadFrom(client.smtpReader)
 				if n > sc.MaxSize {
 				if n > sc.MaxSize {
 					err = fmt.Errorf("maximum DATA size exceeded (%d)", sc.MaxSize)
 					err = fmt.Errorf("maximum DATA size exceeded (%d)", sc.MaxSize)
+				} else {
+					if p := client.smtpReader.Parts(); p != nil && len(p) > 0 {
+						client.Envelope.Header = p[0].Headers
+					}
 				}
 				}
 			}
 			}