Browse Source

- chunksaver WIP
- update "headers parser" stream processor to use latest headers parsing

flashmob 6 years ago
parent
commit
a7202c5822
5 changed files with 56 additions and 109 deletions
  1. 20 3
      backends/s_chunksaver.go
  2. 2 2
      backends/s_compress.go
  3. 32 95
      backends/s_headers_parser.go
  4. 2 1
      backends/s_mime.go
  5. 0 8
      mail/envelope.go

+ 20 - 3
backends/s_mysql_chunksaver.go → backends/s_chunksaver.go

@@ -9,11 +9,28 @@ import (
 )
 )
 
 
 func init() {
 func init() {
-	streamers["MysqlChunksaver"] = func() *StreamDecorator {
-		return MysqlChunksaver()
+	streamers["chunksaver"] = func() *StreamDecorator {
+		return Chunksaver()
 	}
 	}
 }
 }
 
 
+type partsInfo struct {
+	Count     uint32 // number of parts
+	TextPart  int    // id of the main text part to display
+	HTMLPart  int    // id of the main html part to display (if any)
+	HasAttach bool
+	Parts     []chunkedParts
+}
+
+type chunkedParts struct {
+	PartId             string
+	ChunkHash          [][32]byte // sequence of hashes the data is stored at
+	ContentType        string
+	Charset            string
+	TransferEncoding   string
+	ContentDisposition string
+}
+
 /**
 /**
  * messages: mid, part_tree, part_count, has_attach, created_at
  * messages: mid, part_tree, part_count, has_attach, created_at
  * parts: mid, part_id, chunk_md5, header_data, seq
  * parts: mid, part_id, chunk_md5, header_data, seq
@@ -24,7 +41,7 @@ func init() {
  * - if didn't receive first chunk for more than x bytes, save normally
  * - if didn't receive first chunk for more than x bytes, save normally
  *
  *
  */
  */
