Browse Source

it gives the correct result when buffer is 256 bytes

flashmob 6 years ago
parent
commit
ebac03ba7c
6 changed files with 77 additions and 51 deletions
  1. 11 0
      api_test.go
  2. 1 0
      backends/gateway.go
  3. 8 4
      backends/s_mime.go
  4. 3 3
      backends/s_mysql_chunksaver.go
  5. 42 38
      mail/mime/mime.go
  6. 12 6
      mail/mime/mime_test.go

+ 11 - 0
api_test.go

@@ -11,6 +11,7 @@ import (
 	"io/ioutil"
 	"io/ioutil"
 	"net"
 	"net"
 	"os"
 	"os"
+	"runtime/pprof"
 	"strings"
 	"strings"
 	"testing"
 	"testing"
 	"time"
 	"time"
@@ -921,6 +922,16 @@ func TestStreamMimeProcessor(t *testing.T) {
 		t.Error(err)
 		t.Error(err)
 	}
 	}
 
 
+	go func() {
+		time.Sleep(time.Second * 15)
+		//panic("here")
+		//		*moo = *moo + 6
+
+		// for debugging deadlocks
+		pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
+		os.Exit(1)
+	}()
+
 	// change \n to \r\n
 	// change \n to \r\n
 	mime = strings.Replace(mime, "\n", "\r\n", -1)
 	mime = strings.Replace(mime, "\n", "\r\n", -1)
 	// lets have a talk with the server
 	// lets have a talk with the server

+ 1 - 0
backends/gateway.go

