Browse Source

add mime parse error
make ChunkPrefetchCount & ChunkMaxBytes configurable
update comments to parseInfo

flashmob 5 years ago
parent
commit
076e0aa5c7
8 changed files with 90 additions and 26 deletions
  1. 1 0
      chunk/buffer.go
  2. 2 2
      chunk/chunk.go
  3. 11 1
      chunk/processor.go
  4. 6 13
      chunk/reader.go
  5. 0 5
      chunk/store_sql.go
  6. 1 1
      mail/envelope.go
  7. 22 4
      mail/mimeparse/mime.go
  8. 47 0
      mail/mimeparse/mime_test.go

+ 1 - 0
chunk/buffer.go

@@ -160,6 +160,7 @@ func (b *ChunkingBufferMime) Reset() {
 	b.chunkingBuffer.Reset()
 	b.chunkingBuffer.Reset()
 }
 }
 
 
+// CurrentPart sets the current mime part that's being buffered
 func (b *ChunkingBufferMime) CurrentPart(cp *mimeparse.Part) {
 func (b *ChunkingBufferMime) CurrentPart(cp *mimeparse.Part) {
 	if b.current == nil {
 	if b.current == nil {
 		b.Info = *NewPartsInfo()
 		b.Info = *NewPartsInfo()

+ 2 - 2
chunk/chunk.go

@@ -24,7 +24,7 @@ func (h *HashKey) Pack(b []byte) {
 	copy(h[:], b[0:hashByteSize])
 	copy(h[:], b[0:hashByteSize])
 }
 }
 
 
-// String implements the Stringer interface from fmt
+// String implements the fmt.Stringer interface
 func (h HashKey) String() string {
 func (h HashKey) String() string {
 	return base64.RawStdEncoding.EncodeToString(h[0:hashByteSize])
 	return base64.RawStdEncoding.EncodeToString(h[0:hashByteSize])
 }
 }
@@ -60,7 +60,7 @@ type PartsInfo struct {
 	HasAttach   bool          `json:"a"`   // is there an attachment?
 	HasAttach   bool          `json:"a"`   // is there an attachment?
 	Parts       []ChunkedPart `json:"p"`   // info describing a mime-part
 	Parts       []ChunkedPart `json:"p"`   // info describing a mime-part
 	CBoundaries []string      `json:"cbl"` // content boundaries list
 	CBoundaries []string      `json:"cbl"` // content boundaries list
-
+	Err         error         `json:"e"`   // any error encountered (mimeparse.MimeError)
 }
 }
 
 
 var bp sync.Pool // bytes.buffer pool
 var bp sync.Pool // bytes.buffer pool

+ 11 - 1
chunk/processor.go

@@ -26,6 +26,7 @@ import (
 // Requires      : "mimeanalyzer" stream processor to be enabled before it
 // Requires      : "mimeanalyzer" stream processor to be enabled before it
 // ----------------------------------------------------------------------------------
 // ----------------------------------------------------------------------------------
 // Config Options: chunk_size - maximum chunk size, in bytes
 // Config Options: chunk_size - maximum chunk size, in bytes
+//               : storage_engine - "sql" or "memory", or make your own extension
 // --------------:-------------------------------------------------------------------
 // --------------:-------------------------------------------------------------------
 // Input         : e.MimeParts Which is of type *mime.Parts, as populated by "mimeanalyzer"
 // Input         : e.MimeParts Which is of type *mime.Parts, as populated by "mimeanalyzer"
 // ----------------------------------------------------------------------------------
 // ----------------------------------------------------------------------------------
@@ -42,7 +43,11 @@ func init() {
 type Config struct {
 type Config struct {
 	// ChunkMaxBytes controls the maximum buffer size for saving
 	// ChunkMaxBytes controls the maximum buffer size for saving
 	// 16KB default.
 	// 16KB default.
-	ChunkMaxBytes int    `json:"chunk_size,omitempty"`
+	ChunkMaxBytes int `json:"chunk_size,omitempty"`
+	// ChunkPrefetchCount specifies how many chunks to pre-fetch when reading from storage.
+	// It may reduce the number of round trips to storage
+	ChunkPrefetchCount int `json:"chunk_prefetch_count,omitempty"`
+	// StorageEngine specifies which storage engine to use (see the StorageEngines map)
 	StorageEngine string `json:"storage_engine,omitempty"`
 	StorageEngine string `json:"storage_engine,omitempty"`
 }
 }
 
 
@@ -122,6 +127,7 @@ func Chunksaver() *backends.StreamDecorator {
 
 
 		}
 		}
 		r, err := NewChunkedReader(database, email, 0)
 		r, err := NewChunkedReader(database, email, 0)
+		r.ChunkPrefetchCount = config.ChunkPrefetchCount
 		return r, err
 		return r, err
 	}
 	}
 
 
@@ -175,6 +181,10 @@ func Chunksaver() *backends.StreamDecorator {
 					// TODO we could delete the half saved message here
 					// TODO we could delete the half saved message here
 					return err
 					return err
 				}
 				}