-func MysqlChunksaver() *StreamDecorator {
+func Chunksaver() *StreamDecorator {
 
 
 	sd := &StreamDecorator{}
 	sd := &StreamDecorator{}
 	sd.p =
 	sd.p =

+ 2 - 2
backends/s_compress.go

@@ -2,8 +2,9 @@ package backends
 
 
 import (
 import (
 	"compress/zlib"
 	"compress/zlib"
-	"github.com/flashmob/go-guerrilla/mail"
 	"io"
 	"io"
+
+	"github.com/flashmob/go-guerrilla/mail"
 )
 )
 
 
 func init() {
 func init() {
@@ -17,7 +18,6 @@ func StreamCompress() *StreamDecorator {
 	sd.p =
 	sd.p =
 		func(sp StreamProcessor) StreamProcessor {
 		func(sp StreamProcessor) StreamProcessor {
 			var zw io.WriteCloser
 			var zw io.WriteCloser
-			_ = zw
 			sd.Open = func(e *mail.Envelope) error {
 			sd.Open = func(e *mail.Envelope) error {
 				var err error
 				var err error
 				zw, err = zlib.NewWriterLevel(sp, zlib.BestSpeed)
 				zw, err = zlib.NewWriterLevel(sp, zlib.BestSpeed)

+ 32 - 95
backends/s_headers_parser.go

@@ -1,14 +1,25 @@
 package backends
 package backends
 
 
 import (
 import (
-	"bufio"
-	"bytes"
-	"io"
-	"net/textproto"
-
 	"github.com/flashmob/go-guerrilla/mail"
 	"github.com/flashmob/go-guerrilla/mail"
+	"github.com/flashmob/go-guerrilla/mail/mime"
 )
 )
 
 
+// ----------------------------------------------------------------------------------
+// Processor Name: HeadersParser
+// ----------------------------------------------------------------------------------
+// Description   : Populates the envelope.Header value
+//-----------------------------------------------------------------------------------
+// Requires      : "mime" stream stream processor to be enabled before it
+// ----------------------------------------------------------------------------------
+// Config Options: None
+// --------------:-------------------------------------------------------------------
+// Input         : e.Values["MimeParts"] generated by the mime processor
+// ----------------------------------------------------------------------------------
+// Output        : populates e.Header and e.Subject values of the envelope.
+//               : Any encoded data in the subject is decoded to UTF-8
+// ----------------------------------------------------------------------------------
+
 func init() {
 func init() {
 	streamers["headersparser"] = func() *StreamDecorator {
 	streamers["headersparser"] = func() *StreamDecorator {
 		return StreamHeadersParser()
 		return StreamHeadersParser()
@@ -17,122 +28,48 @@ func init() {
 
 
 const stateHeaderScanning = 0
 const stateHeaderScanning = 0
 const stateHeaderNotScanning = 1
 const stateHeaderNotScanning = 1
-const headerMaxBytes = 1024 * 4
-
-type streamHeaderConfig struct {
-	MaxBytes int64 `json:"s_header_max_bytes,omitempty"`
-}
 
 
 func StreamHeadersParser() *StreamDecorator {
 func StreamHeadersParser() *StreamDecorator {
-	var config *streamHeaderConfig
-	initFunc := InitializeWith(func(backendConfig BackendConfig) error {
-		configType := BaseConfig(&streamHeaderConfig{})
-		bcfg, err := Svc.ExtractConfig(backendConfig, configType)
-		if err != nil {
-			return err
-		}
-		config = bcfg.(*streamHeaderConfig)
-		if config.MaxBytes == 0 {
-			config.MaxBytes = headerMaxBytes
-		}
-		return nil
-	})
-	Svc.AddInitializer(initFunc)
+
 	sd := &StreamDecorator{}
 	sd := &StreamDecorator{}
 	sd.p =
 	sd.p =
-
 		func(sp StreamProcessor) StreamProcessor {
 		func(sp StreamProcessor) StreamProcessor {
-
 			var (
 			var (
-				buf      bytes.Buffer // buf buffers the header
 				state    byte
 				state    byte
-				lastByte byte
-				total    int64
 				envelope *mail.Envelope
 				envelope *mail.Envelope
 			)
 			)
 
 
-			parse := func() error {
-				var err error
-				// use a TeeReader to split the write to both sp and headerReader
-				r := bufio.NewReader(io.TeeReader(&buf, sp))
-				headerReader := textproto.NewReader(r)
-				envelope.Header, err = headerReader.ReadMIMEHeader()
-
-				if err != nil {
-					if subject, ok := envelope.Header["Subject"]; ok {
-						envelope.Subject = mail.MimeHeaderDecode(subject[0])
-					}
-				}
-
-				return err
-			}
 			sd.Open = func(e *mail.Envelope) error {
 			sd.Open = func(e *mail.Envelope) error {
-				buf.Reset()
-				state = 0
-				lastByte = 0
-				total = 0
+				state = stateHeaderScanning
 				envelope = e
 				envelope = e
 				return nil
 				return nil
 			}
 			}
 
 
 			sd.Close = func() error {
 			sd.Close = func() error {
-				// If header wasn't detected
-				// pump out whatever is in the buffer to the underlying writer
-				if state == stateHeaderScanning {
-					_, err := io.Copy(sp, &buf)
-					return err
-				}
 				return nil
 				return nil
 			}
 			}
-			return StreamProcessWith(func(p []byte) (int, error) {
 
 
+			return StreamProcessWith(func(p []byte) (int, error) {
 				switch state {
 				switch state {
 				case stateHeaderScanning:
 				case stateHeaderScanning:
-					// detect end of header \n\n
-					headerEnd := bytes.Index(p, []byte{'\n', '\n'})
-					if headerEnd == -1 && (lastByte == '\n' && p[0] == '\n') {
-						headerEnd = 0
-					}
-					var remainder []byte // remainder are the non-header bytes after the \n\n
-					if headerEnd > -1 {
-
-						if len(p) > headerEnd {
-							remainder = p[headerEnd:]
+					if mimeParts, ok := envelope.Values["MimeParts"].(*[]*mime.Part); ok {
+						// copy the the headers of the first mime-part to envelope.Header
+						// then call envelope.ParseHeaders()
+						if len(*mimeParts) > 0 {
+							headers := (*mimeParts)[0].Headers
+							if headers != nil && len(headers) > 0 {
+								state = stateHeaderNotScanning
+								envelope.Header = headers
+								_ = envelope.ParseHeaders()
+							}
 						}
 						}
-						p = p[:headerEnd]
 					}
 					}
-
-					// read in the header to a temp buffer
-					n, err := io.Copy(&buf, bytes.NewReader(p))
-					lastByte = p[n-1] // remember the last byte read
-					if headerEnd > -1 {
-						// header found, parse it
-						parseErr := parse()
-						if parseErr != nil {
-							Log().WithError(parseErr).Error("cannot parse headers")
-						}
-						// flush the remainder to the underlying writer
-						if remainder != nil {
-							n1, _ := sp.Write(remainder)
-							n = n + int64(n1)
-						}
-						state = stateHeaderNotScanning
-					} else {
-						total += n
-						// give up if we didn't detect the header after x bytes
-						if total > config.MaxBytes {
-							state = stateHeaderNotScanning
-							n, err = io.Copy(sp, &buf)
-							return int(n), err
-						}
-					}
-					return int(n), err
-				case stateHeaderNotScanning:
-					// just forward everything to the next writer without buffering
 					return sp.Write(p)
 					return sp.Write(p)
 				}
 				}
-
+				// state is stateHeaderNotScanning
+				// just forward everything to the underlying writer
 				return sp.Write(p)
 				return sp.Write(p)
+
 			})
 			})
 		}
 		}
 
 

+ 2 - 1
backends/s_mime.go

@@ -16,7 +16,7 @@ import (
 // --------------:-------------------------------------------------------------------
 // --------------:-------------------------------------------------------------------
 // Input         :
 // Input         :
 // ----------------------------------------------------------------------------------
 // ----------------------------------------------------------------------------------
-// Output        :
+// Output        : MimeParts (of type *[]*mime.Part) stored in the envelope.Values map
 // ----------------------------------------------------------------------------------
 // ----------------------------------------------------------------------------------
 
 
 func init() {
 func init() {
@@ -78,6 +78,7 @@ func StreamMimeAnalyzer() *StreamDecorator {
 				if _, ok := envelope.Values["MimeParts"]; !ok {
 				if _, ok := envelope.Values["MimeParts"]; !ok {
 					envelope.Values["MimeParts"] = &parser.Parts
 					envelope.Values["MimeParts"] = &parser.Parts
 				}
 				}
+
 				if parseErr == nil {
 				if parseErr == nil {
 					parseErr = parser.Parse(p)
 					parseErr = parser.Parse(p)
 					if parseErr != nil {
 					if parseErr != nil {

+ 0 - 8
mail/envelope.go

@@ -12,8 +12,6 @@ import (
 	"strings"
 	"strings"
 	"sync"
 	"sync"
 	"time"
 	"time"
-
-	mimelib "github.com/flashmob/go-guerrilla/mail/mime"
 )
 )
 
 
 // A WordDecoder decodes MIME headers containing RFC 2047 encoded-words.
 // A WordDecoder decodes MIME headers containing RFC 2047 encoded-words.
@@ -112,12 +110,6 @@ func queuedID(clientID uint64) string {
 	return fmt.Sprintf("%x", md5.Sum([]byte(string(time.Now().Unix())+string(clientID))))
 	return fmt.Sprintf("%x", md5.Sum([]byte(string(time.Now().Unix())+string(clientID))))
 }
 }
 
 
-func (e *Envelope) setHeaders(p []*mimelib.Part) {
-	if p != nil && len(p) > 0 {
-		e.Header = p[0].Headers
-	}
-}
-
 // ParseHeaders parses the headers into Header field of the Envelope struct.
 // ParseHeaders parses the headers into Header field of the Envelope struct.
 // Data buffer must be full before calling.
 // Data buffer must be full before calling.
 // It assumes that at most 30kb of email data can be a header
 // It assumes that at most 30kb of email data can be a header