@@ -603,6 +603,7 @@ func (gw *BackendGateway) workDispatcher(
 				if err == nil {
 				if err == nil {
 					var buf []byte
 					var buf []byte
 					buf = make([]byte, 1024*4)
 					buf = make([]byte, 1024*4)
+					buf = make([]byte, 256)
 					if msg.e.Values["size"], err = io.CopyBuffer(stream, msg.r, buf); err != nil {
 					if msg.e.Values["size"], err = io.CopyBuffer(stream, msg.r, buf); err != nil {
 						Log().WithError(err).Error("stream writing failed")
 						Log().WithError(err).Error("stream writing failed")
 					}
 					}

+ 8 - 4
backends/s_mime.go

@@ -54,11 +54,15 @@ func StreamMimeAnalyzer() *StreamDecorator {
 			}
 			}
 
 
 			sd.Close = func() error {
 			sd.Close = func() error {
-				if parts, ok := envelope.Values["MimeParts"].(*[]*mime.MimeHeader); ok {
-					for _, v := range *parts {
-						fmt.Println(v.Part + " " + strconv.Itoa(int(v.StartingPos)) + " " + strconv.Itoa(int(v.StartingPosBody)) + " " + strconv.Itoa(int(v.EndingPosBody)))
+
+				defer func() {
+					// todo remove debugging
+					if parts, ok := envelope.Values["MimeParts"].(*[]*mime.Part); ok {
+						for _, v := range *parts {
+							fmt.Println(v.Part + " " + strconv.Itoa(int(v.StartingPos)) + " " + strconv.Itoa(int(v.StartingPosBody)) + " " + strconv.Itoa(int(v.EndingPosBody)))
+						}
 					}
 					}
-				}
+				}()
 
 
 				if parseErr == nil {
 				if parseErr == nil {
 					err := parser.Close()
 					err := parser.Close()

+ 3 - 3
backends/s_mysql_chunksaver.go

@@ -53,7 +53,7 @@ func MysqlChunksaver() *StreamDecorator {
 			}
 			}
 
 
 			sd.Close = func() error {
 			sd.Close = func() error {
-				if parts, ok := envelope.Values["MimeParts"].(*[]*mime.MimeHeader); ok {
+				if parts, ok := envelope.Values["MimeParts"].(*[]*mime.Part); ok {
 					for _, v := range *parts {
 					for _, v := range *parts {
 						fmt.Println(v.Part + " " + strconv.Itoa(int(v.StartingPos)) + " " + strconv.Itoa(int(v.StartingPosBody)) + " " + strconv.Itoa(int(v.EndingPosBody)))
 						fmt.Println(v.Part + " " + strconv.Itoa(int(v.StartingPos)) + " " + strconv.Itoa(int(v.StartingPosBody)) + " " + strconv.Itoa(int(v.EndingPosBody)))
 					}
 					}
@@ -70,10 +70,10 @@ func MysqlChunksaver() *StreamDecorator {
 				if len(envelope.Header) > 0 {
 				if len(envelope.Header) > 0 {
 
 
 				}
 				}
-				var parts []*mime.MimeHeader
+				var parts []*mime.Part
 				if val, ok := envelope.Values["MimeParts"]; !ok {
 				if val, ok := envelope.Values["MimeParts"]; !ok {
 					//envelope.Values["MimeParts"] = &parser.Parts
 					//envelope.Values["MimeParts"] = &parser.Parts
-					parts = val.([]*mime.MimeHeader)
+					parts = val.([]*mime.Part)
 					size := len(parts)
 					size := len(parts)
 					if currentPart != size {
 					if currentPart != size {
 						currentPart = size
 						currentPart = size

+ 42 - 38
mail/mime/mime.go

@@ -7,6 +7,7 @@ import (
 	"io"
 	"io"
 	"net/textproto"
 	"net/textproto"
 	"strconv"
 	"strconv"
+	"sync"
 )
 )
 
 
 // todo
 // todo
@@ -63,7 +64,7 @@ type Parser struct {
 	boundaryMatched       int
 	boundaryMatched       int
 	count                 uint
 	count                 uint
 	result                chan parserMsg
 	result                chan parserMsg
-
+	sync.Mutex
 	// mime variables
 	// mime variables
 
 
 	// Parts is the mime parts tree. The parser builds the parts as it consumes the input
 	// Parts is the mime parts tree. The parser builds the parts as it consumes the input
@@ -71,13 +72,13 @@ type Parser struct {
 	// each node. The name of the node is the *path* of the node. The root node is always
 	// each node. The name of the node is the *path* of the node. The root node is always
 	// "1". The child would be "1.1", the next sibling would be "1.2", while the child of
 	// "1". The child would be "1.1", the next sibling would be "1.2", while the child of
 	// "1.2" would be "1.2.1"
 	// "1.2" would be "1.2.1"
-	Parts           []*MimeHeader
+	Parts           []*Part
 	msgPos          uint
 	msgPos          uint
 	msgLine         uint
 	msgLine         uint
 	lastBoundaryPos uint
 	lastBoundaryPos uint
 }
 }
 
 
-type MimeHeader struct {
+type Part struct {
 	Headers textproto.MIMEHeader
 	Headers textproto.MIMEHeader
 
 
 	Part string
 	Part string
@@ -131,13 +132,13 @@ func (c *contentType) String() string {
 	return fmt.Sprintf("%s/%s", c.superType, c.subType)
 	return fmt.Sprintf("%s/%s", c.superType, c.subType)
 }
 }
 
 
-func newMimeHeader() *MimeHeader {
-	mh := new(MimeHeader)
+func newPart() *Part {
+	mh := new(Part)
 	mh.Headers = make(textproto.MIMEHeader, 1)
 	mh.Headers = make(textproto.MIMEHeader, 1)
 	return mh
 	return mh
 }
 }
 
 
-func (p *Parser) addPart(mh *MimeHeader, id string) {
+func (p *Parser) addPart(mh *Part, id string) {
 	mh.Part = id
 	mh.Part = id
 	p.Parts = append(p.Parts, mh)
 	p.Parts = append(p.Parts, mh)
 }
 }
@@ -343,7 +344,7 @@ func (p *Parser) transportPadding() (err error) {
 	}
 	}
 }
 }
 
 
-func (p *Parser) header(mh *MimeHeader) (err error) {
+func (p *Parser) header(mh *Part) (err error) {
 	var state int
 	var state int
 	var name string
 	var name string
 
 
@@ -372,7 +373,8 @@ func (p *Parser) header(mh *MimeHeader) (err error) {
 				state = 1
 				state = 1
 			} else {
 			} else {
 				pc := p.peek()
 				pc := p.peek()
-				err = errors.New("unexpected char:" + string(p.ch) + ", peek:" + string(pc))
+				err = errors.New("unexpected char:[" + string(p.ch) + "], peek:" +
+					string(pc) + ", pos:" + strconv.Itoa(int(p.msgPos)))
 				return
 				return
 			}
 			}
 			if state == 1 {
 			if state == 1 {
@@ -509,7 +511,6 @@ func (p *Parser) contentType() (result contentType, err error) {
 			} else {
 			} else {
 				break
 				break
 			}
 			}
-
 		}
 		}
 	}
 	}
 
 
@@ -677,7 +678,7 @@ func (p *Parser) parameter() (attribute, value string, err error) {
 
 
 // isBranch determines if we should branch this part, when building
 // isBranch determines if we should branch this part, when building
 // the mime tree
 // the mime tree
-func (p *Parser) isBranch(part *MimeHeader, parent *MimeHeader) bool {
+func (p *Parser) isBranch(part *Part, parent *Part) bool {
 	ct := part.ContentType
 	ct := part.ContentType
 	if ct == nil {
 	if ct == nil {
 		return false
 		return false
@@ -702,7 +703,7 @@ func (p *Parser) isBranch(part *MimeHeader, parent *MimeHeader) bool {
 }
 }
 
 
 // multi finds the boundary and call back to mime() itself
 // multi finds the boundary and call back to mime() itself
-func (p *Parser) multi(part *MimeHeader, depth string) (err error) {
+func (p *Parser) multi(part *Part, depth string) (err error) {
 	if part.ContentType != nil {
 	if part.ContentType != nil {
 		// scan until the start of the boundary
 		// scan until the start of the boundary
 		if part.ContentType.superType == "multipart" {
 		if part.ContentType.superType == "multipart" {
@@ -724,11 +725,11 @@ func (p *Parser) multi(part *MimeHeader, depth string) (err error) {
 
 
 // mime scans the mime content and builds the mime-part tree in
 // mime scans the mime content and builds the mime-part tree in
 // p.Parts on-the-fly, as more bytes get fed in.
 // p.Parts on-the-fly, as more bytes get fed in.
-func (p *Parser) mime(parent *MimeHeader, depth string) (err error) {
+func (p *Parser) mime(parent *Part, depth string) (err error) {
 
 
 	count := 1
 	count := 1
 	for {
 	for {
-		part := newMimeHeader()
+		part := newPart()
 		part.StartingPos = p.msgPos
 		part.StartingPos = p.msgPos
 
 
 		// parse the headers
 		// parse the headers
@@ -797,31 +798,29 @@ func (p *Parser) mime(parent *MimeHeader, depth string) (err error) {
 	return
 	return
 }
 }
 
 
+func (p *Parser) reset() {
+	p.lastBoundaryPos = 0
+	p.pos = startPos
+	p.msgPos = 0
+	p.msgLine = 0
+	p.count = 0
+}
+
 // Close tells the MIME Parser there's no more data & waits for it to return a result
 // Close tells the MIME Parser there's no more data & waits for it to return a result
 // it will return an io.EOF error if no error with parsing MIME was detected
 // it will return an io.EOF error if no error with parsing MIME was detected
-// Close is not concurrency safe, must be called synchronously after all calls to
-// Parse have completed
 func (p *Parser) Close() error {
 func (p *Parser) Close() error {
+	p.Lock()
 	defer func() {
 	defer func() {
-		p.lastBoundaryPos = 0
-		p.pos = startPos
-		p.msgPos = 0
-		p.msgLine = 0
+		p.reset()
+		p.Unlock()
 	}()
 	}()
-	if p.count > 0 {
-		for {
-			// dont't exit unless we get the result.
-			select {
-			case <-p.consumed: // wait for p.buf to be consumed
-				p.gotNewSlice <- false // tell next() that there's no more data
-			case r := <-p.result:
-				// mime() has returned with a result
-				p.count = 0
-				return r.err
-			}
-		}
+	if p.count == 0 {
+		// already closed
+		return nil
 	}
 	}
-	return nil
+	p.gotNewSlice <- false
+	r := <-p.result
+	return r.err
 }
 }
 
 
 // Parse takes a byte stream, and feeds it to the MIME Parser, then
 // Parse takes a byte stream, and feeds it to the MIME Parser, then
@@ -836,27 +835,32 @@ func (p *Parser) Close() error {
 func (p *Parser) Parse(buf []byte) error {
 func (p *Parser) Parse(buf []byte) error {
 	defer func() {
 	defer func() {
 		p.count++
 		p.count++
+		p.Unlock()
 	}()
 	}()
+	p.Lock()
+
+	p.set(buf)
+
 	if p.count == 0 {
 	if p.count == 0 {
-		p.set(buf)
-		p.next()
+		//open
 		go func() {
 		go func() {
+			p.next()
 			err := p.mime(nil, "")
 			err := p.mime(nil, "")
 			fmt.Println("mine() ret", err)
 			fmt.Println("mine() ret", err)
 			p.result <- parserMsg{err}
 			p.result <- parserMsg{err}
 		}()
 		}()
-		return nil
+	} else {
+		p.gotNewSlice <- true
 	}
 	}
+
 	select {
 	select {
 	case <-p.consumed: // wait for prev buf to be consumed
 	case <-p.consumed: // wait for prev buf to be consumed
-		p.set(buf)
-		p.gotNewSlice <- true
 		return nil
 		return nil
 	case r := <-p.result:
 	case r := <-p.result:
 		// mime() has returned with a result
 		// mime() has returned with a result
+		p.reset()
 		return r.err
 		return r.err
 	}
 	}
-
 }
 }
 
 
 func NewMimeParser() *Parser {
 func NewMimeParser() *Parser {

+ 12 - 6
mail/mime/mime_test.go

@@ -12,9 +12,10 @@ import (
 var p *Parser
 var p *Parser
 
 
 func init() {
 func init() {
-	p = NewMimeParser()
+
 }
 }
 func TestInject(t *testing.T) {
 func TestInject(t *testing.T) {
+	p = NewMimeParser()
 	var b bytes.Buffer
 	var b bytes.Buffer
 
 
 	// it should read from both slices
 	// it should read from both slices
@@ -32,7 +33,7 @@ func TestInject(t *testing.T) {
 	}
 	}
 }
 }
 func TestMimeType(t *testing.T) {
 func TestMimeType(t *testing.T) {
-
+	p = NewMimeParser()
 	if isTokenSpecial['-'] {
 	if isTokenSpecial['-'] {
 		t.Error("- should not be in the set")
 		t.Error("- should not be in the set")
 	}
 	}
@@ -49,6 +50,7 @@ func TestMimeType(t *testing.T) {
 }
 }
 
 
 func TestMimeContentType(t *testing.T) {
 func TestMimeContentType(t *testing.T) {
+	p = NewMimeParser()
 	go func() {
 	go func() {
 		<-p.consumed
 		<-p.consumed
 		p.gotNewSlice <- false
 		p.gotNewSlice <- false
@@ -68,6 +70,7 @@ func TestMimeContentType(t *testing.T) {
 }
 }
 
 
 func TestEmailHeader(t *testing.T) {
 func TestEmailHeader(t *testing.T) {
+	p = NewMimeParser()
 	in := `From: Al Gore <[email protected]>
 	in := `From: Al Gore <[email protected]>
 To: White House Transportation Coordinator <[email protected]>
 To: White House Transportation Coordinator <[email protected]>
 Subject: [Fwd: Map of Argentina with Description]
 Subject: [Fwd: Map of Argentina with Description]
@@ -96,7 +99,7 @@ Al
 This
 This
 `
 `
 	p.inject([]byte(in))
 	p.inject([]byte(in))
-	h := newMimeHeader()
+	h := newPart()
 	err := p.header(h)
 	err := p.header(h)
 	if err != nil {
 	if err != nil {
 		t.Error(err)
 		t.Error(err)
@@ -107,7 +110,7 @@ This
 		//_ = part
 		//_ = part
 		//p.addPart(part)
 		//p.addPart(part)
 
 
-		//nextPart := newMimeHeader()
+		//nextPart := newPart()
 		//err = p.body(part)
 		//err = p.body(part)
 		//if err != nil {
 		//if err != nil {
 		//	t.Error(err)
 		//	t.Error(err)
@@ -116,8 +119,9 @@ This
 }
 }
 
 
 func TestBoundary(t *testing.T) {
 func TestBoundary(t *testing.T) {
+	p = NewMimeParser()
 	var err error
 	var err error
-	part := newMimeHeader()
+	part := newPart()
 	part.ContentBoundary = "-wololo-"
 	part.ContentBoundary = "-wololo-"
 
 
 	// in the middle of the string
 	// in the middle of the string
@@ -166,7 +170,7 @@ func TestBoundary(t *testing.T) {
 }
 }
 
 
 func TestMimeContentQuotedParams(t *testing.T) {
 func TestMimeContentQuotedParams(t *testing.T) {
-
+	p = NewMimeParser()
 	// quoted
 	// quoted
 	p.inject([]byte("text/plain; charset=\"us-ascii\""))
 	p.inject([]byte("text/plain; charset=\"us-ascii\""))
 	contentType, err := p.contentType()
 	contentType, err := p.contentType()
@@ -411,6 +415,7 @@ TmV4dFBhcnRfMDAwX0FFNkJfNzI1RTA5QUYuODhCN0Y5MzQtLQ0K
 `
 `
 
 
 func TestNestedEmail(t *testing.T) {
 func TestNestedEmail(t *testing.T) {
+	p = NewMimeParser()
 	email = email3
 	email = email3
 	p.inject([]byte(email))
 	p.inject([]byte(email))
 
 
@@ -452,6 +457,7 @@ This is not a an MIME email
 `
 `
 
 
 func TestNonMineEmail(t *testing.T) {
 func TestNonMineEmail(t *testing.T) {
+	p = NewMimeParser()
 	p.inject([]byte(email4))
 	p.inject([]byte(email4))
 	if err := p.mime(nil, ""); err != nil && err != NotMime && err != io.EOF {
 	if err := p.mime(nil, ""); err != nil && err != NotMime && err != io.EOF {
 		t.Error(err)
 		t.Error(err)