Browse Source

use named struct fields instead of map for MimeParts and MessageID, plus other refactorings

flashmob 5 years ago
parent
commit
daa25103b4

+ 0 - 0
backends/stream_decorate.go → backends/decorate_stream.go


+ 2 - 2
backends/gateway.go

@@ -468,7 +468,7 @@ func (gw *BackendGateway) Initialize(cfg BackendConfig) error {
 
 		gw.streamers = append(gw.streamers, s)
 	}
-	// Initialize processors
+	// Initialize processors & stream processors
 	if err := Svc.Initialize(cfg); err != nil {
 		gw.State = BackendStateError
 		return err
@@ -614,7 +614,7 @@ func (gw *BackendGateway) workDispatcher(
 			} else if msg.task == TaskSaveMailStream {
 				err := stream.open(msg.e)
 				if err == nil {
-					if msg.e.Values["size"], err = io.CopyBuffer(stream, msg.r, gw.buf); err != nil {
+					if msg.e.Size, err = io.CopyBuffer(stream, msg.r, gw.buf); err != nil {
 						Log().WithError(err).Error("stream writing failed")
 					}
 					if err = stream.close(); err != nil {

+ 0 - 5
backends/p_guerrilla_db_redis.go

@@ -54,7 +54,6 @@ type GuerrillaDBAndRedisBackend struct {
 type stmtCache [GuerrillaDBAndRedisBatchMax]*sql.Stmt
 
 type guerrillaDBAndRedisConfig struct {
-	NumberOfWorkers    int    `json:"save_workers_size"`
 	Table              string `json:"mail_table"`
 	Driver             string `json:"sql_driver"`
 	DSN                string `json:"sql_dsn"`
@@ -78,10 +77,6 @@ func (g *GuerrillaDBAndRedisBackend) loadConfig(backendConfig BackendConfig) (er
 	return nil
 }
 
-func (g *GuerrillaDBAndRedisBackend) getNumberOfWorkers() int {
-	return g.config.NumberOfWorkers
-}
-
 type redisClient struct {
 	isConnected bool
 	conn        RedisConn

+ 1 - 0
backends/processor.go

@@ -71,4 +71,5 @@ func (w DefaultStreamProcessor) Write(p []byte) (n int, err error) {
 	return len(p), nil
 }
 
+// NoopStreamProcessor does nothing, it's a placeholder when no stream processors have been configured
 type NoopStreamProcessor struct{ DefaultStreamProcessor }

+ 47 - 0
backends/s_buffer.go

@@ -0,0 +1,47 @@
+package backends
+
+import (
+	"bytes"
+	"github.com/flashmob/go-guerrilla/mail"
+	"io"
+)
+
+// ----------------------------------------------------------------------------------
+// Processor Name: buffer
+// ----------------------------------------------------------------------------------
+// Description   : Buffers the message data to envelope.Data
+// ----------------------------------------------------------------------------------
+// Config Options:
+// --------------:-------------------------------------------------------------------
+// Input         :
+// ----------------------------------------------------------------------------------
+// Output        : envelope.Data
+// ----------------------------------------------------------------------------------
+
+func init() {
+	Streamers["buffer"] = func() *StreamDecorator {
+		return StreamProcess()
+	}
+}
+
+// Buffers to envelope.Data so that processors can be called on it at the end
+func StreamProcess() *StreamDecorator {
+	sd := &StreamDecorator{}
+	sd.Decorate =
+
+		func(sp StreamProcessor, a ...interface{}) StreamProcessor {
+			var envelope *mail.Envelope
+			sd.Open = func(e *mail.Envelope) error {
+				envelope = e
+				return nil
+			}
+
+			return StreamProcessWith(func(p []byte) (int, error) {
+				tr := io.TeeReader(bytes.NewReader(p), sp)
+				n, err := envelope.Data.ReadFrom(tr)
+				return int(n), err
+			})
+		}
+
+	return sd
+}

+ 6 - 6
backends/s_headers_parser.go

@@ -2,19 +2,19 @@ package backends
 
 import (
 	"github.com/flashmob/go-guerrilla/mail"
-	"github.com/flashmob/go-guerrilla/mail/mime"
 )
 
 // ----------------------------------------------------------------------------------
 // Processor Name: HeadersParser
 // ----------------------------------------------------------------------------------
-// Description   : Populates the envelope.Header value
+// Description   : Populates the envelope.Header value.
+//               : It also decodes the subject to UTF-8
 //-----------------------------------------------------------------------------------
 // Requires      : "mimeanalyzer" stream processor to be enabled before it
 // ----------------------------------------------------------------------------------
 // Config Options: None
 // --------------:-------------------------------------------------------------------
-// Input         : e.Values["MimeParts"] generated by the mime processor
+// Input         : e.MimeParts generated by the mimeanalyzer processor
 // ----------------------------------------------------------------------------------
 // Output        : populates e.Header and e.Subject values of the envelope.
 //               : Any encoded data in the subject is decoded to UTF-8
@@ -52,11 +52,11 @@ func StreamHeadersParser() *StreamDecorator {
 			return StreamProcessWith(func(p []byte) (int, error) {
 				switch state {
 				case stateHeaderScanning:
-					if mimeParts, ok := envelope.Values["MimeParts"].(*mime.Parts); ok {
+					if envelope.MimeParts != nil {
 						// copy the the headers of the first mime-part to envelope.Header
 						// then call envelope.ParseHeaders()
-						if len(*mimeParts) > 0 {
-							headers := (*mimeParts)[0].Headers
+						if len(*envelope.MimeParts) > 0 {
+							headers := (*envelope.MimeParts)[0].Headers
 							if headers != nil && len(headers) > 0 {
 								state = stateHeaderNotScanning
 								envelope.Header = headers

+ 9 - 7
backends/s_mime.go → backends/s_mimeanalyzer.go

@@ -2,19 +2,21 @@ package backends
 
 import (
 	"github.com/flashmob/go-guerrilla/mail"
-	"github.com/flashmob/go-guerrilla/mail/mime"
+	"github.com/flashmob/go-guerrilla/mail/mimeparse"
 )
 
 // ----------------------------------------------------------------------------------
 // Name          : Mime Analyzer
 // ----------------------------------------------------------------------------------
-// Description   : analyse the MIME structure of a stream
+// Description   : Analyse the MIME structure of a stream.
+//               : Headers of each part are unfolded and saved in a *mime.Parts struct.
+//               : No decoding or any other processing.
 // ----------------------------------------------------------------------------------
 // Config Options:
 // --------------:-------------------------------------------------------------------
 // Input         :
 // ----------------------------------------------------------------------------------
-// Output        : MimeParts (of type *mime.Parts) stored in the envelope.Values map
+// Output        : MimeParts (of type *mime.Parts) stored in the envelope.MimeParts field
 // ----------------------------------------------------------------------------------
 
 func init() {
@@ -32,10 +34,10 @@ func StreamMimeAnalyzer() *StreamDecorator {
 			var (
 				envelope *mail.Envelope
 				parseErr error
-				parser   *mime.Parser
+				parser   *mimeparse.Parser
 			)
 			Svc.AddInitializer(InitializeWith(func(backendConfig BackendConfig) error {
-				parser = mime.NewMimeParser()
+				parser = mimeparse.NewMimeParser()
 				return nil
 			}))
 
@@ -63,8 +65,8 @@ func StreamMimeAnalyzer() *StreamDecorator {
 			}
 
 			return StreamProcessWith(func(p []byte) (int, error) {
-				if _, ok := envelope.Values["MimeParts"]; !ok {
-					envelope.Values["MimeParts"] = &parser.Parts
+				if envelope.MimeParts == nil {
+					envelope.MimeParts = &parser.Parts
 				}
 				if parseErr == nil {
 					parseErr = parser.Parse(p)

+ 0 - 35
backends/s_process.go

@@ -1,35 +0,0 @@
-package backends
-
-import (
-	"bytes"
-	"github.com/flashmob/go-guerrilla/mail"
-	"io"
-)
-
-func init() {
-	Streamers["process"] = func() *StreamDecorator {
-		return StreamProcess()
-	}
-}
-
-// Buffers to envelope.Data so that processors can be called on it at the end
-func StreamProcess() *StreamDecorator {
-	sd := &StreamDecorator{}
-	sd.Decorate =
-
-		func(sp StreamProcessor, a ...interface{}) StreamProcessor {
-			var envelope *mail.Envelope
-			sd.Open = func(e *mail.Envelope) error {
-				envelope = e
-				return nil
-			}
-
-			return StreamProcessWith(func(p []byte) (int, error) {
-				tr := io.TeeReader(bytes.NewReader(p), sp)
-				n, err := envelope.Data.ReadFrom(tr)
-				return int(n), err
-			})
-		}
-
-	return sd
-}

+ 22 - 20
backends/s_transformer.go

@@ -8,7 +8,7 @@ import (
 
 	"github.com/flashmob/go-guerrilla/chunk/transfer"
 	"github.com/flashmob/go-guerrilla/mail"
-	"github.com/flashmob/go-guerrilla/mail/mime"
+	"github.com/flashmob/go-guerrilla/mail/mimeparse"
 )
 
 // ----------------------------------------------------------------------------------
@@ -18,11 +18,11 @@ import (
 // ----------------------------------------------------------------------------------
 // Config Options:
 // --------------:-------------------------------------------------------------------
-// Input         : envelope.Values["MimeParts"]
+// Input         : envelope.MimeParts
 // ----------------------------------------------------------------------------------
 // Output        : 8bit mime message, with charsets decoded to UTF-8
 //               : Note that this processor changes the body counts. Therefore, it makes
-//               : a new instance of envelope.Values["MimeParts"] which is then populated
+//               : a new instance of envelope.MimeParts which is then populated
 //               : by parsing the new re-written message
 // ----------------------------------------------------------------------------------
 
@@ -37,42 +37,43 @@ type TransformerConfig struct {
 
 }
 
+// Transform stream processor: convert an email to UTF-8
 type Transform struct {
 	sp                  io.Writer
 	isBody              bool // the next bytes to be sent are body?
 	buf                 bytes.Buffer
-	current             *mime.Part
+	current             *mimeparse.Part
 	decoder             io.Reader
 	pr                  *io.PipeReader
 	pw                  *io.PipeWriter
-	partsCachedOriginal *mime.Parts
+	partsCachedOriginal *mimeparse.Parts
 	envelope            *mail.Envelope
 
 	// we re-parse the output since the counts have changed
 	// parser implements the io.Writer interface, here output will be sent to it and then forwarded to the next processor
-	parser *mime.Parser
+	parser *mimeparse.Parser
 }
 
-// cache the original parts from envelope.Values
+// swap caches the original parts from envelope.MimeParts
 // and point them to our parts
-func (t *Transform) swap() *mime.Parts {
-
-	if parts, ok := t.envelope.Values["MimeParts"].(*mime.Parts); ok {
+func (t *Transform) swap() *mimeparse.Parts {
+	parts := t.envelope.MimeParts
+	if parts != nil {
 		t.partsCachedOriginal = parts
 		parts = &t.parser.Parts
 		return parts
 	}
 	return nil
-
 }
 
-// point the parts from envelope.Values back to the original ones
+// unswap points the parts from MimeParts back to the original ones
 func (t *Transform) unswap() {
-	if _, ok := t.envelope.Values["MimeParts"].(*mime.Parts); ok {
-		t.envelope.Values["MimeParts"] = t.partsCachedOriginal
+	if t.envelope.MimeParts != nil {
+		t.envelope.MimeParts = t.partsCachedOriginal
 	}
 }
 
+// regexpCharset captures the charset value
 var regexpCharset = regexp.MustCompile("(?i)charset=\"?(.+)\"?") // (?i) is a flag for case-insensitive
 
 func (t *Transform) ReWrite(b []byte, last bool, offset uint) (count int, err error) {
@@ -139,7 +140,7 @@ func (t *Transform) ReWrite(b []byte, last bool, offset uint) (count int, err er
 		// Body Decode, how it works:
 		// First, the decoder is setup, depending on the source encoding type.
 		// Next, since the decoder is an io.Reader, we need to use a pipe to connect it.
-		// Subsequent calls write to the pipe in a gouritine and the parent-thread copies the result to the output stream
+		// Subsequent calls write to the pipe in a goroutine and the parent-thread copies the result to the output stream
 		// The routine stops feeding the decoder data before EndingPosBody, and not decoding anything after, but still
 		// outputting the un-decoded remainder.
 		// The decoder is destroyed at the end of the body (when last == true)
@@ -260,7 +261,7 @@ func Transformer() *StreamDecorator {
 			sd.Open = func(e *mail.Envelope) error {
 				envelope = e
 				if reWriter.parser == nil {
-					reWriter.parser = mime.NewMimeParserWriter(sp)
+					reWriter.parser = mimeparse.NewMimeParserWriter(sp)
 					reWriter.parser.Open()
 				}
 				reWriter.envelope = envelope
@@ -272,7 +273,7 @@ func Transformer() *StreamDecorator {
 				return reWriter.parser.Close()
 			}
 
-			end := func(part *mime.Part, offset uint, p []byte, start uint) (int, error) {
+			end := func(part *mimeparse.Part, offset uint, p []byte, start uint) (int, error) {
 				var err error
 				var count int
 
@@ -290,10 +291,11 @@ func Transformer() *StreamDecorator {
 			return StreamProcessWith(func(p []byte) (count int, err error) {
 				pos = 0
 				written = 0
-				if parts, ok := envelope.Values["MimeParts"].(*mime.Parts); ok && len(*parts) > 0 {
+				parts := envelope.MimeParts
+				if parts != nil && len(*parts) > 0 {
 
-					// we are going to change envelope.Values["MimeParts"] to our own copy with our own counts
-					envelope.Values["MimeParts"] = reWriter.swap()
+					// we are going to change envelope.MimeParts to our own copy with our own counts
+					parts = reWriter.swap()
 					defer func() {
 						reWriter.unswap()
 						total += int64(written)

+ 3 - 3
chunk/buffer.go

@@ -6,7 +6,7 @@ import (
 	"hash"
 	"strings"
 
-	"github.com/flashmob/go-guerrilla/mail/mime"
+	"github.com/flashmob/go-guerrilla/mail/mimeparse"
 )
 
 type flushEvent func() error
@@ -74,7 +74,7 @@ func (c *chunkingBuffer) CapTo(n int) {
 // ChunkingBufferMime decorates chunkingBuffer, specifying that to do when a flush event is triggered
 type ChunkingBufferMime struct {
 	chunkingBuffer
-	current  *mime.Part
+	current  *mimeparse.Part
 	Info     PartsInfo
 	md5      hash.Hash
 	database Storage
@@ -159,7 +159,7 @@ func (b *ChunkingBufferMime) Reset() {
 	b.chunkingBuffer.Reset()
 }
 
-func (b *ChunkingBufferMime) CurrentPart(cp *mime.Part) {
+func (b *ChunkingBufferMime) CurrentPart(cp *mimeparse.Part) {
 	if b.current == nil {
 		b.Info = *NewPartsInfo()
 		b.Info.Parts = make([]ChunkedPart, 0, 3)

+ 12 - 12
chunk/processor.go

@@ -6,7 +6,7 @@ import (
 
 	"github.com/flashmob/go-guerrilla/backends"
 	"github.com/flashmob/go-guerrilla/mail"
-	"github.com/flashmob/go-guerrilla/mail/mime"
+	"github.com/flashmob/go-guerrilla/mail/mimeparse"
 )
 
 // ----------------------------------------------------------------------------------
@@ -26,9 +26,10 @@ import (
 // ----------------------------------------------------------------------------------
 // Config Options: chunksaver_chunk_size - maximum chunk size, in bytes
 // --------------:-------------------------------------------------------------------
-// Input         : e.Values["MimeParts"] Which is of type *mime.Parts, as populated by "mimeanalyzer"
+// Input         : e.MimeParts Which is of type *mime.Parts, as populated by "mimeanalyzer"
 // ----------------------------------------------------------------------------------
-// Output        :
+// Output        : Messages are saved using the Storage interface
+//               : See store_sql.go and store_sql.go as examples
 // ----------------------------------------------------------------------------------
 
 func init() {
@@ -151,7 +152,7 @@ func Chunksaver() *backends.StreamDecorator {
 				if err != nil {
 					return err
 				}
-				e.Values["messageID"] = mid
+				e.MessageID = mid
 				envelope = e
 				return nil
 			}
@@ -163,9 +164,9 @@ func Chunksaver() *backends.StreamDecorator {
 					return err
 				}
 				defer chunkBuffer.Reset()
-				if mid, ok := envelope.Values["messageID"].(uint64); ok {
+				if envelope.MessageID > 0 {
 					err = database.CloseMessage(
-						mid,
+						envelope.MessageID,
 						written,
 						&chunkBuffer.Info,
 						subject,
@@ -180,7 +181,7 @@ func Chunksaver() *backends.StreamDecorator {
 				return nil
 			}
 
-			fillVars := func(parts *mime.Parts, subject, to, from string) (string, string, string) {
+			fillVars := func(parts *mimeparse.Parts, subject, to, from string) (string, string, string) {
 				if len(*parts) > 0 {
 					if subject == "" {
 						if val, ok := (*parts)[0].Headers["Subject"]; ok {
@@ -208,7 +209,7 @@ func Chunksaver() *backends.StreamDecorator {
 			}
 
 			// end() triggers a buffer flush, at the end of a header or part-boundary
-			end := func(part *mime.Part, offset uint, p []byte, start uint) (int, error) {
+			end := func(part *mimeparse.Part, offset uint, p []byte, start uint) (int, error) {
 				var err error
 				var count int
 				// write out any unwritten bytes
@@ -237,11 +238,10 @@ func Chunksaver() *backends.StreamDecorator {
 
 			return backends.StreamProcessWith(func(p []byte) (count int, err error) {
 				pos = 0
-				if envelope.Values == nil {
+				if envelope.MimeParts == nil {
 					return count, errors.New("no message headers found")
-				}
-				if parts, ok := envelope.Values["MimeParts"].(*mime.Parts); ok && len(*parts) > 0 {
-
+				} else if len(*envelope.MimeParts) > 0 {
+					parts := envelope.MimeParts
 					subject, to, from = fillVars(parts, subject, to, from)
 					offset := msgPos
 					chunkBuffer.CurrentPart((*parts)[0])

+ 3 - 1
chunk/store_sql.go

@@ -43,6 +43,7 @@ func (s *StoreSQL) prepareSql() error {
 		s.statements = make(map[string]*sql.Stmt)
 	}
 
+	// begin inserting an email (before saving chunks)
 	if stmt, err := s.db.Prepare(`INSERT INTO ` +
 		s.config.EmailTable +
 		` (from, helo, recipient, ipv4_addr, ipv6_addr, return_path, is_tls, is_8bit) 
@@ -52,7 +53,7 @@ func (s *StoreSQL) prepareSql() error {
 		s.statements["insertEmail"] = stmt
 	}
 
-	// begin inserting an email (before saving chunks)
+	// insert a chunk of email's data
 	if stmt, err := s.db.Prepare(`INSERT INTO ` +
 		s.config.ChunkTable +
 		` (data, hash) 
@@ -74,6 +75,7 @@ func (s *StoreSQL) prepareSql() error {
 
 	// Check the existence of a chunk (the reference_count col is incremented if it exists)
 	// This means we can avoid re-inserting an existing chunk, only update its reference_count
+	// check the "affected rows" count after executing query
 	if stmt, err := s.db.Prepare(`
 		UPDATE ` + s.config.ChunkTable + ` 
 			SET reference_count=reference_count+1 

+ 11 - 1
mail/envelope.go

@@ -13,6 +13,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/flashmob/go-guerrilla/mail/mimeparse"
 	"github.com/flashmob/go-guerrilla/mail/smtp"
 )
 
@@ -127,7 +128,7 @@ type Envelope struct {
 	MailFrom Address
 	// Recipients
 	RcptTo []Address
-	// Data stores the header and message body
+	// Data stores the header and message body (when using the non-streaming processor)
 	Data bytes.Buffer
 	// Subject stores the subject of the email, extracted and decoded after calling ParseHeaders()
 	Subject string
@@ -145,6 +146,12 @@ type Envelope struct {
 	QueuedId string
 	// TransportType indicates whenever 8BITMIME extension has been signaled
 	TransportType smtp.TransportType
+	// Size is the length of message, after being written
+	Size int64
+	// MimeParts contain the information about the mime-parts after they have been parsed
+	MimeParts *mimeparse.Parts
+	// MessageID contains the id of the message after it has been written
+	MessageID uint64
 	// ESMTP: true if EHLO was used
 	ESMTP bool
 	// When locked, it means that the envelope is being processed by the backend
@@ -218,6 +225,9 @@ func (e *Envelope) ResetTransaction() {
 	e.Header = nil
 	e.Hashes = make([]string, 0)
 	e.DeliveryHeader = ""
+	e.Size = 0
+	e.MessageID = 0
+	e.MimeParts = nil
 	e.Values = make(map[string]interface{})
 }
 

+ 1 - 1
mail/mime/mime.go → mail/mimeparse/mime.go

@@ -1,4 +1,4 @@
-package mime
+package mimeparse
 
 /*
 

+ 1 - 1
mail/mime/mime_test.go → mail/mimeparse/mime_test.go

@@ -1,4 +1,4 @@
-package mime
+package mimeparse
 
 import (
 	"bytes"

+ 5 - 5
mail/reader.go

@@ -5,13 +5,13 @@ import (
 	"io"
 	"net/textproto"
 
-	"github.com/flashmob/go-guerrilla/mail/mime"
+	"github.com/flashmob/go-guerrilla/mail/mimeparse"
 )
 
 // MimeDotReader parses the mime structure while reading using the underlying reader
 type MimeDotReader struct {
 	R       io.Reader
-	p       *mime.Parser
+	p       *mimeparse.Parser
 	mimeErr error
 }
 
@@ -48,7 +48,7 @@ func (r MimeDotReader) Close() (err error) {
 }
 
 // Parts returns the mime-header parts built by the parser
-func (r *MimeDotReader) Parts() mime.Parts {
+func (r *MimeDotReader) Parts() mimeparse.Parts {
 	return r.p.Parts
 }
 
@@ -65,9 +65,9 @@ func NewMimeDotReader(br *bufio.Reader, maxNodes int) *MimeDotReader {
 	r := new(MimeDotReader)
 	r.R = textproto.NewReader(br).DotReader()
 	if maxNodes > 0 {
-		r.p = mime.NewMimeParserLimited(maxNodes)
+		r.p = mimeparse.NewMimeParserLimited(maxNodes)
 	} else {
-		r.p = mime.NewMimeParser()
+		r.p = mimeparse.NewMimeParser()
 	}
 	r.p.Open()
 	return r