+				if mimeErr, ok := envelope.MimeError.(*mimeparse.Error); ok {
+					mErr := mimeErr.Unwrap()
+					chunkBuffer.Info.Err = mErr
+				}
 				defer chunkBuffer.Reset()
 				defer chunkBuffer.Reset()
 				if envelope.MessageID > 0 {
 				if envelope.MessageID > 0 {
 					err = database.CloseMessage(
 					err = database.CloseMessage(

+ 6 - 13
chunk/reader.go

@@ -15,7 +15,8 @@ type chunkedReader struct {
 	// i is which part it's currently reading, j is which chunk of a part
 	// i is which part it's currently reading, j is which chunk of a part
 	i, j int
 	i, j int
 
 
-	cache cachedChunks
+	cache              cachedChunks
+	ChunkPrefetchCount int
 }
 }
 
 
 // NewChunkedReader loads the email and selects which mime-part Read will read, starting from 1
 // NewChunkedReader loads the email and selects which mime-part Read will read, starting from 1
@@ -34,6 +35,7 @@ func NewChunkedReader(db Storage, email *Email, part int) (*chunkedReader, error
 	r.cache = cachedChunks{
 	r.cache = cachedChunks{
 		db: db,
 		db: db,
 	}
 	}
+	r.ChunkPrefetchCount = chunkCachePreload
 	return r, nil
 	return r, nil
 }
 }
 
 
@@ -68,7 +70,7 @@ type cachedChunks struct {
 const chunkCachePreload = 2
 const chunkCachePreload = 2
 
 
 // warm allocates the chunk cache, and gets the first few and stores them in the cache
 // warm allocates the chunk cache, and gets the first few and stores them in the cache
-func (c *cachedChunks) warm(hashes ...HashKey) (int, error) {
+func (c *cachedChunks) warm(preload int, hashes []HashKey) (int, error) {
 
 
 	if c.hashIndex == nil {
 	if c.hashIndex == nil {
 		c.hashIndex = make(map[int]HashKey, len(hashes))
 		c.hashIndex = make(map[int]HashKey, len(hashes))
@@ -81,7 +83,6 @@ func (c *cachedChunks) warm(hashes ...HashKey) (int, error) {
 		return len(c.chunks), nil
 		return len(c.chunks), nil
 	}
 	}
 	// let's pre-load some hashes.
 	// let's pre-load some hashes.
-	preload := chunkCachePreload
 	if len(hashes) < preload {
 	if len(hashes) < preload {
 		preload = len(hashes)
 		preload = len(hashes)
 	}
 	}
@@ -107,13 +108,6 @@ func (c *cachedChunks) get(i int) (*Chunk, error) {
 	if i > len(c.chunks) {
 	if i > len(c.chunks) {
 		return nil, errors.New("not enough chunks")
 		return nil, errors.New("not enough chunks")
 	}
 	}
-	defer func() {
-		if len(c.chunks) > 15 {
-			fmt.Println("moo")
-			//fmt.Println("hash", hash[i].Hex(),  "i", i)
-		}
-	}()
-
 	if c.chunks[i] != nil {
 	if c.chunks[i] != nil {
 		// cache hit!
 		// cache hit!
 		return c.chunks[i], nil
 		return c.chunks[i], nil
@@ -142,13 +136,12 @@ func (c *cachedChunks) get(i int) (*Chunk, error) {
 			if i-1 > -1 {
 			if i-1 > -1 {
 				for j := i - 1; j > -1; j-- {
 				for j := i - 1; j > -1; j-- {
 					if c.chunks[j] != nil {
 					if c.chunks[j] != nil {
-						// todo		c.chunks[j] = nil
+						c.chunks[j] = nil
 					} else {
 					} else {
 						break
 						break
 					}
 					}
 				}
 				}
 			}
 			}
-
 			// return the chunk asked for
 			// return the chunk asked for
 			return chunks[0], nil
 			return chunks[0], nil
 		}
 		}
@@ -169,7 +162,7 @@ func (c *cachedChunks) empty() {
 func (r *chunkedReader) Read(p []byte) (n int, err error) {
 func (r *chunkedReader) Read(p []byte) (n int, err error) {
 	var length int
 	var length int
 	for ; r.i < len(r.email.partsInfo.Parts); r.i++ {
 	for ; r.i < len(r.email.partsInfo.Parts); r.i++ {
-		length, err = r.cache.warm(r.email.partsInfo.Parts[r.i].ChunkHash...)
+		length, err = r.cache.warm(r.ChunkPrefetchCount, r.email.partsInfo.Parts[r.i].ChunkHash)
 		if err != nil {
 		if err != nil {
 			return
 			return
 		}
 		}

+ 0 - 5
chunk/store_sql.go

@@ -141,8 +141,6 @@ func (s *StoreSQL) connect() (*sql.DB, error) {
 		}
 		}
 		s.db.SetConnMaxLifetime(t)
 		s.db.SetConnMaxLifetime(t)
 	}
 	}
-	stats := s.db.Stats()
-	fmt.Println(stats)
 	// do we have permission to access the table?
 	// do we have permission to access the table?
 	_, err = s.db.Query("SELECT mail_id FROM " + s.config.EmailTable + " LIMIT 1")
 	_, err = s.db.Query("SELECT mail_id FROM " + s.config.EmailTable + " LIMIT 1")
 	if err != nil {
 	if err != nil {
@@ -452,7 +450,6 @@ func (s *StoreSQL) GetChunks(hash ...HashKey) ([]*Chunk, error) {
 		); err != nil {
 		); err != nil {
 			return result, err
 			return result, err
 		}
 		}
-
 		c.data = bytes.NewBuffer(data)
 		c.data = bytes.NewBuffer(data)
 		c.modifiedAt = createdAt.Time
 		c.modifiedAt = createdAt.Time
 		temp[h] = &c
 		temp[h] = &c
@@ -537,8 +534,6 @@ type protocol struct {
 	mail.Protocol
 	mail.Protocol
 }
 }
 
 
-// todo scanners for protocol & transport
-
 // Scan implements database/sql scanner interface, for parsing smtp.TransportType
 // Scan implements database/sql scanner interface, for parsing smtp.TransportType
 func (t *transportType) Scan(value interface{}) error {
 func (t *transportType) Scan(value interface{}) error {
 	if data, ok := value.([]uint8); ok {
 	if data, ok := value.([]uint8); ok {

+ 1 - 1
mail/envelope.go

@@ -160,7 +160,7 @@ type Envelope struct {
 	MimeParts *mimeparse.Parts
 	MimeParts *mimeparse.Parts
 	// MimeError contains any error encountered when parsing mime using the mimeanalyzer
 	// MimeError contains any error encountered when parsing mime using the mimeanalyzer
 	MimeError error
 	MimeError error
-	// MessageID contains theR id of the message after it has been written
+	// MessageID contains the id of the message after it has been written
 	MessageID uint64
 	MessageID uint64
 	// Remote IP address
 	// Remote IP address
 	RemoteIP string
 	RemoteIP string

+ 22 - 4
mail/mimeparse/mime.go

@@ -115,6 +115,20 @@ func (e MimeError) Error() string {
 	return "unknown mime error"
 	return "unknown mime error"
 }
 }
 
 
+func (e *MimeError) UnmarshalJSON(b []byte) error {
+	v, err := strconv.ParseInt(string(b), 10, 32)
+	if err != nil {
+		return err
+	}
+	*e = MimeError(v)
+	return nil
+}
+
+// MarshalJSON implements json.Marshaler
+func (e MimeError) MarshalJSON() ([]byte, error) {
+	return []byte(strconv.Itoa(int(e))), nil
+}
+
 // Error implements the error interface
 // Error implements the error interface
 type Error struct {
 type Error struct {
 	err  error
 	err  error
@@ -127,18 +141,22 @@ func (e Error) Error() string {
 	if e.char == 0 {
 	if e.char == 0 {
 		return e.err.Error()
 		return e.err.Error()
 	}
 	}
-	return e.err.Error() + " char:[" + string(e.char) + "], peek:" +
-		string(e.peek) + ", pos:" + strconv.Itoa(int(e.pos))
+	return e.err.Error() + " char:[" + string(e.char) + "], peek:[" +
+		string(e.peek) + "], pos:" + strconv.Itoa(int(e.pos))
+}
+
+func (e Error) Unwrap() error {
+	return e.err
 }
 }
 
 
 func (e *Error) ParseError() bool {
 func (e *Error) ParseError() bool {
-	if e.err != io.EOF && e.err != NotMineErr && e.err != MaxNodesErr {
+	if e.err != io.EOF && error(e.err) != NotMineErr && error(e.err) != MaxNodesErr {
 		return true
 		return true
 	}
 	}
 	return false
 	return false
 }
 }
 
 
-func (p *Parser) newParseError(e error) *Error {
+func (p *Parser) newParseError(e MimeError) *Error {
 	var peek byte
 	var peek byte
 	offset := 1
 	offset := 1
 	for {
 	for {

+ 47 - 0
mail/mimeparse/mime_test.go

@@ -2,6 +2,7 @@ package mimeparse
 
 
 import (
 import (
 	"bytes"
 	"bytes"
+	"encoding/json"
 	"fmt"
 	"fmt"
 	"io"
 	"io"
 	"strconv"
 	"strconv"
@@ -695,3 +696,49 @@ func TestNonMineEmailBigBody(t *testing.T) {
 	}
 	}
 
 
 }
 }
+
+func TestMimeErr(t *testing.T) {
+	p := NewMimeParser()
+	p.Open()
+	// the error is missing subtype
+	data :=
+
+		`To "moo": j m
+Subject: and a predicate
+MIME-Version: 1.0
+Content-Type: text;
+Content-Transfer-Encoding: 1
+
+Rock the microphone and then I’m gone
+
+`
+	i, err := p.Write([]byte(data))
+
+	if err != nil {
+		if mimeErr, ok := err.(*Error); !ok {
+			t.Error("not a *Error type")
+			return
+		} else {
+			b, err := json.Marshal(mimeErr.Unwrap())
+			if err != nil {
+				t.Error(err)
+				return
+			}
+			if string(b) != "8" {
+				t.Error("expecting error be 8")
+				return
+			}
+			var parsedErr MimeError
+			json.Unmarshal(b, &parsedErr)
+			if parsedErr != ErrorMissingSubtype {
+				t.Error("expecting error to be ErrorMissingSubtype, got:", parsedErr)
+				return
+			}
+		}
+	}
+	if i != 148 {
+		t.Error("test was expecting to read 148 bytes, got", i)
+	}
+	err = p.Close()
+
+}