Browse Source

- mime parser now implements io.Writer for more flexibility
- stream processors support basic dependency injection
- small refactoring (rename P to Decorate)
- update test

flashmob 6 years ago
parent
commit
fead324201

+ 30 - 17
backends/s_chunksaver.go

@@ -638,9 +638,8 @@ const chunkMaxBytes = 1024 * 16 // 16Kb is the default, change using chunksaver_
 func Chunksaver() *StreamDecorator {
 
 	sd := &StreamDecorator{}
-	sd.P =
-
-		func(sp StreamProcessor) StreamProcessor {
+	sd.Decorate =
+		func(sp StreamProcessor, a ...interface{}) StreamProcessor {
 			var (
 				envelope    *mail.Envelope
 				chunkBuffer *chunkedBytesBufferMime
@@ -655,34 +654,48 @@ func Chunksaver() *StreamDecorator {
 			)
 
 			var config *chunkSaverConfig
+			// optional dependency injection
+			for i := range a {
+				if db, ok := a[i].(ChunkSaverStorage); ok {
+					database = db
+				}
+				if buff, ok := a[i].(*chunkedBytesBufferMime); ok {
+					chunkBuffer = buff
+				}
+			}
 
 			Svc.AddInitializer(InitializeWith(func(backendConfig BackendConfig) error {
-				chunkBuffer = newChunkedBytesBufferMime()
+				if chunkBuffer == nil {
+					chunkBuffer = newChunkedBytesBufferMime()
+				}
 				configType := BaseConfig(&chunkSaverConfig{})
 				bcfg, err := Svc.ExtractConfig(backendConfig, configType)
 				if err != nil {
 					return err
 				}
 				config = bcfg.(*chunkSaverConfig)
+				// configure storage if none was injected
+				if database == nil {
+					if config.StorageEngine == "memory" {
+						db := new(chunkSaverMemory)
+						database = db
+					} else {
+						db := new(chunkSaverSQL)
+						database = db
+					}
+				}
+				err = database.Initialize(backendConfig)
+				if err != nil {
+					return err
+				}
+				// configure the chunks buffer
 				if config.ChunkMaxBytes > 0 {
 					chunkBuffer.capTo(config.ChunkMaxBytes)
 				} else {
 					chunkBuffer.capTo(chunkMaxBytes)
 				}
-				if config.StorageEngine == "memory" {
-					db := new(chunkSaverMemory)
-					chunkBuffer.setDatabase(db)
-					database = db
-				} else {
-					db := new(chunkSaverSQL)
-					chunkBuffer.setDatabase(db)
-					database = db
-				}
+				chunkBuffer.setDatabase(database)
 
-				err = database.Initialize(backendConfig)
-				if err != nil {
-					return err
-				}
 				return nil
 			}))
 

+ 16 - 71
backends/s_chunksaver_test.go

@@ -6,7 +6,6 @@ import (
 	"github.com/flashmob/go-guerrilla/mail"
 	"github.com/flashmob/go-guerrilla/mail/mime"
 	"io"
-	"net"
 	"testing"
 )
 
@@ -145,21 +144,25 @@ U6ZGxseyk8SasGw3J9GRzdTQky1iHNvcPNNI4TLeKdfMvy0vMqLrItvuxfDW8ubjueDtJufz
 
 func TestChunkSaverWrite(t *testing.T) {
 
-	// parse an email
-	parser := mime.NewMimeParser()
-
 	// place the parse result in an envelope
 	e := mail.NewEnvelope("127.0.0.1", 1)
 	to, _ := mail.NewAddress("[email protected]")
 	e.RcptTo = append(e.RcptTo, to)
-	e.Values["MimeParts"] = &parser.Parts
 
+	store := new(chunkSaverMemory)
+	chunkBuffer := newChunkedBytesBufferMime()
 	// instantiate the chunk saver
 	chunksaver := streamers["chunksaver"]()
+	mimeanalyzer := streamers["mimeanalyzer"]()
 
 	// add the default processor as the underlying processor for chunksaver
-	// this will also set our Open, Close and Initialize functions
-	stream := chunksaver.P(DefaultStreamProcessor{})
+	// and chain it with mimeanalyzer.
+	// Call order: mimeanalyzer -> chunksaver -> default (terminator)
+	// This will also set our Open, Close and Initialize functions
+	// we also inject a ChunkSaverStorage and a ChunkedBytesBufferMime
+
+	stream := mimeanalyzer.Decorate(chunksaver.Decorate(DefaultStreamProcessor{}, store, chunkBuffer))
+
 	// configure the buffer cap
 	bc := BackendConfig{}
 	bc["chunksaver_chunk_size"] = 64
@@ -168,72 +171,14 @@ func TestChunkSaverWrite(t *testing.T) {
 
 	// give it the envelope with the parse results
 	_ = chunksaver.Open(e)
+	_ = mimeanalyzer.Open(e)
 
-	// let's test it
-
-	writeIt(parser, t, stream, 128)
-
-	_ = chunksaver.Close()
-	//writeIt(parser, t, stream, 128000)
-}
-
-func writeIt(parser *mime.Parser, t *testing.T, stream StreamProcessor, size int) {
-
-	if size > len(email) {
-		size = len(email)
-	}
-	total := 0
-
-	// break up the email in to chunks of size. Feed them through the mime parser
-	for msgPos := 0; msgPos < len(email); msgPos += size {
-		err := parser.Parse([]byte(email)[msgPos : msgPos+size])
-		if err != nil {
-			t.Error(err)
-			t.Fail()
-		}
-		cut := msgPos + size
-		if cut > len(email) {
-			// the last chunk may be shorter than size
-			cut -= cut - len(email)
-		}
-		i, _ := stream.Write([]byte(email)[msgPos:cut])
-		total += i
-	}
-	if total != len(email) {
-		t.Error("short write, total is", total, "but len(email) is", len(email))
-	}
-}
-
-func TestMemDB(t *testing.T) {
-
-	m := new(chunkSaverMemory)
-
-	from := "[email protected]"
-	helo := "home-host"
-	recipient := "[email protected]"
-	ipAddress := net.IPAddr{IP: net.ParseIP("127.0.0.1")}
-	returnPath := "[email protected]"
-	isTLS := false
-	_ = m.Initialize(nil)
-	mailID, err := m.OpenMessage(from, helo, recipient, ipAddress, returnPath, isTLS)
-	if err != nil {
+	buf := make([]byte, 128)
+	if written, err := io.CopyBuffer(stream, bytes.NewBuffer([]byte(email)), buf); err != nil {
 		t.Error(err)
-	}
-	buff := newChunkedBytesBufferMime()
-	buff.capTo(64)
-	buff.setDatabase(m)
-	written, err := io.Copy(buff, bytes.NewBuffer([]byte(email)))
-	if err != nil {
-		t.Error(err, "written:", written)
 	} else {
-		err = m.CloseMessage(mailID, written, &PartsInfo{}, "a subject", "1234abc", "[email protected]", "[email protected]")
-		if err != nil {
-			t.Error(err, "close message:", written)
-		}
-		email, _ := m.GetEmail(mailID)
-		fmt.Println(email)
-		//_ = m.GetChunks()
-		_ = m.Shutdown()
+		_ = mimeanalyzer.Close()
+		_ = chunksaver.Close()
+		fmt.Println("written:", written)
 	}
-
 }

+ 2 - 2
backends/s_compress.go

@@ -15,8 +15,8 @@ func init() {
 
 func StreamCompress() *StreamDecorator {
 	sd := &StreamDecorator{}
-	sd.P =
-		func(sp StreamProcessor) StreamProcessor {
+	sd.Decorate =
+		func(sp StreamProcessor, a ...interface{}) StreamProcessor {
 			var zw io.WriteCloser
 			sd.Open = func(e *mail.Envelope) error {
 				var err error

+ 2 - 2
backends/s_debug.go

@@ -14,9 +14,9 @@ func init() {
 
 func StreamDebug() *StreamDecorator {
 	sd := &StreamDecorator{}
-	sd.P =
+	sd.Decorate =
 
-		func(sp StreamProcessor) StreamProcessor {
+		func(sp StreamProcessor, a ...interface{}) StreamProcessor {
 
 			sd.Open = func(e *mail.Envelope) error {
 				return nil

+ 2 - 2
backends/s_decompress.go

@@ -19,8 +19,8 @@ func init() {
 
 func StreamDecompress() *StreamDecorator {
 	sd := &StreamDecorator{}
-	sd.P =
-		func(sp StreamProcessor) StreamProcessor {
+	sd.Decorate =
+		func(sp StreamProcessor, a ...interface{}) StreamProcessor {
 			var (
 				zr io.ReadCloser
 				pr *io.PipeReader

+ 2 - 2
backends/s_header.go

@@ -72,9 +72,9 @@ func StreamHeader() *StreamDecorator {
 	}))
 
 	sd := &StreamDecorator{}
-	sd.P =
+	sd.Decorate =
 
-		func(sp StreamProcessor) StreamProcessor {
+		func(sp StreamProcessor, a ...interface{}) StreamProcessor {
 			var sh *streamHeader
 
 			sd.Open = func(e *mail.Envelope) error {

+ 2 - 2
backends/s_headers_parser.go

@@ -32,8 +32,8 @@ const stateHeaderNotScanning = 1
 func StreamHeadersParser() *StreamDecorator {
 
 	sd := &StreamDecorator{}
-	sd.P =
-		func(sp StreamProcessor) StreamProcessor {
+	sd.Decorate =
+		func(sp StreamProcessor, a ...interface{}) StreamProcessor {
 			var (
 				state    byte
 				envelope *mail.Envelope

+ 2 - 2
backends/s_mime.go

@@ -27,9 +27,9 @@ func init() {
 func StreamMimeAnalyzer() *StreamDecorator {
 
 	sd := &StreamDecorator{}
-	sd.P =
+	sd.Decorate =
 
-		func(sp StreamProcessor) StreamProcessor {
+		func(sp StreamProcessor, a ...interface{}) StreamProcessor {
 
 			var (
 				envelope *mail.Envelope

+ 2 - 2
backends/s_process.go

@@ -15,9 +15,9 @@ func init() {
 // Buffers to envelope.Data so that processors can be called on it at the end
 func StreamProcess() *StreamDecorator {
 	sd := &StreamDecorator{}
-	sd.P =
+	sd.Decorate =
 
-		func(sp StreamProcessor) StreamProcessor {
+		func(sp StreamProcessor, a ...interface{}) StreamProcessor {
 			var envelope *mail.Envelope
 			sd.Open = func(e *mail.Envelope) error {
 				envelope = e

+ 7 - 5
backends/stream_decorate.go

@@ -9,17 +9,19 @@ type streamOpenWith func(e *mail.Envelope) error
 type streamCloseWith func() error
 
 // We define what a decorator to our processor will look like
+// StreamProcessor argument is the underlying processor that we're decorating
+// the additional ...interface argument is not needed, but can be useful for dependency injection
 type StreamDecorator struct {
-	P     func(StreamProcessor) StreamProcessor
-	e     *mail.Envelope
-	Close streamCloseWith
-	Open  streamOpenWith
+	Decorate func(StreamProcessor, ...interface{}) StreamProcessor
+	e        *mail.Envelope
+	Close    streamCloseWith
+	Open     streamOpenWith
 }
 
 // DecorateStream will decorate a StreamProcessor with a slice of passed decorators
 func DecorateStream(c StreamProcessor, ds []*StreamDecorator) (StreamProcessor, []*StreamDecorator) {
 	for i := range ds {
-		c = ds[i].P(c)
+		c = ds[i].Decorate(c)
 	}
 	return c, ds
 }

+ 18 - 0
mail/mime/mime.go

@@ -80,6 +80,8 @@ type Parser struct {
 	lastBoundaryPos uint // the last msgPos where a boundary was detected
 
 	maxNodes int // the desired number of maximum nodes the parser is limited to
+
+	w io.Writer // underlying io.Writer
 }
 
 type Part struct {
@@ -941,6 +943,16 @@ func (p *Parser) Close() error {
 
 }
 
+func (p *Parser) Write(buf []byte) (int, error) {
+	if err := p.Parse(buf); err != nil {
+		return len(buf), err
+	}
+	if p.w != nil {
+		return p.w.Write(buf)
+	}
+	return len(buf), nil
+}
+
 // Parse takes a byte stream, and feeds it to the MIME Parser, then
 // waits for the Parser to consume all input before returning.
 // The parser will build a parse tree in p.Parts
@@ -1003,6 +1015,12 @@ func NewMimeParser() *Parser {
 	return p
 }
 
+func NewMimeParserWriter(w io.Writer) *Parser {
+	p := NewMimeParser()
+	p.w = w
+	return p
+}
+
 // NewMimeParser returns a mime parser with a custom MaxNodes value
 func NewMimeParserLimited(maxNodes int) *Parser {
 	p := NewMimeParser()