
- chunksaver now chunks emails correctly
- database backend taking shape

flashmob committed 6 years ago
commit ec97c4f347
4 changed files with 400 additions and 41 deletions

  1. api_test.go (+1 -1)
  2. backends/s_chunksaver.go (+376 -32)
  3. backends/s_chunksaver_test.go (+22 -7)
  4. mail/mime/mime_test.go (+1 -1)

+ 1 - 1
api_test.go

@@ -1153,7 +1153,7 @@ Bill
 --DC8------------DC8638F443D87A7F0726DEF7
 Content-Type: image/gif; name="map_of_Argentina.gif"
 Content-Transfer-Encoding: base64
-Content-Disposition: in1ine; fi1ename="map_of_Argentina.gif"
+Content-Disposition: inline; filename="map_of_Argentina.gif"
 
 R01GOD1hJQA1AKIAAP/////78P/omn19fQAAAAAAAAAAAAAAACwAAAAAJQA1AAAD7Qi63P5w
 wEmjBCLrnQnhYCgM1wh+pkgqqeC9XrutmBm7hAK3tP31gFcAiFKVQrGFR6kscnonTe7FAAad

+ 376 - 32
backends/s_chunksaver.go

@@ -23,10 +23,17 @@ package backends
 // ----------------------------------------------------------------------------------
 
 import (
+	"crypto/md5"
+	"database/sql"
+	"encoding/binary"
+	"encoding/json"
 	"errors"
 	"fmt"
 	"github.com/flashmob/go-guerrilla/mail"
 	"github.com/flashmob/go-guerrilla/mail/mime"
+	"hash"
+	"net"
+	"strings"
 )
 
 type chunkSaverConfig struct {
@@ -42,29 +49,38 @@ func init() {
 }
 
 type partsInfo struct {
-	Count     uint32 // number of parts
-	TextPart  int    // id of the main text part to display
-	HTMLPart  int    // id of the main html part to display (if any)
-	HasAttach bool
-	Parts     []chunkedParts
+	Count     uint32        `json:"c"`  // number of parts
+	TextPart  int           `json:"tp"` // id of the main text part to display
+	HTMLPart  int           `json:"hp"` // id of the main html part to display (if any)
+	HasAttach bool          `json:"a"`
+	Parts     []chunkedPart `json:"p"`
 }
 
-type chunkedParts struct {
-	PartId             string
-	ChunkHash          [][32]byte // sequence of hashes the data is stored at
-	ContentType        string
-	Charset            string
-	TransferEncoding   string
-	ContentDisposition string
+type chunkedPart struct {
+	PartId             string     `json:"i"`
+	ChunkHash          [][16]byte `json:"h"` // sequence of hashes the data is stored at
+	ContentType        string     `json:"t"`
+	Charset            string     `json:"c"`
+	TransferEncoding   string     `json:"e"`
+	ContentDisposition string     `json:"d"`
 }
 
+type flushEvent func()
+
 type chunkedBytesBuffer struct {
-	buf []byte
+	buf          []byte
+	flushTrigger flushEvent
 }
 
 // flush signals that it's time to write the buffer out to disk
 func (c *chunkedBytesBuffer) flush() {
+	if len(c.buf) == 0 {
+		return
+	}
 	fmt.Print(string(c.buf))
+	if c.flushTrigger != nil {
+		c.flushTrigger()
+	}
 	c.Reset()
 }
 
@@ -74,8 +90,7 @@ func (c *chunkedBytesBuffer) Reset() {
 }
 
 // Write takes a p slice of bytes and writes it to the buffer.
-// It will never grow the buffer, flushing it as
-// soon as it's full. It will also flush it after the entire slice is written
+// It will never grow the buffer, flushing it as soon as it's full.
 func (c *chunkedBytesBuffer) Write(p []byte) (i int, err error) {
 	remaining := len(p)
 	bufCap := cap(c.buf)
@@ -85,26 +100,337 @@ func (c *chunkedBytesBuffer) Write(p []byte) (i int, err error) {
 			// enough room in the buffer
 			c.buf = append(c.buf, p[i:i+remaining]...)
 			i += remaining
-			c.flush()
 			return
 		} else {
 			// fill the buffer to the 'brim' with a slice from p
-			c.buf = append(c.buf, p[i:i+bufCap]...)
-			remaining -= bufCap
-			i += bufCap
+			c.buf = append(c.buf, p[i:i+free]...)
+			remaining -= free
+			i += free
 			c.flush()
+			if remaining == 0 {
+				return
+			}
 		}
 	}
+
 }
 
-// CapTo caps the internal buffer to specified number of bytes, sets the length back to 0
-func (c *chunkedBytesBuffer) CapTo(n int) {
+// capTo caps the internal buffer to specified number of bytes, sets the length back to 0
+func (c *chunkedBytesBuffer) capTo(n int) {
 	if cap(c.buf) == n {
 		return
 	}
 	c.buf = make([]byte, 0, n)
 }
 
+type chunkedBytesBufferMime struct {
+	chunkedBytesBuffer
+	current *mime.Part
+	info    partsInfo
+	md5     hash.Hash
+}
+
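+// newChunkedBytesBufferMime wires up a flushTrigger that hashes each flushed
+// buffer and records the digest against the MIME part currently being written.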
+func newChunkedBytesBufferMime() *chunkedBytesBufferMime {
+	b := new(chunkedBytesBufferMime)
+	var chash [16]byte
+	b.chunkedBytesBuffer.flushTrigger = func() {
+		b.md5.Write(b.buf)
+		copy(chash[:], b.md5.Sum([]byte{}))
+		if b.current != nil {
+			if size := len(b.info.Parts); size > 0 && b.info.Parts[size-1].PartId == b.current.Node {
+				// existing part, just append the hash
+				lastPart := &b.info.Parts[size-1]
+				lastPart.ChunkHash = append(lastPart.ChunkHash, chash)
+				b.fillInfo(lastPart, size-1)
+			} else {
+				// add it as a new part
+				part := chunkedPart{
+					PartId:    b.current.Node,
+					ChunkHash: [][16]byte{chash},
+				}
+				b.fillInfo(&part, len(b.info.Parts)) // the index this part will occupy once appended
+				b.info.Parts = append(b.info.Parts, part)
+				b.info.Count++
+			}
+			// TODO : send chunk to db
+			// db.savechunk(
+		}
+	}
+	b.md5 = md5.New()
+	return b
+}
+
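+// fillInfo copies the current MIME part's headers into the chunked part record,
+// noting the indexes of the main text/html parts and whether there's an attachment.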
+func (b *chunkedBytesBufferMime) fillInfo(cp *chunkedPart, index int) {
+	if cp.ContentType == "" && b.current.ContentType != nil {
+		cp.ContentType = b.current.ContentType.String()
+	}
+	if cp.Charset == "" && b.current.Charset != "" {
+		cp.Charset = b.current.Charset
+	}
+	if cp.TransferEncoding == "" && b.current.TransferEncoding != "" {
+		cp.TransferEncoding = b.current.TransferEncoding
+	}
+	if cp.ContentDisposition == "" && b.current.ContentDisposition != "" {
+		cp.ContentDisposition = b.current.ContentDisposition
+		if strings.Contains(cp.ContentDisposition, "attach") {
+			b.info.HasAttach = true
+		}
+	}
+	if cp.ContentType != "" {
+		if b.info.TextPart == -1 && strings.Contains(cp.ContentType, "text/plain") {
+			b.info.TextPart = index
+		} else if b.info.HTMLPart == -1 && strings.Contains(cp.ContentType, "text/html") {
+			b.info.HTMLPart = index
+		}
+	}
+}
+
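+// Reset resets the md5 state along with the underlying buffer.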
+func (b *chunkedBytesBufferMime) Reset() {
+	b.md5.Reset()
+	b.chunkedBytesBuffer.Reset()
+}
+
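+// currentPart records which MIME part subsequent writes belong to,
+// initializing the partsInfo on first use.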
+func (b *chunkedBytesBufferMime) currentPart(cp *mime.Part) {
+	if b.current == nil {
+		b.info = partsInfo{Parts: make([]chunkedPart, 0, 3), TextPart: -1, HTMLPart: -1}
+	}
+	b.current = cp
+
+}
+
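+// ChunkSaverStorage is the interface a chunk storage backend implements.
+// Chunks are keyed by hash, so a chunk shared by many emails is stored once.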
+type ChunkSaverStorage interface {
+	OpenMessage(
+		from []byte,
+		helo []byte,
+		recipient []byte,
+		ipAddress net.IPAddr,
+		returnPath []byte,
+		isTLS bool) (mailID uint64, err error)
+	CloseMessage(
+		mailID uint64,
+		size uint,
+		partsInfo *partsInfo,
+		subject []byte,
+		charset []byte,
+		deliveryID []byte,
+		to []byte) error
+	AddChunk(data []byte, hash []byte) error
+	Initialize(cfg BackendConfig) error
+	Shutdown() (err error)
+}
+
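+// chunkSaverSQLConfig is the configuration for the SQL chunk storage backend.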
+type chunkSaverSQLConfig struct {
+	EmailTable  string `json:"email_table"`
+	ChunkTable  string `json:"chunk_table"`
+	Driver      string `json:"sql_driver"`
+	DSN         string `json:"sql_dsn"`
+	PrimaryHost string `json:"primary_mail_host"`
+}
+
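+// chunkSaverSQL implements ChunkSaverStorage using database/sql.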
+type chunkSaverSQL struct {
+	config     *chunkSaverSQLConfig
+	statements map[string]*sql.Stmt
+	db         *sql.DB
+}
+
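+// connect opens the database and verifies that the email table is accessible.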
+func (c *chunkSaverSQL) connect() (*sql.DB, error) {
+	var err error
+	if c.db, err = sql.Open(c.config.Driver, c.config.DSN); err != nil {
+		Log().Error("cannot open database: ", err)
+		return nil, err
+	}
+	// do we have permission to access the table?
+	_, err = c.db.Query("SELECT mail_id FROM " + c.config.EmailTable + " LIMIT 1")
+	if err != nil {
+		return nil, err
+	}
+	return c.db, err
+}
+
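+// prepareSql prepares all the statements the backend uses.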
+func (c *chunkSaverSQL) prepareSql() error {
+	if c.statements == nil {
+		c.statements = make(map[string]*sql.Stmt)
+	}
+
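+	// begin inserting an email (before saving chunks)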
+	if stmt, err := c.db.Prepare(`INSERT INTO ` +
+		c.config.EmailTable +
+		` (from, helo, recipient, ipv4_addr, ipv6_addr, return_path, is_tls) 
+ VALUES(?, ?, ?, ?, ?, ?, ?)`); err != nil {
+		return err
+	} else {
+		c.statements["insertEmail"] = stmt
+	}
+
+	// insert a chunk of the email's data (chunks are keyed by their hash)
+	if stmt, err := c.db.Prepare(`INSERT INTO ` +
+		c.config.ChunkTable +
+		` (data, hash) 
+ VALUES(?, ?)`); err != nil {
+		return err
+	} else {
+		c.statements["insertChunk"] = stmt
+	}
+
+	// finalize the email (runs after the connection has closed)
+	if stmt, err := c.db.Prepare(`
+		UPDATE ` + c.config.EmailTable + ` 
+			SET size = ?, parts_info = ?, subject = ?, charset = ?, delivery_id = ?, to = ? 
+		WHERE mail_id = ? `); err != nil {
+		return err
+	} else {
+		c.statements["finalizeEmail"] = stmt
+	}
+
+	// Check the existence of a chunk (the reference_count col is incremented if it exists)
+	// This means we can avoid re-inserting an existing chunk, only update its reference_count
+	if stmt, err := c.db.Prepare(`
+		UPDATE ` + c.config.ChunkTable + ` 
+			SET reference_count=reference_count+1 
+		WHERE hash = ? `); err != nil {
+		return err
+	} else {
+		c.statements["chunkReferenceIncr"] = stmt
+	}
+
+	// If the reference_count is 0 then it means the chunk has been deleted
+	// Chunks are soft-deleted for now, hard-deleted by another sweeper query as they become stale.
+	if stmt, err := c.db.Prepare(`
+		UPDATE ` + c.config.ChunkTable + ` 
+			SET reference_count=reference_count-1 
+		WHERE hash = ? AND reference_count > 0`); err != nil {
+		return err
+	} else {
+		c.statements["chunkReferenceDecr"] = stmt
+	}
+
+	// fetch an email
+	if stmt, err := c.db.Prepare(`
+		SELECT * 
+		from ` + c.config.EmailTable + ` 
+		where mail_id=?`); err != nil {
+		return err
+	} else {
+		c.statements["selectMail"] = stmt
+	}
+
+	// fetch a chunk
+	if stmt, err := c.db.Prepare(`
+		SELECT * 
+		from ` + c.config.ChunkTable + ` 
+		where hash=?`); err != nil {
+		return err
+	} else {
+		c.statements["selectChunk"] = stmt
+	}
+
+	// sweep old chunks
+
+	// sweep incomplete emails
+
+	return nil
+}
+
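+// OpenMessage inserts a new email row at the start of a message and returns
+// its id; chunks and the final metadata are attached to that id later.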
+func (c *chunkSaverSQL) OpenMessage(
+	from []byte,
+	helo []byte,
+	recipient []byte,
+	ipAddress net.IPAddr,
+	returnPath []byte,
+	isTLS bool) (mailID uint64, err error) {
+
+	// if it's ipv4 then we want ipv6 to be 0, and vice-versa
+	var ip4 uint32
+	ip6 := make([]byte, 16)
+	if ip := ipAddress.IP.To4(); ip != nil {
+		ip4 = binary.BigEndian.Uint32(ip)
+	} else {
+		_ = copy(ip6, []byte(ipAddress.IP))
+	}
+	r, err := c.statements["insertEmail"].Exec(from, helo, recipient, ip4, ip6, returnPath, isTLS)
+	if err != nil {
+		return 0, err
+	}
+	id, err := r.LastInsertId()
+	if err != nil {
+		return 0, err
+	}
+	return uint64(id), err
+}
+
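+// AddChunk stores a chunk of data under its hash. If the chunk is already
+// stored, only its reference count is incremented.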
+func (c *chunkSaverSQL) AddChunk(data []byte, hash []byte) error {
+	// attempt to increment the reference_count (it means the chunk is already in there)
+	r, err := c.statements["chunkReferenceIncr"].Exec(hash)
+	if err != nil {
+		return err
+	}
+	affected, err := r.RowsAffected()
+	if err != nil {
+		return err
+	}
+	if affected == 0 {
+		// chunk isn't in there, let's insert it
+		_, err := c.statements["insertChunk"].Exec(data, hash)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
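+// CloseMessage finalizes the email row once the connection has closed,
+// storing the parts info as JSON.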
+func (c *chunkSaverSQL) CloseMessage(
+	mailID uint64,
+	size uint,
+	partsInfo *partsInfo,
+	subject []byte,
+	charset []byte,
+	deliveryID []byte,
+	to []byte) error {
+	partsInfoJson, err := json.Marshal(partsInfo)
+	if err != nil {
+		return err
+	}
+	_, err = c.statements["finalizeEmail"].Exec(size, partsInfoJson, subject, charset, deliveryID, to, mailID)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+// Initialize loads the specific database config, connects to the db, prepares statements
+func (c *chunkSaverSQL) Initialize(cfg BackendConfig) error {
+	configType := BaseConfig(&chunkSaverSQLConfig{})
+	bcfg, err := Svc.ExtractConfig(cfg, configType)
+	if err != nil {
+		return err
+	}
+	c.config = bcfg.(*chunkSaverSQLConfig)
+	c.db, err = c.connect()
+	if err != nil {
+		return err
+	}
+	err = c.prepareSql()
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
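+// Shutdown closes the prepared statements and then the database connection.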
+func (c *chunkSaverSQL) Shutdown() (err error) {
+	defer func() {
+		closeErr := c.db.Close()
+		if closeErr != nil {
+			Log().WithError(closeErr).Error("failed to close sql database")
+			err = closeErr
+		}
+	}()
+	for i := range c.statements {
+		if err = c.statements[i].Close(); err != nil {
+			Log().WithError(err).Error("failed to close sql statement")
+		}
+	}
+	return err
+}
+
 const chunkMaxBytes = 1024 * 16 // 16KB is the default, change using the chunksaver_chunk_size config setting
 /**
 *
@@ -123,14 +449,15 @@ func Chunksaver() *StreamDecorator {
 		func(sp StreamProcessor) StreamProcessor {
 			var (
 				envelope    *mail.Envelope
-				chunkBuffer chunkedBytesBuffer
-				info        partsInfo
+				chunkBuffer *chunkedBytesBufferMime
 				msgPos      uint
+				database    ChunkSaverStorage
 			)
 
 			var config *chunkSaverConfig
 
 			Svc.AddInitializer(InitializeWith(func(backendConfig BackendConfig) error {
+				chunkBuffer = newChunkedBytesBufferMime()
 				configType := BaseConfig(&chunkSaverConfig{})
 				bcfg, err := Svc.ExtractConfig(backendConfig, configType)
 				if err != nil {
@@ -138,26 +465,31 @@ func Chunksaver() *StreamDecorator {
 				}
 				config = bcfg.(*chunkSaverConfig)
 				if config.ChunkMaxBytes > 0 {
-					chunkBuffer.CapTo(config.ChunkMaxBytes)
+					chunkBuffer.capTo(config.ChunkMaxBytes)
 				} else {
-					chunkBuffer.CapTo(chunkMaxBytes)
+					chunkBuffer.capTo(chunkMaxBytes)
+				}
+				database = new(chunkSaverSQL)
+				err = database.Initialize(backendConfig)
+				if err != nil {
+					return err
 				}
 				return nil
 			}))
 
 			Svc.AddShutdowner(ShutdownWith(func() error {
-				return nil
+				err := database.Shutdown()
+				return err
 			}))
 
 			sd.Open = func(e *mail.Envelope) error {
 				// create a new entry & grab the id
 				envelope = e
-				info = partsInfo{Parts: make([]chunkedParts, 0, 3)}
-				_ = info.Count
 				return nil
 			}
 
 			sd.Close = func() error {
+				chunkBuffer.flush()
 				chunkBuffer.Reset()
 				return nil
 			}
@@ -169,14 +501,25 @@ func Chunksaver() *StreamDecorator {
 				}
 
 				if parts, ok := envelope.Values["MimeParts"].(*[]*mime.Part); ok {
-					var pos int
+					var (
+						pos      int
+						progress int
+					)
+					if len(*parts) > 2 {
+						progress = len(*parts) - 2 // skip to the 2nd-to-last part; earlier parts were already written out
+					}
+
 					offset := msgPos
-					for i := range *parts {
+					for i := progress; i < len(*parts); i++ {
 						part := (*parts)[i]
+						chunkBuffer.currentPart(part)
 
 						// break chunk on new part
 						if part.StartingPos > 0 && part.StartingPos > msgPos {
 							count, _ := chunkBuffer.Write(p[pos : part.StartingPos-offset])
+
+							chunkBuffer.flush()
+							fmt.Println("->N")
 							pos += count
 							msgPos = part.StartingPos
 						}
@@ -184,7 +527,9 @@ func Chunksaver() *StreamDecorator {
 						// break chunk on header
 						if part.StartingPosBody > 0 && part.StartingPosBody >= msgPos {
 							count, _ := chunkBuffer.Write(p[pos : part.StartingPosBody-offset])
+
 							chunkBuffer.flush()
+							fmt.Println("->H")
 							pos += count
 							msgPos = part.StartingPosBody
 						}
@@ -196,7 +541,7 @@ func Chunksaver() *StreamDecorator {
 							msgPos += uint(count)
 						}
 
-						// break out if there's no more data to write out
+						// stop early if there's no more data to write
 						if pos >= len(p) {
 							break
 						}
@@ -204,7 +549,6 @@ func Chunksaver() *StreamDecorator {
 				}
 				return sp.Write(p)
 			})
-
 		}
 
 	return sd
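
The storage scheme above boils down to: split the incoming stream into fixed-size chunks, key each chunk by its digest, and store any given chunk only once, no matter how many emails contain it. A minimal standalone Go sketch of that idea (store and saveChunks are illustrative names here, not the backend's API):

package main

import (
	"crypto/md5"
	"fmt"
)

// store maps chunk hash -> chunk data; duplicate chunks share a single entry,
// which is what the chunk table's reference_count column is for.
var store = map[[16]byte][]byte{}

// saveChunks splits data into chunks of at most size bytes, keys each chunk
// by its MD5 digest, and returns the key sequence needed to reassemble it.
func saveChunks(data []byte, size int) [][16]byte {
	var keys [][16]byte
	for pos := 0; pos < len(data); pos += size {
		end := pos + size
		if end > len(data) {
			end = len(data)
		}
		chunk := data[pos:end]
		key := md5.Sum(chunk)
		if _, seen := store[key]; !seen {
			store[key] = append([]byte(nil), chunk...)
		}
		keys = append(keys, key)
	}
	return keys
}

func main() {
	// two identical 12-byte chunks -> 2 keys, but only 1 chunk stored
	keys := saveChunks([]byte("same-chunk..same-chunk.."), 12)
	fmt.Println(len(keys), "chunks,", len(store), "stored")
}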

+ 22 - 7
backends/s_chunksaver_test.go

@@ -1,7 +1,6 @@
 package backends
 
 import (
-	"fmt"
 	"github.com/flashmob/go-guerrilla/mail"
 	"github.com/flashmob/go-guerrilla/mail/mime"
 	"testing"
@@ -11,7 +10,7 @@ func TestChunkedBytesBuffer(t *testing.T) {
 	var in string
 
 	var buf chunkedBytesBuffer
-	buf.CapTo(64)
+	buf.capTo(64)
 
 	// the data to write is over-aligned
 	in = `123456789012345678901234567890123456789012345678901234567890abcde12345678901234567890123456789012345678901234567890123456789abcdef` // len == 130
@@ -22,7 +21,7 @@ func TestChunkedBytesBuffer(t *testing.T) {
 
 	// the data to write is aligned
 	var buf2 chunkedBytesBuffer
-	buf2.CapTo(64)
+	buf2.capTo(64)
 	in = `123456789012345678901234567890123456789012345678901234567890abcde12345678901234567890123456789012345678901234567890123456789abcd` // len == 128
 	i, _ = buf2.Write([]byte(in[:]))
 	if i != len(in) {
@@ -31,7 +30,7 @@ func TestChunkedBytesBuffer(t *testing.T) {
 
 	// the data to write is under-aligned
 	var buf3 chunkedBytesBuffer
-	buf3.CapTo(64)
+	buf3.capTo(64)
 	in = `123456789012345678901234567890123456789012345678901234567890abcde12345678901234567890123456789012345678901234567890123456789ab` // len == 126
 	i, _ = buf3.Write([]byte(in[:]))
 	if i != len(in) {
@@ -40,12 +39,23 @@ func TestChunkedBytesBuffer(t *testing.T) {
 
 	// the data to write is smaller than the buffer
 	var buf4 chunkedBytesBuffer
-	buf4.CapTo(64)
+	buf4.capTo(64)
 	in = `1234567890` // len == 10
 	i, _ = buf4.Write([]byte(in[:]))
 	if i != len(in) {
 		t.Error("did not write", len(in), "bytes")
 	}
+
+	// what if the buffer already contains stuff before Write is called
+	// and the buffer len is smaller than the len of the slice of bytes we pass it?
+	var buf5 chunkedBytesBuffer
+	buf5.capTo(5)
+	buf5.buf = append(buf5.buf, []byte{'a', 'b', 'c'}...)
+	in = `1234567890` // len == 10
+	i, _ = buf5.Write([]byte(in[:]))
+	if i != len(in) {
+		t.Error("did not write", len(in), "bytes")
+	}
 }
 
 var email = `From:  Al Gore <[email protected]>
@@ -115,7 +125,7 @@ Bill
 --DC8------------DC8638F443D87A7F0726DEF7
 Content-Type: image/gif; name="map_of_Argentina.gif"
 Content-Transfer-Encoding: base64
-Content-Disposition: in1ine; fi1ename="map_of_Argentina.gif"
+Content-Disposition: attachment; filename="map_of_Argentina.gif"
 
 R01GOD1hJQA1AKIAAP/////78P/omn19fQAAAAAAAAAAAAAAACwAAAAAJQA1AAAD7Qi63P5w
 wEmjBCLrnQnhYCgM1wh+pkgqqeC9XrutmBm7hAK3tP31gFcAiFKVQrGFR6kscnonTe7FAAad
@@ -155,6 +165,8 @@ func TestChunkSaverWrite(t *testing.T) {
 	// let's test it
 
 	writeIt(parser, t, stream, 128)
+
+	_ = chunksaver.Close()
 	//writeIt(parser, t, stream, 128000)
 }
 
@@ -164,15 +176,18 @@ func writeIt(parser *mime.Parser, t *testing.T, stream StreamProcessor, size int
 		size = len(email)
 	}
 	total := 0
+
+	// break up the email in to chunks of size. Feed them through the mime parser
 	for msgPos := 0; msgPos < len(email); msgPos += size {
 		err := parser.Parse([]byte(email)[msgPos : msgPos+size])
 		if err != nil {
 			t.Error(err)
 			t.Fail()
 		}
+		// todo: close parser on last chunk! (and finalize save)
 		cut := msgPos + size
 		if cut > len(email) {
-			fmt.Println("* ", msgPos+cut-len(email))
+			// the last chunk may be shorter than size
 			cut -= cut - len(email)
 		}
 		i, _ := stream.Write([]byte(email)[msgPos:cut])

+ 1 - 1
mail/mime/mime_test.go

@@ -357,7 +357,7 @@ Bill
 --DC8------------DC8638F443D87A7F0726DEF7
 Content-Type: image/gif; name="map_of_Argentina.gif"
 Content-Transfer-Encoding: base64
-Content-Disposition: in1ine; fi1ename="map_of_Argentina.gif"
+Content-Disposition: inline; filename="map_of_Argentina.gif"
 
 R01GOD1hJQA1AKIAAP/////78P/omn19fQAAAAAAAAAAAAAAACwAAAAAJQA1AAAD7Qi63P5w
 wEmjBCLrnQnhYCgM1wh+pkgqqeC9XrutmBm7hAK3tP31gFcAiFKVQrGFR6kscnonTe7FAAad