
refactor chunksaver to its own package

flashmob 6 years ago
parent
commit
6535833a92

+ 6 - 4
api_test.go

@@ -4,16 +4,18 @@ import (
 	"bufio"
 	"errors"
 	"fmt"
-	"github.com/flashmob/go-guerrilla/backends"
-	"github.com/flashmob/go-guerrilla/log"
-	"github.com/flashmob/go-guerrilla/mail"
-	"github.com/flashmob/go-guerrilla/response"
 	"io/ioutil"
 	"net"
 	"os"
 	"strings"
 	"testing"
 	"time"
+
+	"github.com/flashmob/go-guerrilla/backends"
+	_ "github.com/flashmob/go-guerrilla/chunk"
+	"github.com/flashmob/go-guerrilla/log"
+	"github.com/flashmob/go-guerrilla/mail"
+	"github.com/flashmob/go-guerrilla/response"
 )
 
 // Test Starting smtp without setting up logger / backend
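A note on the new blank import above: the chunk package registers its processor from an `init()` function, so importing it for side effects alone makes "chunksaver" available by name. A minimal sketch of the pattern (the `main` package is invented for illustration; the import paths are the ones from this commit):

```go
package main

import (
	"fmt"

	"github.com/flashmob/go-guerrilla/backends"
	// imported only for its side effects: its init() adds the
	// "chunksaver" constructor to backends.Streamers
	_ "github.com/flashmob/go-guerrilla/chunk"
)

func main() {
	// after the blank import, the constructor can be looked up by name
	if _, ok := backends.Streamers["chunksaver"]; ok {
		fmt.Println("chunksaver is registered")
	}
}
```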

+ 3 - 3
backends/backend.go

@@ -25,7 +25,7 @@ var (
 func init() {
 	Svc = &service{}
 	processors = make(map[string]ProcessorConstructor)
-	streamers = make(map[string]StreamProcessorConstructor)
+	Streamers = make(map[string]StreamProcessorConstructor)
 }
 
 type ProcessorConstructor func() Decorator
@@ -198,7 +198,7 @@ func (s *service) reset() {
 // Initialize initializes all the processors one-by-one and returns any errors.
 // Subsequent calls to Initialize will not call the initializer again unless it failed on the previous call
 // so Initialize may be called again to retry after getting errors
-func (s *service) initialize(backend BackendConfig) Errors {
+func (s *service) Initialize(backend BackendConfig) Errors {
 	s.Lock()
 	defer s.Unlock()
 	var errors Errors = nil
@@ -253,7 +253,7 @@ func (s *service) AddStreamProcessor(name string, p StreamProcessorConstructor)
 		return p()
 	}
 	// add to our processors list
-	streamers[strings.ToLower(name)] = c
+	Streamers[strings.ToLower(name)] = c
 }
 
 // extractConfig loads the backend config. It has already been unmarshalled
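Exporting `Streamers` (together with the already-exported `Svc.AddStreamProcessor`) is what allows the new chunk package to register itself from outside the backends package. A hedged sketch of a third-party processor doing the same (the "myproc" name and its pass-through decorator are invented for illustration):

```go
package myproc

import "github.com/flashmob/go-guerrilla/backends"

func init() {
	// AddStreamProcessor lower-cases the name and stores the
	// constructor in the exported Streamers map
	backends.Svc.AddStreamProcessor("MyProc", func() *backends.StreamDecorator {
		sd := &backends.StreamDecorator{}
		sd.Decorate = func(sp backends.StreamProcessor, a ...interface{}) backends.StreamProcessor {
			// pass-through: hand every write straight to the next processor
			return backends.StreamProcessWith(sp.Write)
		}
		return sd
	})
}
```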

+ 3 - 3
backends/gateway.go

@@ -397,7 +397,7 @@ func (gw *BackendGateway) newStreamStack(stackConfig string) (streamer, error) {
 	items := strings.Split(cfg, "|")
 	for i := range items {
 		name := items[len(items)-1-i] // reverse order, since decorators are stacked
-		if makeFunc, ok := streamers[name]; ok {
+		if makeFunc, ok := Streamers[name]; ok {
 			decorators = append(decorators, makeFunc())
 		} else {
 			ErrProcessorNotFound = errors.New(fmt.Sprintf("stream processor [%s] not found", name))
@@ -467,8 +467,8 @@ func (gw *BackendGateway) Initialize(cfg BackendConfig) error {
 
 		gw.streamers = append(gw.streamers, s)
 	}
-	// initialize processors
-	if err := Svc.initialize(cfg); err != nil {
+	// Initialize processors
+	if err := Svc.Initialize(cfg); err != nil {
 		gw.State = BackendStateError
 		return err
 	}
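For context, `newStreamStack` walks the `|`-separated list backwards because decorators wrap one another: the first-listed processor must end up outermost so it sees the stream first. A toy illustration of that reversal (standalone, not the gateway's actual types):

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	cfg := "mimeanalyzer|chunksaver|compress"
	items := strings.Split(cfg, "|")
	for i := range items {
		// reverse order: "compress" is constructed first and sits innermost;
		// "mimeanalyzer" is constructed last and wraps the whole stack
		fmt.Println(items[len(items)-1-i])
	}
}
```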

+ 0 - 1271
backends/s_chunksaver.go

@@ -1,1271 +0,0 @@
-package backends
-
-// ----------------------------------------------------------------------------------
-// Processor Name: ChunkSaver
-// ----------------------------------------------------------------------------------
-// Description   : Takes the stream and saves it in chunks. Chunks are split on the
-//               : chunksaver_chunk_size config setting, and also at the end of MIME parts,
-//               : and after a header. This allows for basic de-duplication: we can take a
-//               : hash of each chunk, then check the database to see if we have it already.
-//               : We don't need to write it to the database, but take the reference of the
-//               : previously saved chunk and only increment the reference count.
-//               : The rationale for putting headers and bodies into separate chunks is
-//               : that headers often contain more unique data, while bodies are
-//               : often duplicated, especially for messages that are CC'd or forwarded
-// ----------------------------------------------------------------------------------
-// Requires      : "mimeanalyzer" stream processor to be enabled before it
-// ----------------------------------------------------------------------------------
-// Config Options: chunksaver_chunk_size - maximum chunk size, in bytes
-// --------------:-------------------------------------------------------------------
-// Input         : e.Values["MimeParts"], which is of type *[]*mime.Part, as populated by "mimeanalyzer"
-// ----------------------------------------------------------------------------------
-// Output        :
-// ----------------------------------------------------------------------------------
-
-import (
-	"bytes"
-	"compress/zlib"
-	"crypto/md5"
-	"database/sql"
-	"encoding/base64"
-	"encoding/binary"
-	"encoding/json"
-	"errors"
-	"fmt"
-	"github.com/flashmob/go-guerrilla/mail"
-	"github.com/flashmob/go-guerrilla/mail/mime"
-	"hash"
-	"io"
-	"io/ioutil"
-	"net"
-	"strings"
-	"sync"
-	"time"
-)
-
-type chunkSaverConfig struct {
-	// ChunkMaxBytes controls the maximum buffer size for saving
-	// 16KB default.
-	ChunkMaxBytes int    `json:"chunksaver_chunk_size,omitempty"`
-	StorageEngine string `json:"chunksaver_storage_engine,omitempty"`
-	CompressLevel int    `json:"chunksaver_compress_level,omitempty"`
-}
-
-func init() {
-	streamers["chunksaver"] = func() *StreamDecorator {
-		return Chunksaver()
-	}
-}
-
-const hashByteSize = 16
-
-type HashKey [hashByteSize]byte
-
-// Pack takes a slice and copies each byte to HashKey internal representation
-func (h *HashKey) Pack(b []byte) {
-	if len(b) < hashByteSize {
-		return
-	}
-	copy(h[:], b[0:hashByteSize])
-}
-
-// String implements the Stringer interface from fmt
-func (h HashKey) String() string {
-	return base64.RawStdEncoding.EncodeToString(h[0:hashByteSize])
-}
-
-// UnmarshalJSON implements the Unmarshaler interface from encoding/json
-func (h *HashKey) UnmarshalJSON(b []byte) error {
-	dbuf := make([]byte, base64.RawStdEncoding.DecodedLen(len(b[1:len(b)-1])))
-	_, err := base64.RawStdEncoding.Decode(dbuf, b[1:len(b)-1])
-	if err != nil {
-		return err
-	}
-	h.Pack(dbuf)
-	return nil
-}
-
-// MarshalJSON implements the Marshaler interface from encoding/json
-// The value is marshaled as raw base64 to save some bytes,
-// e.g. instead of the typical hex form de17038001170380011703ff01170380, the value is represented as 3hcDgAEXA4ABFwP/ARcDgA
-func (h *HashKey) MarshalJSON() ([]byte, error) {
-	return []byte(`"` + h.String() + `"`), nil
-}
-
-// PartsInfo describes the mime-parts contained in the email
-type PartsInfo struct {
-	Count       uint32        `json:"c"`   // number of parts
-	TextPart    int           `json:"tp"`  // index of the main text part to display
-	HTMLPart    int           `json:"hp"`  // index of the main html part to display (if any)
-	HasAttach   bool          `json:"a"`   // is there an attachment?
-	Parts       []ChunkedPart `json:"p"`   // info describing a mime-part
-	CBoundaries []string      `json:"cbl"` // content boundaries list
-
-	bp sync.Pool // bytes.buffer pool
-}
-
-// ChunkedPart contains header information about a mime-part, including keys pointing to where the data is stored at
-type ChunkedPart struct {
-	PartId             string    `json:"i"`
-	Size               uint      `json:"s"`
-	ChunkHash          []HashKey `json:"h"` // sequence of hashes the data is stored at
-	ContentType        string    `json:"t"`
-	Charset            string    `json:"c"`
-	TransferEncoding   string    `json:"e"`
-	ContentDisposition string    `json:"d"`
-	ContentBoundary    int       `json:"cb"` // index to the CBoundaries list in PartsInfo
-}
-
-func NewPartsInfo() *PartsInfo {
-	pi := new(PartsInfo)
-	pi.bp = sync.Pool{
-		// if not available, then create a new one
-		New: func() interface{} {
-			var b bytes.Buffer
-			return &b
-		},
-	}
-	return pi
-}
-
-// boundary takes a string and returns the index of the string in the info.CBoundaries slice
-func (info *PartsInfo) boundary(cb string) int {
-	for i := range info.CBoundaries {
-		if info.CBoundaries[i] == cb {
-			return i
-		}
-	}
-	info.CBoundaries = append(info.CBoundaries, cb)
-	return len(info.CBoundaries) - 1
-}
-
-// UnmarshalJSON unmarshals the JSON and decompresses using zlib
-func (info *PartsInfo) UnmarshalJSONZlib(b []byte) error {
-
-	r, err := zlib.NewReader(bytes.NewReader(b[1 : len(b)-1]))
-	if err != nil {
-		return err
-	}
-	all, err := ioutil.ReadAll(r)
-	if err != nil {
-		return err
-	}
-	err = json.Unmarshal(all, info)
-	if err != nil {
-		return err
-	}
-	return nil
-}
-
-// MarshalJSONZlib marshals and compresses the bytes using zlib
-func (info *PartsInfo) MarshalJSONZlib() ([]byte, error) {
-
-	buf, err := json.Marshal(info)
-	if err != nil {
-		return buf, err
-	}
-	// borrow a buffer from the pool
-	compressed := info.bp.Get().(*bytes.Buffer)
-	// reset it and return it to the pool when done
-	defer func() {
-		compressed.Reset()
-		info.bp.Put(compressed)
-	}()
-
-	zlibw, err := zlib.NewWriterLevel(compressed, 9)
-	if err != nil {
-		return buf, err
-	}
-	if _, err := zlibw.Write(buf); err != nil {
-		return buf, err
-	}
-	if err := zlibw.Close(); err != nil {
-		return buf, err
-	}
-	return []byte(`"` + compressed.String() + `"`), nil
-}
-
-type flushEvent func() error
-
-type chunkedBytesBuffer struct {
-	buf          []byte
-	flushTrigger flushEvent
-}
-
-// flush signals that it's time to write the buffer out to storage
-func (c *chunkedBytesBuffer) flush() error {
-	if len(c.buf) == 0 {
-		return nil
-	}
-	fmt.Print(string(c.buf)) // debug: dump each chunk to stdout
-	if c.flushTrigger != nil {
-		if err := c.flushTrigger(); err != nil {
-			return err
-		}
-	}
-	c.Reset()
-	return nil
-}
-
-// Reset sets the length back to 0, making it re-usable
-func (c *chunkedBytesBuffer) Reset() {
-	c.buf = c.buf[:0] // set the length back to 0
-}
-
-// Write takes a p slice of bytes and writes it to the buffer.
-// It will never grow the buffer, flushing it as soon as it's full.
-func (c *chunkedBytesBuffer) Write(p []byte) (i int, err error) {
-	remaining := len(p)
-	bufCap := cap(c.buf)
-	for {
-		free := bufCap - len(c.buf)
-		if free > remaining {
-			// enough room in the buffer
-			c.buf = append(c.buf, p[i:i+remaining]...)
-			i += remaining
-			return
-		} else {
-			// fill the buffer to the 'brim' with a slice from p
-			c.buf = append(c.buf, p[i:i+free]...)
-			remaining -= free
-			i += free
-			err = c.flush()
-			if err != nil {
-				return i, err
-			}
-			if remaining == 0 {
-				return
-			}
-		}
-	}
-}
-
-// capTo caps the internal buffer to the specified number of bytes and sets the length back to 0
-func (c *chunkedBytesBuffer) capTo(n int) {
-	if cap(c.buf) == n {
-		return
-	}
-	c.buf = make([]byte, 0, n)
-}
-
-// chunkedBytesBufferMime decorates chunkedBytesBuffer, specifying what to do when a flush event is triggered
-type chunkedBytesBufferMime struct {
-	chunkedBytesBuffer
-	current  *mime.Part
-	info     PartsInfo
-	md5      hash.Hash
-	database ChunkSaverStorage
-}
-
-func newChunkedBytesBufferMime() *chunkedBytesBufferMime {
-	b := new(chunkedBytesBufferMime)
-	b.chunkedBytesBuffer.flushTrigger = func() error {
-		return b.onFlush()
-	}
-	b.md5 = md5.New()
-	b.buf = make([]byte, 0, chunkMaxBytes)
-	return b
-}
-
-func (b *chunkedBytesBufferMime) setDatabase(database ChunkSaverStorage) {
-	b.database = database
-}
-
-// onFlush is called whenever the flush event fires.
-// - It saves the chunk to disk and adds the chunk's hash to the list.
-// - It builds the b.info.Parts structure
-func (b *chunkedBytesBufferMime) onFlush() error {
-	b.md5.Write(b.buf)
-	var chash HashKey
-	copy(chash[:], b.md5.Sum([]byte{}))
-	if b.current == nil {
-		return errors.New("b.current part is nil")
-	}
-	if size := len(b.info.Parts); size > 0 && b.info.Parts[size-1].PartId == b.current.Node {
-		// existing part, just append the hash
-		lastPart := &b.info.Parts[size-1]
-		lastPart.ChunkHash = append(lastPart.ChunkHash, chash)
-		b.fillInfo(lastPart, size-1)
-		lastPart.Size += uint(len(b.buf))
-	} else {
-		// add it as a new part
-		part := ChunkedPart{
-			PartId:          b.current.Node,
-			ChunkHash:       []HashKey{chash},
-			ContentBoundary: b.info.boundary(b.current.ContentBoundary),
-			Size:            uint(len(b.buf)),
-		}
-		b.fillInfo(&part, 0)
-		b.info.Parts = append(b.info.Parts, part)
-		b.info.Count++
-	}
-	if err := b.database.AddChunk(b.buf, chash[:]); err != nil {
-		return err
-	}
-	return nil
-}
-
-func (b *chunkedBytesBufferMime) fillInfo(cp *ChunkedPart, index int) {
-	if cp.ContentType == "" && b.current.ContentType != nil {
-		cp.ContentType = b.current.ContentType.String()
-	}
-	if cp.Charset == "" && b.current.Charset != "" {
-		cp.Charset = b.current.Charset
-	}
-	if cp.TransferEncoding == "" && b.current.TransferEncoding != "" {
-		cp.TransferEncoding = b.current.TransferEncoding
-	}
-	if cp.ContentDisposition == "" && b.current.ContentDisposition != "" {
-		cp.ContentDisposition = b.current.ContentDisposition
-		if strings.Contains(cp.ContentDisposition, "attach") {
-			b.info.HasAttach = true
-		}
-	}
-	if cp.ContentType != "" {
-		if b.info.TextPart == -1 && strings.Contains(cp.ContentType, "text/plain") {
-			b.info.TextPart = index
-		} else if b.info.HTMLPart == -1 && strings.Contains(cp.ContentType, "text/html") {
-			b.info.HTMLPart = index
-		}
-	}
-}
-
-// Reset decorates the Reset method of the chunkedBytesBuffer
-func (b *chunkedBytesBufferMime) Reset() {
-	b.md5.Reset()
-	b.chunkedBytesBuffer.Reset()
-}
-
-func (b *chunkedBytesBufferMime) currentPart(cp *mime.Part) {
-	if b.current == nil {
-		b.info = *NewPartsInfo()
-		b.info.Parts = make([]ChunkedPart, 0, 3)
-		b.info.TextPart = -1
-		b.info.HTMLPart = -1
-	}
-	b.current = cp
-}
-
-// ChunkSaverStorage defines an interface to the storage layer (the database)
-type ChunkSaverStorage interface {
-	// OpenMessage is used to begin saving an email. An email id is returned and used to call CloseMessage later
-	OpenMessage(from string, helo string, recipient string, ipAddress net.IPAddr, returnPath string, isTLS bool) (mailID uint64, err error)
-	// CloseMessage finalizes the writing of an email. Additional data collected while parsing the email is saved
-	CloseMessage(mailID uint64, size int64, partsInfo *PartsInfo, subject string, deliveryID string, to string, from string) error
-	// AddChunk saves a chunk of bytes to a given hash key
-	AddChunk(data []byte, hash []byte) error
-	// GetEmail returns an email that's been saved
-	GetEmail(mailID uint64) (*ChunkSaverEmail, error)
-	// GetChunks loads in the specified chunks of bytes from storage
-	GetChunks(hash ...HashKey) ([]*ChunkSaverChunk, error)
-	// Initialize is called when the backend is started
-	Initialize(cfg BackendConfig) error
-	// Shutdown is called when the backend gets shutdown.
-	Shutdown() (err error)
-}
-
-// ChunkSaverEmail represents an email
-type ChunkSaverEmail struct {
-	mailID     uint64
-	createdAt  time.Time
-	size       int64
-	from       string // from stores the email address found in the "From" header field
-	to         string // to stores the email address found in the "To" header field
-	partsInfo  PartsInfo
-	helo       string // helo message given by the client when the message was transmitted
-	subject    string // subject stores the value from the first "Subject" header field
-	deliveryID string
-	recipient  string     // recipient is the email address that the server received from the RCPT TO command
-	ipv4       net.IPAddr // set to a value if client connected via ipv4
-	ipv6       net.IPAddr // set to a value if client connected via ipv6
-	returnPath string     // returnPath is the email address that the server received from the MAIL FROM command
-	isTLS      bool       // isTLS is true when TLS was used to connect
-}
-
-type ChunkSaverChunk struct {
-	modifiedAt     time.Time
-	referenceCount uint // referenceCount counts how many emails reference this chunk
-	data           io.Reader
-}
-
-type chunkSaverMemoryEmail struct {
-	mailID     uint64
-	createdAt  time.Time
-	size       int64
-	from       string
-	to         string
-	partsInfo  []byte
-	helo       string
-	subject    string
-	deliveryID string
-	recipient  string
-	ipv4       net.IPAddr
-	ipv6       net.IPAddr
-	returnPath string
-	isTLS      bool
-}
-
-type chunkSaverMemoryChunk struct {
-	modifiedAt     time.Time
-	referenceCount uint
-	data           []byte
-}
-
-type chunkSaverMemory struct {
-	chunks        map[HashKey]*chunkSaverMemoryChunk
-	emails        []*chunkSaverMemoryEmail
-	nextID        uint64
-	IDOffset      uint64
-	compressLevel int
-}
-
-// OpenMessage implements the ChunkSaverStorage interface
-func (m *chunkSaverMemory) OpenMessage(from string, helo string, recipient string, ipAddress net.IPAddr, returnPath string, isTLS bool) (mailID uint64, err error) {
-	var ip4, ip6 net.IPAddr
-	if ip := ipAddress.IP.To4(); ip != nil {
-		ip4 = ipAddress
-	} else {
-		ip6 = ipAddress
-	}
-	email := chunkSaverMemoryEmail{
-		mailID:     m.nextID,
-		createdAt:  time.Now(),
-		from:       from,
-		helo:       helo,
-		recipient:  recipient,
-		ipv4:       ip4,
-		ipv6:       ip6,
-		returnPath: returnPath,
-		isTLS:      isTLS,
-	}
-	m.emails = append(m.emails, &email)
-	m.nextID++
-	return email.mailID, nil
-}
-
-// CloseMessage implements the ChunkSaverStorage interface
-func (m *chunkSaverMemory) CloseMessage(mailID uint64, size int64, partsInfo *PartsInfo, subject string, deliveryID string, to string, from string) error {
-	if email := m.emails[mailID-m.IDOffset]; email == nil {
-		return errors.New("email not found")
-	} else {
-		email.size = size
-		if info, err := partsInfo.MarshalJSONZlib(); err != nil {
-			return err
-		} else {
-			email.partsInfo = info
-		}
-		email.subject = subject
-		email.deliveryID = deliveryID
-		email.to = to
-		email.from = from
-		email.size = size
-	}
-	return nil
-}
-
-// AddChunk implements the ChunkSaverStorage interface
-func (m *chunkSaverMemory) AddChunk(data []byte, hash []byte) error {
-	var key HashKey
-	if len(hash) != hashByteSize {
-		return errors.New("invalid hash")
-	}
-	key.Pack(hash)
-	var compressed bytes.Buffer
-	zlibw, err := zlib.NewWriterLevel(&compressed, m.compressLevel)
-	if err != nil {
-		return err
-	}
-	if chunk, ok := m.chunks[key]; ok {
-		// only update the counters and update time
-		chunk.referenceCount++
-		chunk.modifiedAt = time.Now()
-	} else {
-		if _, err := zlibw.Write(data); err != nil {
-			return err
-		}
-		if err := zlibw.Close(); err != nil {
-			return err
-		}
-		// add a new chunk
-		newChunk := chunkSaverMemoryChunk{
-			modifiedAt:     time.Now(),
-			referenceCount: 1,
-			data:           compressed.Bytes(),
-		}
-		m.chunks[key] = &newChunk
-	}
-	return nil
-}
-
-// Initialize implements the ChunkSaverStorage interface
-func (m *chunkSaverMemory) Initialize(cfg BackendConfig) error {
-	m.IDOffset = 1
-	m.nextID = m.IDOffset
-	m.emails = make([]*chunkSaverMemoryEmail, 0, 100)
-	m.chunks = make(map[HashKey]*chunkSaverMemoryChunk, 1000)
-	m.compressLevel = zlib.NoCompression
-	return nil
-}
-
-// Shutdown implements the ChunkSaverStorage interface
-func (m *chunkSaverMemory) Shutdown() (err error) {
-	m.emails = nil
-	m.chunks = nil
-	return nil
-}
-
-// GetEmail implements the ChunkSaverStorage interface
-func (m *chunkSaverMemory) GetEmail(mailID uint64) (*ChunkSaverEmail, error) {
-	if mailID-m.IDOffset >= uint64(len(m.emails)) {
-		return nil, errors.New("mail not found")
-	}
-	email := m.emails[mailID-m.IDOffset]
-	pi := NewPartsInfo()
-	if err := pi.UnmarshalJSONZlib(email.partsInfo); err != nil {
-		return nil, err
-	}
-	return &ChunkSaverEmail{
-		mailID:     email.mailID,
-		createdAt:  email.createdAt,
-		size:       email.size,
-		from:       email.from,
-		to:         email.to,
-		partsInfo:  *pi,
-		helo:       email.helo,
-		subject:    email.subject,
-		deliveryID: email.deliveryID,
-		recipient:  email.recipient,
-		ipv4:       email.ipv4,
-		ipv6:       email.ipv6,
-		returnPath: email.returnPath,
-		isTLS:      email.isTLS,
-	}, nil
-}
-
-// GetChunks implements the ChunkSaverStorage interface
-func (m *chunkSaverMemory) GetChunks(hash ...HashKey) ([]*ChunkSaverChunk, error) {
-	result := make([]*ChunkSaverChunk, 0, len(hash))
-	var key HashKey
-	for i := range hash {
-		key = hash[i]
-		if c, ok := m.chunks[key]; ok {
-			zwr, err := zlib.NewReader(bytes.NewReader(c.data))
-			if err != nil {
-				return nil, err
-			}
-			result = append(result, &ChunkSaverChunk{
-				modifiedAt:     c.modifiedAt,
-				referenceCount: c.referenceCount,
-				data:           zwr,
-			})
-		}
-	}
-	return result, nil
-}
-
-type chunkSaverSQLConfig struct {
-	EmailTable  string `json:"chunksaver_email_table,omitempty"`
-	ChunkTable  string `json:"chunksaver_chunk_table,omitempty"`
-	Driver      string `json:"chunksaver_sql_driver,omitempty"`
-	DSN         string `json:"chunksaver_sql_dsn,omitempty"`
-	PrimaryHost string `json:"chunksaver_primary_mail_host,omitempty"`
-}
-
-// chunkSaverSQL implements the ChunkSaverStorage interface
-type chunkSaverSQL struct {
-	config     *chunkSaverSQLConfig
-	statements map[string]*sql.Stmt
-	db         *sql.DB
-}
-
-func (c *chunkSaverSQL) connect() (*sql.DB, error) {
-	var err error
-	if c.db, err = sql.Open(c.config.Driver, c.config.DSN); err != nil {
-		Log().Error("cannot open database: ", err)
-		return nil, err
-	}
-	// do we have permission to access the table?
-	_, err = c.db.Query("SELECT mail_id FROM " + c.config.EmailTable + " LIMIT 1")
-	if err != nil {
-		return nil, err
-	}
-	return c.db, err
-}
-
-func (c *chunkSaverSQL) prepareSql() error {
-	if c.statements == nil {
-		c.statements = make(map[string]*sql.Stmt)
-	}
-
-	if stmt, err := c.db.Prepare(`INSERT INTO ` +
-		c.config.EmailTable +
-		` (from, helo, recipient, ipv4_addr, ipv6_addr, return_path, is_tls) 
- VALUES(?, ?, ?, ?, ?, ?, ?)`); err != nil {
-		return err
-	} else {
-		c.statements["insertEmail"] = stmt
-	}
-
-	// insert a new chunk of the email's data
-	if stmt, err := c.db.Prepare(`INSERT INTO ` +
-		c.config.ChunkTable +
-		` (data, hash) 
- VALUES(?, ?)`); err != nil {
-		return err
-	} else {
-		c.statements["insertChunk"] = stmt
-	}
-
-	// finalize the email (called once the connection has closed)
-	if stmt, err := c.db.Prepare(`
-		UPDATE ` + c.config.EmailTable + ` 
-			SET size = ?, parts_info = ?, subject = ?, delivery_id = ?, to = ? 
-		WHERE mail_id = ? `); err != nil {
-		return err
-	} else {
-		c.statements["finalizeEmail"] = stmt
-	}
-
-	// Check the existence of a chunk (the reference_count col is incremented if it exists)
-	// This means we can avoid re-inserting an existing chunk, only update its reference_count
-	if stmt, err := c.db.Prepare(`
-		UPDATE ` + c.config.ChunkTable + ` 
-			SET reference_count=reference_count+1 
-		WHERE hash = ? `); err != nil {
-		return err
-	} else {
-		c.statements["chunkReferenceIncr"] = stmt
-	}
-
-	// If the reference_count is 0 then it means the chunk has been deleted
-	// Chunks are soft-deleted for now, hard-deleted by another sweeper query as they become stale.
-	if stmt, err := c.db.Prepare(`
-		UPDATE ` + c.config.ChunkTable + ` 
-			SET reference_count=reference_count-1 
-		WHERE hash = ? AND reference_count > 0`); err != nil {
-		return err
-	} else {
-		c.statements["chunkReferenceDecr"] = stmt
-	}
-
-	// fetch an email
-	if stmt, err := c.db.Prepare(`
-		SELECT * 
-		from ` + c.config.EmailTable + ` 
-		where mail_id=?`); err != nil {
-		return err
-	} else {
-		c.statements["selectMail"] = stmt
-	}
-
-	// fetch a chunk
-	if stmt, err := c.db.Prepare(`
-		SELECT * 
-		from ` + c.config.ChunkTable + ` 
-		where hash=?`); err != nil {
-		return err
-	} else {
-		c.statements["selectChunk"] = stmt
-	}
-
-	// TODO sweep old chunks
-
-	// TODO sweep incomplete emails
-
-	return nil
-}
-
-// OpenMessage implements the ChunkSaverStorage interface
-func (c *chunkSaverSQL) OpenMessage(from string, helo string, recipient string, ipAddress net.IPAddr, returnPath string, isTLS bool) (mailID uint64, err error) {
-
-	// if it's ipv4 then we want ipv6 to be 0, and vice-versa
-	var ip4 uint32
-	ip6 := make([]byte, 16)
-	if ip := ipAddress.IP.To4(); ip != nil {
-		ip4 = binary.BigEndian.Uint32(ip)
-	} else {
-		_ = copy(ip6, ipAddress.IP)
-	}
-	r, err := c.statements["insertEmail"].Exec(from, helo, recipient, ip4, ip6, returnPath, isTLS)
-	if err != nil {
-		return 0, err
-	}
-	id, err := r.LastInsertId()
-	if err != nil {
-		return 0, err
-	}
-	return uint64(id), err
-}
-
-// AddChunk implements the ChunkSaverStorage interface
-func (c *chunkSaverSQL) AddChunk(data []byte, hash []byte) error {
-	// attempt to increment the reference_count (it means the chunk is already in there)
-	r, err := c.statements["chunkReferenceIncr"].Exec(hash)
-	if err != nil {
-		return err
-	}
-	affected, err := r.RowsAffected()
-	if err != nil {
-		return err
-	}
-	if affected == 0 {
-		// chunk isn't in there, let's insert it
-		_, err := c.statements["insertChunk"].Exec(data, hash)
-		if err != nil {
-			return err
-		}
-	}
-	return nil
-}
-
-// CloseMessage implements the ChunkSaverStorage interface
-func (c *chunkSaverSQL) CloseMessage(mailID uint64, size int64, partsInfo *PartsInfo, subject string, deliveryID string, to string, from string) error {
-	partsInfoJson, err := json.Marshal(partsInfo)
-	if err != nil {
-		return err
-	}
-	_, err = c.statements["finalizeEmail"].Exec(size, partsInfoJson, subject, deliveryID, to, mailID)
-	if err != nil {
-		return err
-	}
-	return nil
-}
-
-// Initialize loads the specific database config, connects to the db, prepares statements
-func (c *chunkSaverSQL) Initialize(cfg BackendConfig) error {
-	configType := BaseConfig(&chunkSaverSQLConfig{})
-	bcfg, err := Svc.ExtractConfig(cfg, configType)
-	if err != nil {
-		return err
-	}
-	c.config = bcfg.(*chunkSaverSQLConfig)
-	c.db, err = c.connect()
-	if err != nil {
-		return err
-	}
-	err = c.prepareSql()
-	if err != nil {
-		return err
-	}
-	return nil
-}
-
-// Shutdown implements the ChunkSaverStorage interface
-func (c *chunkSaverSQL) Shutdown() (err error) {
-	defer func() {
-		closeErr := c.db.Close()
-		if closeErr != err {
-			Log().WithError(err).Error("failed to close sql database")
-			err = closeErr
-		}
-	}()
-	for i := range c.statements {
-		if err = c.statements[i].Close(); err != nil {
-			Log().WithError(err).Error("failed to close sql statement")
-		}
-	}
-	return err
-}
-
-// GetEmail implements the ChunkSaverStorage interface
-func (c *chunkSaverSQL) GetEmail(mailID uint64) (*ChunkSaverEmail, error) {
-	return &ChunkSaverEmail{}, nil
-}
-
-// GetChunks implements the ChunkSaverStorage interface
-func (c *chunkSaverSQL) GetChunks(hash ...HashKey) ([]*ChunkSaverChunk, error) {
-	result := make([]*ChunkSaverChunk, 0, len(hash))
-	return result, nil
-}
-
-type chunkMailReader struct {
-	db    ChunkSaverStorage
-	email *ChunkSaverEmail
-	// part requests a part. If 0, all the parts are read sequentially
-	part int
-	i, j int
-
-	cache cachedChunks
-}
-
-// NewChunkMailReader loads the email and selects which mime-part Read will read, starting from 1
-// if part is 0, Read will read in the entire message. 1 selects the first part, 2 the second, and so on.
-func NewChunkMailReader(db ChunkSaverStorage, email *ChunkSaverEmail, part int) (*chunkMailReader, error) {
-	r := new(chunkMailReader)
-	r.db = db
-	r.part = part
-	if email == nil {
-		return nil, errors.New("nil email")
-	} else {
-		r.email = email
-	}
-	if err := r.SeekPart(part); err != nil {
-		return nil, err
-	}
-	r.cache = cachedChunks{
-		db: db,
-	}
-	return r, nil
-}
-
-// SeekPart resets the reader. The part argument chooses which part Read will read in
-// If part is 0, Read will return the entire message
-func (r *chunkMailReader) SeekPart(part int) error {
-	if parts := len(r.email.partsInfo.Parts); parts == 0 {
-		return errors.New("email has no mime parts")
-	} else if part > parts {
-		return errors.New("no such part available")
-	}
-	r.i = part
-	r.j = 0
-	return nil
-}
-
-type cachedChunks struct {
-	chunks    []*ChunkSaverChunk
-	hashIndex map[int]HashKey
-	db        ChunkSaverStorage
-}
-
-const chunkCachePreload = 2
-
-// warm allocates the chunk cache and pre-loads the first few chunks into it
-func (c *cachedChunks) warm(hashes ...HashKey) (int, error) {
-
-	if c.hashIndex == nil {
-		c.hashIndex = make(map[int]HashKey, len(hashes))
-	}
-	if c.chunks == nil {
-		c.chunks = make([]*ChunkSaverChunk, 0, 100)
-	}
-	if len(c.chunks) > 0 {
-		// already been filled
-		return len(c.chunks), nil
-	}
-	// let's pre-load some hashes.
-	preload := chunkCachePreload
-	if len(hashes) < preload {
-		preload = len(hashes)
-	}
-	if chunks, err := c.db.GetChunks(hashes[0:preload]...); err != nil {
-		return 0, err
-	} else {
-		for i := range hashes {
-			c.hashIndex[i] = hashes[i]
-			if i < preload {
-				c.chunks = append(c.chunks, chunks[i])
-			} else {
-				// don't pre-load
-				c.chunks = append(c.chunks, nil) // nil will be a placeholder for our chunk
-			}
-		}
-	}
-	return len(c.chunks), nil
-}
-
-// get returns a chunk. If the chunk isn't cached, it fetches it and pre-loads the next few;
-// it also evicts previous chunks that have become stale
-func (c *cachedChunks) get(i int) (*ChunkSaverChunk, error) {
-	if i >= len(c.chunks) {
-		return nil, errors.New("not enough chunks")
-	}
-	if c.chunks[i] != nil {
-		// cache hit!
-		return c.chunks[i], nil
-	} else {
-		var toGet []HashKey
-		if key, ok := c.hashIndex[i]; ok {
-			toGet = append(toGet, key)
-		} else {
-			return nil, fmt.Errorf("hash for chunk [%d] not found", i)
-		}
-		// make a list of chunks to load (extra ones to be pre-loaded)
-		for to := i + 1; to < len(c.chunks) && to <= chunkCachePreload+i; to++ {
-			if key, ok := c.hashIndex[to]; ok {
-				toGet = append(toGet, key)
-			}
-		}
-		if chunks, err := c.db.GetChunks(toGet...); err != nil {
-			return nil, err
-		} else {
-			// cache the pre-loaded chunks
-			for j := i; j < len(c.chunks) && j-i < len(chunks); j++ {
-				c.chunks[j] = chunks[j-i]
-				c.hashIndex[j] = toGet[j-i]
-			}
-			// remove any old ones (walk back)
-			for j := i; j > -1; j-- {
-				if c.chunks[j] != nil {
-					c.chunks[j] = nil
-				} else {
-					break
-				}
-			}
-			// return the chunk asked for
-			return chunks[0], nil
-		}
-	}
-
-}
-
-func (c *cachedChunks) empty() {
-	for i := range c.chunks {
-		c.chunks[i] = nil
-	}
-	c.chunks = c.chunks[:0] // set len back to 0
-	for key := range c.hashIndex {
-		delete(c.hashIndex, key)
-	}
-}
-
-// Read implements the io.Reader interface
-func (r *chunkMailReader) Read(p []byte) (n int, err error) {
-	var length int
-	for ; r.i < len(r.email.partsInfo.Parts); r.i++ {
-		length, err = r.cache.warm(r.email.partsInfo.Parts[r.i].ChunkHash...)
-		if err != nil {
-			return
-		}
-		var nRead int
-		for r.j < length {
-			chunk, err := r.cache.get(r.j)
-			if err != nil {
-				return nRead, err
-			}
-			nRead, err = chunk.data.Read(p)
-			if err == io.EOF {
-				r.j++ // advance to the next chunk
-				err = nil
-			}
-			if r.j == length { // last chunk in a part?
-				r.j = 0 // reset chunk index
-				r.i++   // advance to the next part
-				if r.i == len(r.email.partsInfo.Parts) || r.part > 0 {
-					// there are no more parts to return
-					err = io.EOF
-					r.cache.empty()
-				}
-			}
-			// unless there's an error, the next time this function will be
-			// called, it will read the next chunk
-			return nRead, err
-		}
-	}
-	err = io.EOF
-	return n, err
-}
-
-type transportEncoding int
-
-const (
-	encodingTypeBase64 transportEncoding = iota
-	encodingTypeQP
-)
-
-// chunkPartDecoder decodes base64 and quoted-printable, then converts the charset to UTF-8
-type chunkPartDecoder struct {
-	*chunkMailReader
-	buf     []byte
-	state   int
-	charset string
-
-	r io.Reader
-}
-
-// example
-// db ChunkSaverStorage, email *ChunkSaverEmail, part int)
-/*
-
-r, err := NewChunkMailReader(db, email, part)
-	if err != nil {
-		return nil, err
-	}
-
-*/
-
-// NewChunkPartDecoder returns a decoder that reads from the underlying reader r, decoding base64 or quoted-printable content
-func NewChunkPartDecoder(r io.Reader, enc transportEncoding, charset string) (*chunkPartDecoder, error) {
-
-	decoder := new(chunkPartDecoder)
-	decoder.r = r
-	return decoder, nil
-}
-
-const chunkSaverNL = '\n'
-
-const (
-	decoderStateFindHeader int = iota
-	decoderStateMatchNL
-	decoderStateDecode
-)
-
-func (r *chunkPartDecoder) Read(p []byte) (n int, err error) {
-	var part *ChunkedPart
-	//if cap(p) != cap(r.buf) {
-	r.buf = make([]byte, len(p), cap(p))
-	var start, buffered int
-	part = &r.email.partsInfo.Parts[r.part]
-	_ = part
-	buffered, err = r.chunkMailReader.Read(r.buf)
-	if buffered == 0 {
-		return
-	}
-	for {
-		switch r.state {
-		case decoderStateFindHeader:
-			// finding the start of the header
-			if start = bytes.Index(r.buf, []byte{chunkSaverNL, chunkSaverNL}); start != -1 {
-				start += 2                   // skip the \n\n
-				r.state = decoderStateDecode // found the header
-				continue                     // continue scanning
-			} else if r.buf[len(r.buf)-1] == chunkSaverNL {
-				// the last char is a \n so next call to Read will check if it starts with a matching \n
-				r.state = decoderStateMatchNL
-			}
-		case decoderStateMatchNL:
-			if r.buf[0] == '\n' {
-				// found the header
-				start = 1
-				r.state = decoderStateDecode
-				continue
-			} else {
-				r.state = decoderStateFindHeader
-				continue
-			}
-
-		case decoderStateDecode:
-			if start < len(r.buf) {
-				// todo decode here (q-printable, base64, charset)
-				n += copy(p[:], r.buf[start:buffered])
-			}
-			return
-		}
-
-		buffered, err = r.chunkMailReader.Read(r.buf)
-		if buffered == 0 {
-			return
-		}
-	}
-
-}
-
-const chunkMaxBytes = 1024 * 16 // 16Kb is the default, change using chunksaver_chunk_size config setting
-/**
-*
- * A chunk ends either:
- * after chunkMaxBytes, at the end of a part, or at the end of a header
- *
- * - buffer the first chunk
- * - if the first chunk hasn't arrived after x bytes, save normally
- *
-*/
-func Chunksaver() *StreamDecorator {
-
-	sd := &StreamDecorator{}
-	sd.Decorate =
-		func(sp StreamProcessor, a ...interface{}) StreamProcessor {
-			var (
-				envelope    *mail.Envelope
-				chunkBuffer *chunkedBytesBufferMime
-				msgPos      uint
-				database    ChunkSaverStorage
-				written     int64
-
-				// just some headers from the first mime-part
-				subject string
-				to      string
-				from    string
-
-				progress int // tracks which mime parts were processed
-			)
-
-			var config *chunkSaverConfig
-			// optional dependency injection
-			for i := range a {
-				if db, ok := a[i].(ChunkSaverStorage); ok {
-					database = db
-				}
-				if buff, ok := a[i].(*chunkedBytesBufferMime); ok {
-					chunkBuffer = buff
-				}
-			}
-
-			Svc.AddInitializer(InitializeWith(func(backendConfig BackendConfig) error {
-
-				configType := BaseConfig(&chunkSaverConfig{})
-				bcfg, err := Svc.ExtractConfig(backendConfig, configType)
-				if err != nil {
-					return err
-				}
-				config = bcfg.(*chunkSaverConfig)
-				if chunkBuffer == nil {
-					chunkBuffer = newChunkedBytesBufferMime()
-				}
-				// configure storage if none was injected
-				if database == nil {
-					if config.StorageEngine == "memory" {
-						db := new(chunkSaverMemory)
-						db.compressLevel = config.CompressLevel
-						database = db
-					} else {
-						db := new(chunkSaverSQL)
-						database = db
-					}
-				}
-				err = database.Initialize(backendConfig)
-				if err != nil {
-					return err
-				}
-				// configure the chunks buffer
-				if config.ChunkMaxBytes > 0 {
-					chunkBuffer.capTo(config.ChunkMaxBytes)
-				} else {
-					chunkBuffer.capTo(chunkMaxBytes)
-				}
-				chunkBuffer.setDatabase(database)
-
-				return nil
-			}))
-
-			Svc.AddShutdowner(ShutdownWith(func() error {
-				err := database.Shutdown()
-				return err
-			}))
-
-			sd.Open = func(e *mail.Envelope) error {
-				// create a new entry & grab the id
-				written = 0
-				progress = 0
-				var ip net.IPAddr
-				if ret := net.ParseIP(e.RemoteIP); ret != nil {
-					ip = net.IPAddr{IP: ret}
-				}
-				mid, err := database.OpenMessage(
-					e.MailFrom.String(),
-					e.Helo,
-					e.RcptTo[0].String(),
-					ip,
-					e.MailFrom.String(),
-					e.TLS)
-				if err != nil {
-					return err
-				}
-				e.Values["messageID"] = mid
-				envelope = e
-				return nil
-			}
-
-			sd.Close = func() (err error) {
-				err = chunkBuffer.flush()
-				if err != nil {
-					// TODO we could delete the half saved message here
-					return err
-				}
-				defer chunkBuffer.Reset()
-				if mid, ok := envelope.Values["messageID"].(uint64); ok {
-					err = database.CloseMessage(
-						mid,
-						written,
-						&chunkBuffer.info,
-						subject,
-						envelope.QueuedId,
-						to,
-						from,
-					)
-					if err != nil {
-						return err
-					}
-				}
-				return nil
-			}
-
-			fillVars := func(parts *[]*mime.Part, subject, to, from string) (string, string, string) {
-				if len(*parts) > 0 {
-					if subject == "" {
-						if val, ok := (*parts)[0].Headers["Subject"]; ok {
-							subject = val[0]
-						}
-					}
-					if to == "" {
-						if val, ok := (*parts)[0].Headers["To"]; ok {
-							addr, err := mail.NewAddress(val[0])
-							if err == nil {
-								to = addr.String()
-							}
-						}
-					}
-					if from == "" {
-						if val, ok := (*parts)[0].Headers["From"]; ok {
-							addr, err := mail.NewAddress(val[0])
-							if err == nil {
-								from = addr.String()
-							}
-						}
-					}
-
-				}
-				return subject, to, from
-			}
-
-			return StreamProcessWith(func(p []byte) (count int, err error) {
-				if envelope.Values == nil {
-					return count, errors.New("no message headers found")
-				}
-				if parts, ok := envelope.Values["MimeParts"].(*[]*mime.Part); ok && len(*parts) > 0 {
-					var pos int
-
-					subject, to, from = fillVars(parts, subject, to, from)
-					offset := msgPos
-					chunkBuffer.currentPart((*parts)[0])
-					for i := progress; i < len(*parts); i++ {
-						part := (*parts)[i]
-
-						// break chunk on new part
-						if part.StartingPos > 0 && part.StartingPos > msgPos {
-							count, _ = chunkBuffer.Write(p[pos : part.StartingPos-offset])
-							written += int64(count)
-
-							err = chunkBuffer.flush()
-							if err != nil {
-								return count, err
-							}
-							chunkBuffer.currentPart(part)
-							fmt.Println("->N")
-							pos += count
-							msgPos = part.StartingPos
-						}
-						// break chunk on header
-						if part.StartingPosBody > 0 && part.StartingPosBody >= msgPos {
-							count, _ = chunkBuffer.Write(p[pos : part.StartingPosBody-offset])
-							written += int64(count)
-
-							err = chunkBuffer.flush()
-							if err != nil {
-								return count, err
-							}
-							chunkBuffer.currentPart(part)
-							fmt.Println("->H")
-							pos += count
-							msgPos = part.StartingPosBody
-						}
-						// if on the latest (last) part, and yet there is still data to be written out
-						if len(*parts)-1 == i && len(p)-1 > pos {
-							count, _ = chunkBuffer.Write(p[pos:])
-							written += int64(count)
-							pos += count
-							msgPos += uint(count)
-						}
-						// if there's no more data
-						if pos >= len(p) {
-							break
-						}
-					}
-					if len(*parts) > 2 {
-						progress = len(*parts) - 2 // skip to 2nd last part, assume previous parts are already processed
-					}
-				}
-				return sp.Write(p)
-			})
-		}
-	return sd
-}
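The de-duplication scheme described in the removed header comment survives the move unchanged: hash each chunk, and if the hash is already stored, only bump a reference count. A standalone sketch of the idea (simplified; the real logic lives behind the ChunkSaverStorage implementations):

```go
package main

import (
	"crypto/md5"
	"fmt"
)

type chunk struct {
	data []byte
	refs int
}

var store = map[[md5.Size]byte]*chunk{}

// addChunk stores data once per unique hash and counts references,
// mirroring AddChunk in the memory and SQL storage implementations
func addChunk(data []byte) {
	key := md5.Sum(data)
	if c, ok := store[key]; ok {
		c.refs++ // duplicate chunk: no second copy is written
		return
	}
	store[key] = &chunk{data: data, refs: 1}
}

func main() {
	addChunk([]byte("shared message body"))
	addChunk([]byte("shared message body")) // e.g. the same body CC'd to someone else
	fmt.Println(store[md5.Sum([]byte("shared message body"))].refs) // 2
}
```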

+ 1 - 1
backends/s_compress.go

@@ -8,7 +8,7 @@ import (
 )
 
 func init() {
-	streamers["compress"] = func() *StreamDecorator {
+	Streamers["compress"] = func() *StreamDecorator {
 		return StreamCompress()
 	}
 }

+ 1 - 1
backends/s_debug.go

@@ -7,7 +7,7 @@ import (
 )
 
 func init() {
-	streamers["debug"] = func() *StreamDecorator {
+	Streamers["debug"] = func() *StreamDecorator {
 		return StreamDebug()
 	}
 }

+ 1 - 1
backends/s_decompress.go

@@ -8,7 +8,7 @@ import (
 )
 
 func init() {
-	streamers["decompress"] = func() *StreamDecorator {
+	Streamers["decompress"] = func() *StreamDecorator {
 		return StreamDecompress()
 	}
 }

+ 1 - 1
backends/s_header.go

@@ -23,7 +23,7 @@ import (
 // ----------------------------------------------------------------------------------
 
 func init() {
-	streamers["header"] = func() *StreamDecorator {
+	Streamers["header"] = func() *StreamDecorator {
 		return StreamHeader()
 	}
 }

+ 1 - 1
backends/s_headers_parser.go

@@ -21,7 +21,7 @@ import (
 // ----------------------------------------------------------------------------------
 
 func init() {
-	streamers["headersparser"] = func() *StreamDecorator {
+	Streamers["headersparser"] = func() *StreamDecorator {
 		return StreamHeadersParser()
 	}
 }

+ 1 - 1
backends/s_mime.go

@@ -19,7 +19,7 @@ import (
 // ----------------------------------------------------------------------------------
 
 func init() {
-	streamers["mimeanalyzer"] = func() *StreamDecorator {
+	Streamers["mimeanalyzer"] = func() *StreamDecorator {
 		return StreamMimeAnalyzer()
 	}
 }

+ 1 - 1
backends/s_process.go

@@ -7,7 +7,7 @@ import (
 )
 
 func init() {
-	streamers["process"] = func() *StreamDecorator {
+	Streamers["process"] = func() *StreamDecorator {
 		return StreamProcess()
 	}
 }

+ 1 - 1
backends/stream_backend.go

@@ -1,7 +1,7 @@
 package backends
 
 var (
-	streamers map[string]StreamProcessorConstructor
+	Streamers map[string]StreamProcessorConstructor
 )
 
 func init() {

+ 172 - 0
chunk/buffer.go

@@ -0,0 +1,172 @@
+package chunk
+
+import (
+	"crypto/md5"
+	"errors"
+	"fmt"
+	"hash"
+	"strings"
+
+	"github.com/flashmob/go-guerrilla/mail/mime"
+)
+
+type flushEvent func() error
+
+type chunkedBytesBuffer struct {
+	buf          []byte
+	flushTrigger flushEvent
+}
+
+// Flush signals that it's time to write the buffer out to storage
+func (c *chunkedBytesBuffer) Flush() error {
+	if len(c.buf) == 0 {
+		return nil
+	}
+	fmt.Print(string(c.buf)) // debug: dump each chunk to stdout
+	if c.flushTrigger != nil {
+		if err := c.flushTrigger(); err != nil {
+			return err
+		}
+	}
+	c.Reset()
+	return nil
+}
+
+// Reset sets the length back to 0, making it re-usable
+func (c *chunkedBytesBuffer) Reset() {
+	c.buf = c.buf[:0] // set the length back to 0
+}
+
+// Write takes a p slice of bytes and writes it to the buffer.
+// It will never grow the buffer, flushing it as soon as it's full.
+func (c *chunkedBytesBuffer) Write(p []byte) (i int, err error) {
+	remaining := len(p)
+	bufCap := cap(c.buf)
+	for {
+		free := bufCap - len(c.buf)
+		if free > remaining {
+			// enough room in the buffer
+			c.buf = append(c.buf, p[i:i+remaining]...)
+			i += remaining
+			return
+		} else {
+			// fill the buffer to the 'brim' with a slice from p
+			c.buf = append(c.buf, p[i:i+free]...)
+			remaining -= free
+			i += free
+			err = c.Flush()
+			if err != nil {
+				return i, err
+			}
+			if remaining == 0 {
+				return
+			}
+		}
+	}
+}
+
+// CapTo caps the internal buffer to the specified number of bytes and sets the length back to 0
+func (c *chunkedBytesBuffer) CapTo(n int) {
+	if cap(c.buf) == n {
+		return
+	}
+	c.buf = make([]byte, 0, n)
+}
+
+// ChunkedBytesBufferMime decorates chunkedBytesBuffer, specifying what to do when a flush event is triggered
+type ChunkedBytesBufferMime struct {
+	chunkedBytesBuffer
+	current  *mime.Part
+	Info     PartsInfo
+	md5      hash.Hash
+	database ChunkSaverStorage
+}
+
+func NewChunkedBytesBufferMime() *ChunkedBytesBufferMime {
+	b := new(ChunkedBytesBufferMime)
+	b.chunkedBytesBuffer.flushTrigger = func() error {
+		return b.onFlush()
+	}
+	b.md5 = md5.New()
+	b.buf = make([]byte, 0, chunkMaxBytes)
+	return b
+}
+
+func (b *ChunkedBytesBufferMime) SetDatabase(database ChunkSaverStorage) {
+	b.database = database
+}
+
+// onFlush is called whenever the flush event fires.
+// - It saves the chunk to disk and adds the chunk's hash to the list.
+// - It builds the b.Info.Parts structure
+func (b *ChunkedBytesBufferMime) onFlush() error {
+	b.md5.Write(b.buf)
+	var chash HashKey
+	copy(chash[:], b.md5.Sum([]byte{}))
+	if b.current == nil {
+		return errors.New("b.current part is nil")
+	}
+	if size := len(b.Info.Parts); size > 0 && b.Info.Parts[size-1].PartId == b.current.Node {
+		// existing part, just append the hash
+		lastPart := &b.Info.Parts[size-1]
+		lastPart.ChunkHash = append(lastPart.ChunkHash, chash)
+		b.fillInfo(lastPart, size-1)
+		lastPart.Size += uint(len(b.buf))
+	} else {
+		// add it as a new part
+		part := ChunkedPart{
+			PartId:          b.current.Node,
+			ChunkHash:       []HashKey{chash},
+			ContentBoundary: b.Info.boundary(b.current.ContentBoundary),
+			Size:            uint(len(b.buf)),
+		}
+		b.fillInfo(&part, 0)
+		b.Info.Parts = append(b.Info.Parts, part)
+		b.Info.Count++
+	}
+	if err := b.database.AddChunk(b.buf, chash[:]); err != nil {
+		return err
+	}
+	return nil
+}
+
+func (b *ChunkedBytesBufferMime) fillInfo(cp *ChunkedPart, index int) {
+	if cp.ContentType == "" && b.current.ContentType != nil {
+		cp.ContentType = b.current.ContentType.String()
+	}
+	if cp.Charset == "" && b.current.Charset != "" {
+		cp.Charset = b.current.Charset
+	}
+	if cp.TransferEncoding == "" && b.current.TransferEncoding != "" {
+		cp.TransferEncoding = b.current.TransferEncoding
+	}
+	if cp.ContentDisposition == "" && b.current.ContentDisposition != "" {
+		cp.ContentDisposition = b.current.ContentDisposition
+		if strings.Contains(cp.ContentDisposition, "attach") {
+			b.Info.HasAttach = true
+		}
+	}
+	if cp.ContentType != "" {
+		if b.Info.TextPart == -1 && strings.Contains(cp.ContentType, "text/plain") {
+			b.Info.TextPart = index
+		} else if b.Info.HTMLPart == -1 && strings.Contains(cp.ContentType, "text/html") {
+			b.Info.HTMLPart = index
+		}
+	}
+}
+
+// Reset decorates the Reset method of the chunkedBytesBuffer
+func (b *ChunkedBytesBufferMime) Reset() {
+	b.md5.Reset()
+	b.chunkedBytesBuffer.Reset()
+}
+
+func (b *ChunkedBytesBufferMime) CurrentPart(cp *mime.Part) {
+	if b.current == nil {
+		b.Info = *NewPartsInfo()
+		b.Info.Parts = make([]ChunkedPart, 0, 3)
+		b.Info.TextPart = -1
+		b.Info.HTMLPart = -1
+	}
+	b.current = cp
+}
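Worth noting about `Write` above: the buffer has a fixed capacity and flushes every time it fills, so one large `Write` can trigger several flushes while a small tail stays buffered until the next explicit `Flush`. A standalone sketch of that behaviour with a tiny 4-byte capacity (illustrative only):

```go
package main

import "fmt"

type fixedBuf struct {
	buf   []byte
	flush func([]byte)
}

// Write fills the buffer to the brim and flushes, repeatedly, never growing it
func (c *fixedBuf) Write(p []byte) (int, error) {
	i := 0
	for {
		free := cap(c.buf) - len(c.buf)
		if free > len(p)-i {
			c.buf = append(c.buf, p[i:]...) // fits: keep buffering
			return len(p), nil
		}
		c.buf = append(c.buf, p[i:i+free]...)
		i += free
		c.flush(c.buf)
		c.buf = c.buf[:0]
		if i == len(p) {
			return i, nil
		}
	}
}

func main() {
	b := &fixedBuf{
		buf:   make([]byte, 0, 4),
		flush: func(p []byte) { fmt.Printf("chunk %q\n", p) },
	}
	_, _ = b.Write([]byte("hello world!")) // chunk "hell", "o wo", "rld!"
}
```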

+ 139 - 0
chunk/chunk.go

@@ -0,0 +1,139 @@
+package chunk
+
+import (
+	"bytes"
+	"compress/zlib"
+	"encoding/base64"
+	"encoding/json"
+	"io/ioutil"
+	"sync"
+)
+
+const chunkMaxBytes = 1024 * 16
+const hashByteSize = 16
+
+type HashKey [hashByteSize]byte
+
+// Pack takes a slice and copies each byte to HashKey internal representation
+func (h *HashKey) Pack(b []byte) {
+	if len(b) < hashByteSize {
+		return
+	}
+	copy(h[:], b[0:hashByteSize])
+}
+
+// String implements the Stringer interface from fmt
+func (h HashKey) String() string {
+	return base64.RawStdEncoding.EncodeToString(h[0:hashByteSize])
+}
+
+// UnmarshalJSON implements the Unmarshaler interface from encoding/json
+func (h *HashKey) UnmarshalJSON(b []byte) error {
+	dbuf := make([]byte, base64.RawStdEncoding.DecodedLen(len(b[1:len(b)-1])))
+	_, err := base64.RawStdEncoding.Decode(dbuf, b[1:len(b)-1])
+	if err != nil {
+		return err
+	}
+	h.Pack(dbuf)
+	return nil
+}
+
+// MarshalJSON implements the Marshaler interface from encoding/json
+// The value is marshaled as raw base64 to save some bytes,
+// e.g. instead of the typical hex form de17038001170380011703ff01170380, the value is represented as 3hcDgAEXA4ABFwP/ARcDgA
+func (h *HashKey) MarshalJSON() ([]byte, error) {
+	return []byte(`"` + h.String() + `"`), nil
+}
+
+// PartsInfo describes the mime-parts contained in the email
+type PartsInfo struct {
+	Count       uint32        `json:"c"`   // number of parts
+	TextPart    int           `json:"tp"`  // index of the main text part to display
+	HTMLPart    int           `json:"hp"`  // index of the main html part to display (if any)
+	HasAttach   bool          `json:"a"`   // is there an attachment?
+	Parts       []ChunkedPart `json:"p"`   // info describing a mime-part
+	CBoundaries []string      `json:"cbl"` // content boundaries list
+
+	bp sync.Pool // bytes.buffer pool
+}
+
+// ChunkedPart contains header information about a mime-part, including keys pointing to where the data is stored at
+type ChunkedPart struct {
+	PartId             string    `json:"i"`
+	Size               uint      `json:"s"`
+	ChunkHash          []HashKey `json:"h"` // sequence of hashes the data is stored at
+	ContentType        string    `json:"t"`
+	Charset            string    `json:"c"`
+	TransferEncoding   string    `json:"e"`
+	ContentDisposition string    `json:"d"`
+	ContentBoundary    int       `json:"cb"` // index to the CBoundaries list in PartsInfo
+}
+
+func NewPartsInfo() *PartsInfo {
+	pi := new(PartsInfo)
+	pi.bp = sync.Pool{
+		// if not available, then create a new one
+		New: func() interface{} {
+			var b bytes.Buffer
+			return &b
+		},
+	}
+	return pi
+}
+
+// boundary takes a string and returns the index of the string in the info.CBoundaries slice
+func (info *PartsInfo) boundary(cb string) int {
+	for i := range info.CBoundaries {
+		if info.CBoundaries[i] == cb {
+			return i
+		}
+	}
+	info.CBoundaries = append(info.CBoundaries, cb)
+	return len(info.CBoundaries) - 1
+}
+
+// UnmarshalJSON unmarshals the JSON and decompresses using zlib
+func (info *PartsInfo) UnmarshalJSONZlib(b []byte) error {
+
+	r, err := zlib.NewReader(bytes.NewReader(b[1 : len(b)-1]))
+	if err != nil {
+		return err
+	}
+	all, err := ioutil.ReadAll(r)
+	if err != nil {
+		return err
+	}
+	err = json.Unmarshal(all, info)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+// MarshalJSONZlib marshals and compresses the bytes using zlib
+func (info *PartsInfo) MarshalJSONZlib() ([]byte, error) {
+
+	buf, err := json.Marshal(info)
+	if err != nil {
+		return buf, err
+	}
+	// borrow a buffer from the pool
+	compressed := info.bp.Get().(*bytes.Buffer)
+	// reset it and return it to the pool when done
+	defer func() {
+		compressed.Reset()
+		info.bp.Put(compressed)
+	}()
+
+	zlibw, err := zlib.NewWriterLevel(compressed, 9)
+	if err != nil {
+		return buf, err
+	}
+	if _, err := zlibw.Write(buf); err != nil {
+		return buf, err
+	}
+	if err := zlibw.Close(); err != nil {
+		return buf, err
+	}
+	return []byte(`"` + compressed.String() + `"`), nil
+}
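The compact base64 form pays off in PartsInfo, where every mime-part carries a list of chunk hashes. A round-trip example using the exported HashKey (assuming the chunk package compiles as committed):

```go
package main

import (
	"crypto/md5"
	"encoding/json"
	"fmt"

	"github.com/flashmob/go-guerrilla/chunk"
)

func main() {
	sum := md5.Sum([]byte("some chunk of data")) // 16 bytes, same size as HashKey
	var key chunk.HashKey
	key.Pack(sum[:])

	out, err := json.Marshal(&key)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out)) // 22 raw base64 chars in quotes, vs 32 in hex

	var back chunk.HashKey
	if err := json.Unmarshal(out, &back); err != nil {
		panic(err)
	}
	fmt.Println(back == key) // true: HashKey arrays compare by value
}
```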

+ 99 - 0
chunk/decoder.go

@@ -0,0 +1,99 @@
+package chunk
+
+import (
+	"bytes"
+	"io"
+)
+
+type transportEncoding int
+
+const (
+	encodingTypeBase64 transportEncoding = iota
+	encodingTypeQP
+)
+
+// chunkPartDecoder decodes base64 and quoted-printable, then converts the charset to UTF-8
+type chunkPartDecoder struct {
+	*chunkMailReader
+	buf     []byte
+	state   int
+	charset string
+
+	r io.Reader
+}
+
+// Example usage, given db ChunkSaverStorage, email *ChunkSaverEmail, part int:
+/*
+
+r, err := NewChunkMailReader(db, email, part)
+	if err != nil {
+		return nil, err
+	}
+
+*/
+
+// NewChunkPartDecoder returns a decoder that reads from the underlying reader r, decoding base64 or quoted-printable content
+func NewChunkPartDecoder(r io.Reader, enc transportEncoding, charset string) (*chunkPartDecoder, error) {
+
+	decoder := new(chunkPartDecoder)
+	decoder.r = r
+	return decoder, nil
+}
+
+const chunkSaverNL = '\n'
+
+const (
+	decoderStateFindHeader int = iota
+	decoderStateMatchNL
+	decoderStateDecode
+)
+
+func (r *chunkPartDecoder) Read(p []byte) (n int, err error) {
+	var part *ChunkedPart
+	//if cap(p) != cap(r.buf) {
+	r.buf = make([]byte, len(p), cap(p))
+	var start, buffered int
+	part = &r.email.partsInfo.Parts[r.part]
+	_ = part
+	buffered, err = r.chunkMailReader.Read(r.buf)
+	if buffered == 0 {
+		return
+	}
+	for {
+		switch r.state {
+		case decoderStateFindHeader:
+			// finding the start of the header
+			if start = bytes.Index(r.buf, []byte{chunkSaverNL, chunkSaverNL}); start != -1 {
+				start += 2                   // skip the \n\n
+				r.state = decoderStateDecode // found the header
+				continue                     // continue scanning
+			} else if r.buf[len(r.buf)-1] == chunkSaverNL {
+				// the last char is a \n so next call to Read will check if it starts with a matching \n
+				r.state = decoderStateMatchNL
+			}
+		case decoderStateMatchNL:
+			if r.buf[0] == '\n' {
+				// found the header
+				start = 1
+				r.state = decoderStateDecode
+				continue
+			} else {
+				r.state = decoderStateFindHeader
+				continue
+			}
+
+		case decoderStateDecode:
+			if start < len(r.buf) {
+				// todo decode here (q-printable, base64, charset)
+				n += copy(p[:], r.buf[start:buffered])
+			}
+			return
+		}
+
+		buffered, err = r.chunkMailReader.Read(r.buf)
+		if buffered == 0 {
+			return
+		}
+	}
+
+}
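The `todo decode here` step above is still open: once the header is skipped, the body bytes need a transfer-encoding pass. One plausible shape for that step using only standard-library readers (the newTransferDecoder helper is hypothetical, not part of this commit):

```go
package main

import (
	"encoding/base64"
	"fmt"
	"io"
	"io/ioutil"
	"mime/quotedprintable"
	"strings"
)

// newTransferDecoder is a hypothetical helper showing how the two
// transport encodings map onto standard-library readers
func newTransferDecoder(r io.Reader, enc string) io.Reader {
	switch enc {
	case "base64":
		return base64.NewDecoder(base64.StdEncoding, r)
	case "quoted-printable":
		return quotedprintable.NewReader(r)
	default:
		return r // 7bit / 8bit: pass the bytes through unchanged
	}
}

func main() {
	body := strings.NewReader("SGVsbG8gd29ybGQ=")
	decoded, _ := ioutil.ReadAll(newTransferDecoder(body, "base64"))
	fmt.Println(string(decoded)) // Hello world
}
```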

+ 268 - 0
chunk/processor.go

@@ -0,0 +1,268 @@
+package chunk
+
+import (
+	"errors"
+	"fmt"
+	"net"
+
+	"github.com/flashmob/go-guerrilla/backends"
+	"github.com/flashmob/go-guerrilla/mail"
+	"github.com/flashmob/go-guerrilla/mail/mime"
+)
+
+// ----------------------------------------------------------------------------------
+// Processor Name: ChunkSaver
+// ----------------------------------------------------------------------------------
+// Description   : Takes the stream and saves it in chunks. Chunks are split on the
+//               : chunksaver_chunk_size config setting, and also at the end of MIME parts,
+//               : and after a header. This allows for basic de-duplication: we can take a
+//               : hash of each chunk, then check the database to see if we have it already.
+//               : We don't need to write it to the database, but take the reference of the
+//               : previously saved chunk and only increment the reference count.
+//               : The rationale for putting headers and bodies into separate chunks is
+//               : that headers often contain more unique data, while bodies are
+//               : often duplicated, especially for messages that are CC'd or forwarded
+// ----------------------------------------------------------------------------------
+// Requires      : "mimeanalyzer" stream processor to be enabled before it
+// ----------------------------------------------------------------------------------
+// Config Options: chunksaver_chunk_size - maximum chunk size, in bytes
+// --------------:-------------------------------------------------------------------
+// Input         : e.Values["MimeParts"], which is of type *[]*mime.Part, as populated by "mimeanalyzer"
+// ----------------------------------------------------------------------------------
+// Output        :
+// ----------------------------------------------------------------------------------
+
+func init() {
+	backends.Streamers["chunksaver"] = func() *backends.StreamDecorator {
+		return Chunksaver()
+	}
+}
+
+type ChunkSaverConfig struct {
+	// ChunkMaxBytes controls the maximum buffer size for saving
+	// 16KB default.
+	ChunkMaxBytes int    `json:"chunksaver_chunk_size,omitempty"`
+	StorageEngine string `json:"chunksaver_storage_engine,omitempty"`
+	CompressLevel int    `json:"chunksaver_compress_level,omitempty"`
+}
+
+//const chunkMaxBytes = 1024 * 16 // 16Kb is the default, change using chunksaver_chunk_size config setting
+/**
+*
+ * A chunk ends either:
+ * after chunkMaxBytes, at the end of a part, or at the end of a header
+ *
+ * - buffer the first chunk
+ * - if the first chunk hasn't arrived after x bytes, save normally
+ *
+*/
+func Chunksaver() *backends.StreamDecorator {
+
+	sd := &backends.StreamDecorator{}
+	sd.Decorate =
+		func(sp backends.StreamProcessor, a ...interface{}) backends.StreamProcessor {
+			var (
+				envelope    *mail.Envelope
+				chunkBuffer *ChunkedBytesBufferMime
+				msgPos      uint
+				database    ChunkSaverStorage
+				written     int64
+
+				// just some headers from the first mime-part
+				subject string
+				to      string
+				from    string
+
+				progress int // tracks which mime parts were processed
+			)
+
+			var config *ChunkSaverConfig
+			// optional dependency injection
+			for i := range a {
+				if db, ok := a[i].(ChunkSaverStorage); ok {
+					database = db
+				}
+				if buff, ok := a[i].(*ChunkedBytesBufferMime); ok {
+					chunkBuffer = buff
+				}
+			}
+
+			backends.Svc.AddInitializer(backends.InitializeWith(func(backendConfig backends.BackendConfig) error {
+
+				configType := backends.BaseConfig(&ChunkSaverConfig{})
+				bcfg, err := backends.Svc.ExtractConfig(backendConfig, configType)
+				if err != nil {
+					return err
+				}
+				config = bcfg.(*ChunkSaverConfig)
+				if chunkBuffer == nil {
+					chunkBuffer = NewChunkedBytesBufferMime()
+				}
+				// configure storage if none was injected
+				if database == nil {
+					if config.StorageEngine == "memory" {
+						db := new(ChunkSaverMemory)
+						db.CompressLevel = config.CompressLevel
+						database = db
+					} else {
+						db := new(ChunkSaverSQL)
+						database = db
+					}
+				}
+				err = database.Initialize(backendConfig)
+				if err != nil {
+					return err
+				}
+				// configure the chunks buffer
+				if config.ChunkMaxBytes > 0 {
+					chunkBuffer.CapTo(config.ChunkMaxBytes)
+				} else {
+					chunkBuffer.CapTo(chunkMaxBytes)
+				}
+				chunkBuffer.SetDatabase(database)
+
+				return nil
+			}))
+
+			backends.Svc.AddShutdowner(backends.ShutdownWith(func() error {
+				err := database.Shutdown()
+				return err
+			}))
+
+			sd.Open = func(e *mail.Envelope) error {
+				// create a new entry & grab the id
+				written = 0
+				progress = 0
+				var ip net.IPAddr
+				if ret := net.ParseIP(e.RemoteIP); ret != nil {
+					ip = net.IPAddr{IP: ret}
+				}
+				mid, err := database.OpenMessage(
+					e.MailFrom.String(),
+					e.Helo,
+					e.RcptTo[0].String(),
+					ip,
+					e.MailFrom.String(),
+					e.TLS)
+				if err != nil {
+					return err
+				}
+				e.Values["messageID"] = mid
+				envelope = e
+				return nil
+			}
+
+			sd.Close = func() (err error) {
+				err = chunkBuffer.Flush()
+				if err != nil {
+					// TODO we could delete the half saved message here
+					return err
+				}
+				defer chunkBuffer.Reset()
+				if mid, ok := envelope.Values["messageID"].(uint64); ok {
+					err = database.CloseMessage(
+						mid,
+						written,
+						&chunkBuffer.Info,
+						subject,
+						envelope.QueuedId,
+						to,
+						from,
+					)
+					if err != nil {
+						return err
+					}
+				}
+				return nil
+			}
+
+			fillVars := func(parts *[]*mime.Part, subject, to, from string) (string, string, string) {
+				if len(*parts) > 0 {
+					if subject == "" {
+						if val, ok := (*parts)[0].Headers["Subject"]; ok {
+							subject = val[0]
+						}
+					}
+					if to == "" {
+						if val, ok := (*parts)[0].Headers["To"]; ok {
+							addr, err := mail.NewAddress(val[0])
+							if err == nil {
+								to = addr.String()
+							}
+						}
+					}
+					if from == "" {
+						if val, ok := (*parts)[0].Headers["From"]; ok {
+							addr, err := mail.NewAddress(val[0])
+							if err == nil {
+								from = addr.String()
+							}
+						}
+					}
+
+				}
+				return subject, to, from
+			}
+
+			return backends.StreamProcessWith(func(p []byte) (count int, err error) {
+				if envelope.Values == nil {
+					return count, errors.New("envelope has no values; the mimeanalyzer processor must run before chunksaver")
+				}
+				if parts, ok := envelope.Values["MimeParts"].(*[]*mime.Part); ok && len(*parts) > 0 {
+					var pos int
+
+					subject, to, from = fillVars(parts, subject, to, from)
+					offset := msgPos
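+					// offset is the absolute message position of p[0]: subtracting it
+					// converts a part's absolute starting position into an index within p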
+					chunkBuffer.CurrentPart((*parts)[0])
+					for i := progress; i < len(*parts); i++ {
+						part := (*parts)[i]
+
+						// break chunk on new part
+						if part.StartingPos > 0 && part.StartingPos > msgPos {
+							count, _ = chunkBuffer.Write(p[pos : part.StartingPos-offset])
+							written += int64(count)
+
+							err = chunkBuffer.Flush()
+							if err != nil {
+								return count, err
+							}
+							chunkBuffer.CurrentPart(part)
+							fmt.Println("->N") // debug trace: chunk break at the start of a new part
+							pos += count
+							msgPos = part.StartingPos
+						}
+						// break chunk on header
+						if part.StartingPosBody > 0 && part.StartingPosBody >= msgPos {
+							count, _ = chunkBuffer.Write(p[pos : part.StartingPosBody-offset])
+							written += int64(count)
+
+							err = chunkBuffer.Flush()
+							if err != nil {
+								return count, err
+							}
+							chunkBuffer.CurrentPart(part)
+							fmt.Println("->H") // debug trace: chunk break after a header
+							pos += count
+							msgPos = part.StartingPosBody
+						}
+						// if on the last part and there is still data to be written out
+						if len(*parts)-1 == i && len(p)-1 > pos {
+							count, _ = chunkBuffer.Write(p[pos:])
+							written += int64(count)
+							pos += count
+							msgPos += uint(count)
+						}
+						// if there's no more data
+						if pos >= len(p) {
+							break
+						}
+					}
+					if len(*parts) > 2 {
+						progress = len(*parts) - 2 // skip to 2nd last part, assume previous parts are already processed
+					}
+				}
+				return sp.Write(p)
+			})
+		}
+	return sd
+}
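
Note: the de-duplication scheme described in the header comment (hash each chunk, store it once, bump a reference count on repeats) can be shown with a minimal standalone sketch. This is illustrative only, not the processor's code: the sha256 hash and map layout here are assumptions, while the real implementation uses its own HashKey type and a pluggable ChunkSaverStorage.

package main

import (
	"crypto/sha256"
	"fmt"
)

// chunk is one stored chunk plus how many messages reference it.
type chunk struct {
	data     []byte
	refCount uint
}

// store maps a chunk's hash to the stored chunk (illustrative only).
var store = map[[sha256.Size]byte]*chunk{}

// save de-duplicates: a chunk seen before only gets its counter bumped.
func save(data []byte) {
	key := sha256.Sum256(data)
	if c, ok := store[key]; ok {
		c.refCount++
		return
	}
	store[key] = &chunk{data: data, refCount: 1}
}

func main() {
	save([]byte("shared body chunk")) // stored
	save([]byte("shared body chunk")) // de-duplicated, refCount becomes 2
	for _, c := range store {
		fmt.Printf("refs=%d size=%d\n", c.refCount, len(c.data))
	}
}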

+ 183 - 0
chunk/reader.go

@@ -0,0 +1,183 @@
+package chunk
+
+import (
+	"errors"
+	"fmt"
+	"io"
+)
+
+type chunkMailReader struct {
+	db    ChunkSaverStorage
+	email *ChunkSaverEmail
+	// part requests a part. If 0, all the parts are read sequentially
+	part int
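+	// i indexes the current mime part; j indexes the current chunk within that part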
+	i, j int
+
+	cache cachedChunks
+}
+
+// NewChunkMailReader loads the email and selects which mime-part Read will return.
+// If part is 0, Read reads the entire message. 1 selects the first part, 2 the second, and so on.
+func NewChunkMailReader(db ChunkSaverStorage, email *ChunkSaverEmail, part int) (*chunkMailReader, error) {
+	r := new(chunkMailReader)
+	r.db = db
+	r.part = part
+	if email == nil {
+		return nil, errors.New("nil email")
+	}
+	r.email = email
+	if err := r.SeekPart(part); err != nil {
+		return nil, err
+	}
+	r.cache = cachedChunks{
+		db: db,
+	}
+	return r, nil
+}
+
+// SeekPart resets the reader. The part argument chooses which part Read will read in
+// If part is 0, Read will return the entire message
+func (r *chunkMailReader) SeekPart(part int) error {
+	if parts := len(r.email.partsInfo.Parts); parts == 0 {
+		return errors.New("email has no mime parts")
+	} else if part > parts {
+		return errors.New("no such part available")
+	}
+	r.i = part
+	r.j = 0
+	return nil
+}
+
+type cachedChunks struct {
+	chunks    []*ChunkSaverChunk
+	hashIndex map[int]HashKey
+	db        ChunkSaverStorage
+}
+
+const chunkCachePreload = 2
+
+// warm allocates the chunk cache and pre-loads the first few chunks into it
+func (c *cachedChunks) warm(hashes ...HashKey) (int, error) {
+
+	if c.hashIndex == nil {
+		c.hashIndex = make(map[int]HashKey, len(hashes))
+	}
+	if c.chunks == nil {
+		c.chunks = make([]*ChunkSaverChunk, 0, 100)
+	}
+	if len(c.chunks) > 0 {
+		// already been filled
+		return len(c.chunks), nil
+	}
+	// let's pre-load some hashes.
+	preload := chunkCachePreload
+	if len(hashes) < preload {
+		preload = len(hashes)
+	}
+	if chunks, err := c.db.GetChunks(hashes[0:preload]...); err != nil {
+		return 0, err
+	} else {
+		for i := range hashes {
+			c.hashIndex[i] = hashes[i]
+			if i < preload {
+				c.chunks = append(c.chunks, chunks[i])
+			} else {
+				// don't pre-load
+				c.chunks = append(c.chunks, nil) // nil will be a placeholder for our chunk
+			}
+		}
+	}
+	return len(c.chunks), nil
+}
+
+// get returns the chunk at index i. On a cache miss, it fetches the chunk and pre-loads the next few,
+// evicting earlier chunks that have become stale
+func (c *cachedChunks) get(i int) (*ChunkSaverChunk, error) {
+	if i >= len(c.chunks) {
+		return nil, errors.New("not enough chunks")
+	}
+	if c.chunks[i] != nil {
+		// cache hit!
+		return c.chunks[i], nil
+	} else {
+		var toGet []HashKey
+		if key, ok := c.hashIndex[i]; ok {
+			toGet = append(toGet, key)
+		} else {
+			return nil, fmt.Errorf("hash for chunk index [%d] not found", i)
+		}
+		// make a list of chunks to load (extra ones to be pre-loaded)
+		for to := i + 1; to < len(c.chunks) && to <= i+chunkCachePreload; to++ {
+			if key, ok := c.hashIndex[to]; ok {
+				toGet = append(toGet, key)
+			}
+		}
+		if chunks, err := c.db.GetChunks(toGet...); err != nil {
+			return nil, err
+		} else {
+			// cache the pre-loaded chunks
+			for j := i; j < len(c.chunks) && j-i < len(chunks); j++ {
+				c.chunks[j] = chunks[j-i]
+				c.hashIndex[j] = toGet[j-i]
+			}
+			// evict older chunks, walking back from just before i
+			for j := i - 1; j > -1; j-- {
+				if c.chunks[j] != nil {
+					c.chunks[j] = nil
+				} else {
+					break
+				}
+			}
+			// return the chunk asked for
+			return chunks[0], nil
+		}
+	}
+}
+
+func (c *cachedChunks) empty() {
+	for i := range c.chunks {
+		c.chunks[i] = nil
+	}
+	c.chunks = c.chunks[:0] // set len to 0
+	for key := range c.hashIndex {
+		delete(c.hashIndex, key)
+	}
+}
+
+// Read implements the io.Reader interface
+func (r *chunkMailReader) Read(p []byte) (n int, err error) {
+	var length int
+	for ; r.i < len(r.email.partsInfo.Parts); r.i++ {
+		length, err = r.cache.warm(r.email.partsInfo.Parts[r.i].ChunkHash...)
+		if err != nil {
+			return
+		}
+		var nRead int
+		for r.j < length {
+			chunk, err := r.cache.get(r.j)
+			if err != nil {
+				return nRead, err
+			}
+			nRead, err = chunk.data.Read(p)
+			if err == io.EOF {
+				r.j++ // advance to the next chunk
+				err = nil
+			}
+			if r.j == length { // last chunk in a part?
+				r.j = 0 // reset chunk index
+				r.i++   // advance to the next part
+				if r.i == len(r.email.partsInfo.Parts) || r.part > 0 {
+					// there are no more parts to return
+					err = io.EOF
+					r.cache.empty()
+				}
+			}
+			// unless there's an error, the next call to Read
+			// will continue from the next chunk
+			return nRead, err
+		}
+	}
+	err = io.EOF
+	return n, err
+}
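
Note: a minimal sketch of how the reader above might be used to re-assemble a stored message. The helper name readWholeMessage is illustrative; store is any ChunkSaverStorage implementation and mailID is the id returned by OpenMessage. Assumes the chunk package context plus "bytes" and "io" imports.

func readWholeMessage(store ChunkSaverStorage, mailID uint64) ([]byte, error) {
	email, err := store.GetEmail(mailID)
	if err != nil {
		return nil, err
	}
	// part 0 reads the entire message; 1 selects the first mime part
	r, err := NewChunkMailReader(store, email, 0)
	if err != nil {
		return nil, err
	}
	var msg bytes.Buffer
	if _, err := io.Copy(&msg, r); err != nil {
		return nil, err
	}
	return msg.Bytes(), nil
}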

+ 24 - 23
backends/s_chunksaver_test.go → chunk/s_chunksaver_test.go

@@ -1,8 +1,9 @@
-package backends
+package chunk
 
 import (
 	"bytes"
 	"fmt"
+	"github.com/flashmob/go-guerrilla/backends"
 	"github.com/flashmob/go-guerrilla/mail"
 	"io"
 	"os"
@@ -13,7 +14,7 @@ func TestChunkedBytesBuffer(t *testing.T) {
 	var in string
 
 	var buf chunkedBytesBuffer
-	buf.capTo(64)
+	buf.CapTo(64)
 
 	// the data to write is over-aligned
 	in = `123456789012345678901234567890123456789012345678901234567890abcde12345678901234567890123456789012345678901234567890123456789abcdef` // len == 130
@@ -24,7 +25,7 @@ func TestChunkedBytesBuffer(t *testing.T) {
 
 	// the data to write is aligned
 	var buf2 chunkedBytesBuffer
-	buf2.capTo(64)
+	buf2.CapTo(64)
 	in = `123456789012345678901234567890123456789012345678901234567890abcde12345678901234567890123456789012345678901234567890123456789abcd` // len == 128
 	i, _ = buf2.Write([]byte(in[:]))
 	if i != len(in) {
@@ -33,7 +34,7 @@ func TestChunkedBytesBuffer(t *testing.T) {
 
 	// the data to write is under-aligned
 	var buf3 chunkedBytesBuffer
-	buf3.capTo(64)
+	buf3.CapTo(64)
 	in = `123456789012345678901234567890123456789012345678901234567890abcde12345678901234567890123456789012345678901234567890123456789ab` // len == 126
 	i, _ = buf3.Write([]byte(in[:]))
 	if i != len(in) {
@@ -42,7 +43,7 @@ func TestChunkedBytesBuffer(t *testing.T) {
 
 	// the data to write is smaller than the buffer
 	var buf4 chunkedBytesBuffer
-	buf4.capTo(64)
+	buf4.CapTo(64)
 	in = `1234567890` // len == 10
 	i, _ = buf4.Write([]byte(in[:]))
 	if i != len(in) {
@@ -52,7 +53,7 @@ func TestChunkedBytesBuffer(t *testing.T) {
 	// what if the buffer already contains stuff before Write is called
 	// and the buffer len is smaller than the len of the slice of bytes we pass it?
 	var buf5 chunkedBytesBuffer
-	buf5.capTo(5)
+	buf5.CapTo(5)
 	buf5.buf = append(buf5.buf, []byte{'a', 'b', 'c'}...)
 	in = `1234567890` // len == 10
 	i, _ = buf5.Write([]byte(in[:]))
@@ -345,12 +346,12 @@ func TestChunkSaverWrite(t *testing.T) {
 	e.RcptTo = append(e.RcptTo, to)
 	e.MailFrom, _ = mail.NewAddress("[email protected]")
 
-	store := new(chunkSaverMemory)
-	chunkBuffer := newChunkedBytesBufferMime()
+	store := new(ChunkSaverMemory)
+	chunkBuffer := NewChunkedBytesBufferMime()
 	//chunkBuffer.setDatabase(store)
 	// instantiate the chunk saver
-	chunksaver := streamers["chunksaver"]()
-	mimeanalyzer := streamers["mimeanalyzer"]()
+	chunksaver := backends.Streamers["chunksaver"]()
+	mimeanalyzer := backends.Streamers["mimeanalyzer"]()
 
 	// add the default processor as the underlying processor for chunksaver
 	// and chain it with mimeanalyzer.
@@ -358,14 +359,14 @@ func TestChunkSaverWrite(t *testing.T) {
 	// This will also set our Open, Close and Initialize functions
 	// we also inject a ChunkSaverStorage and a ChunkedBytesBufferMime
 
-	stream := mimeanalyzer.Decorate(chunksaver.Decorate(DefaultStreamProcessor{}, store, chunkBuffer))
+	stream := mimeanalyzer.Decorate(chunksaver.Decorate(backends.DefaultStreamProcessor{}, store, chunkBuffer))
 
 	// configure the buffer cap
-	bc := BackendConfig{}
+	bc := backends.BackendConfig{}
 	bc["chunksaver_chunk_size"] = 8000
 	bc["chunksaver_storage_engine"] = "memory"
 	bc["chunksaver_compress_level"] = 0
-	_ = Svc.initialize(bc)
+	_ = backends.Svc.Initialize(bc)
 
 	// give it the envelope with the parse results
 	_ = chunksaver.Open(e)
@@ -419,15 +420,15 @@ func TestChunkSaverWrite(t *testing.T) {
 				t.Error("incorrect size, expecting", email.partsInfo.Parts[i].Size, "but read:", w)
 			}
 		}
-
-		dr, err := NewChunkPartDecoder(store, email, 0)
-		_ = dr
-		if err != nil {
-			t.Error(err)
-			t.FailNow()
-		}
-		var decoded bytes.Buffer
-		io.Copy(&decoded, dr)
-
+		/*
+			dr, err := NewChunkPartDecoder(store, email, 0)
+			_ = dr
+			if err != nil {
+				t.Error(err)
+				t.FailNow()
+			}
+			var decoded bytes.Buffer
+			io.Copy(&decoded, dr)
+		*/
 	}
 }

+ 50 - 0
chunk/store.go

@@ -0,0 +1,50 @@
+package chunk
+
+import (
+	"github.com/flashmob/go-guerrilla/backends"
+	"io"
+	"net"
+	"time"
+)
+
+// ChunkSaverStorage defines an interface to the storage layer (the database)
+type ChunkSaverStorage interface {
+	// OpenMessage is used to begin saving an email. An email id is returned and used to call CloseMessage later
+	OpenMessage(from string, helo string, recipient string, ipAddress net.IPAddr, returnPath string, isTLS bool) (mailID uint64, err error)
+	// CloseMessage finalizes the writing of an email. Additional data collected while parsing the email is saved
+	CloseMessage(mailID uint64, size int64, partsInfo *PartsInfo, subject string, deliveryID string, to string, from string) error
+	// AddChunk saves a chunk of bytes to a given hash key
+	AddChunk(data []byte, hash []byte) error
+	// GetEmail returns an email that's been saved
+	GetEmail(mailID uint64) (*ChunkSaverEmail, error)
+	// GetChunks loads in the specified chunks of bytes from storage
+	GetChunks(hash ...HashKey) ([]*ChunkSaverChunk, error)
+	// Initialize is called when the backend is started
+	Initialize(cfg backends.BackendConfig) error
+	// Shutdown is called when the backend gets shutdown.
+	Shutdown() (err error)
+}
+
+// ChunkSaverEmail represents an email
+type ChunkSaverEmail struct {
+	mailID     uint64
+	createdAt  time.Time
+	size       int64
+	from       string // from stores the email address found in the "From" header field
+	to         string // to stores the email address found in the "To" header field
+	partsInfo  PartsInfo
+	helo       string // helo message given by the client when the message was transmitted
+	subject    string // subject stores the value from the first "Subject" header field
+	deliveryID string
+	recipient  string     // recipient is the email address that the server received from the RCPT TO command
+	ipv4       net.IPAddr // set to a value if client connected via ipv4
+	ipv6       net.IPAddr // set to a value if client connected via ipv6
+	returnPath string     // returnPath is the email address that the server received from the MAIL FROM command
+	isTLS      bool       // isTLS is true when TLS was used to connect
+}
+
+type ChunkSaverChunk struct {
+	modifiedAt     time.Time
+	referenceCount uint // referenceCount counts how many emails reference this chunk
+	data           io.Reader
+}
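
Note: the interface above implies a fixed call sequence per message. The sketch below walks that lifecycle; saveOne is a hypothetical helper, the addresses are placeholders, and in the real processor the chunk/hash pairs come from the chunk buffer rather than being passed in. Assumes the chunk package context plus a "net" import.

// saveOne exercises the ChunkSaverStorage lifecycle for one message:
// OpenMessage -> AddChunk (repeated) -> CloseMessage.
func saveOne(db ChunkSaverStorage, chunks [][]byte, hashes [][]byte, info *PartsInfo) (uint64, error) {
	ip := net.IPAddr{IP: net.ParseIP("127.0.0.1")}
	mailID, err := db.OpenMessage("sender@example.com", "helo.example.com",
		"rcpt@example.com", ip, "sender@example.com", false)
	if err != nil {
		return 0, err
	}
	var size int64
	for i := range chunks {
		if err := db.AddChunk(chunks[i], hashes[i]); err != nil {
			return 0, err
		}
		size += int64(len(chunks[i]))
	}
	// subject, delivery id, to and from would come from the parsed headers
	err = db.CloseMessage(mailID, size, info, "a subject", "delivery-id",
		"rcpt@example.com", "sender@example.com")
	return mailID, err
}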

+ 185 - 0
chunk/store_memory.go

@@ -0,0 +1,185 @@
+package chunk
+
+import (
+	"bytes"
+	"compress/zlib"
+	"errors"
+	"github.com/flashmob/go-guerrilla/backends"
+	"net"
+	"time"
+)
+
+type ChunkSaverMemory struct {
+	chunks        map[HashKey]*chunkSaverMemoryChunk
+	emails        []*chunkSaverMemoryEmail
+	nextID        uint64
+	IDOffset      uint64
+	CompressLevel int
+}
+
+type chunkSaverMemoryEmail struct {
+	mailID     uint64
+	createdAt  time.Time
+	size       int64
+	from       string
+	to         string
+	partsInfo  []byte
+	helo       string
+	subject    string
+	deliveryID string
+	recipient  string
+	ipv4       net.IPAddr
+	ipv6       net.IPAddr
+	returnPath string
+	isTLS      bool
+}
+
+type chunkSaverMemoryChunk struct {
+	modifiedAt     time.Time
+	referenceCount uint
+	data           []byte
+}
+
+// OpenMessage implements the ChunkSaverStorage interface
+func (m *ChunkSaverMemory) OpenMessage(from string, helo string, recipient string, ipAddress net.IPAddr, returnPath string, isTLS bool) (mailID uint64, err error) {
+	var ip4, ip6 net.IPAddr
+	if ip := ipAddress.IP.To4(); ip != nil {
+		ip4 = ipAddress
+	} else {
+		ip6 = ipAddress
+	}
+	email := chunkSaverMemoryEmail{
+		mailID:     m.nextID,
+		createdAt:  time.Now(),
+		from:       from,
+		helo:       helo,
+		recipient:  recipient,
+		ipv4:       ip4,
+		ipv6:       ip6,
+		returnPath: returnPath,
+		isTLS:      isTLS,
+	}
+	m.emails = append(m.emails, &email)
+	m.nextID++
+	return email.mailID, nil
+}
+
+// CloseMessage implements the ChunkSaverStorage interface
+func (m *ChunkSaverMemory) CloseMessage(mailID uint64, size int64, partsInfo *PartsInfo, subject string, deliveryID string, to string, from string) error {
+	if email := m.emails[mailID-m.IDOffset]; email == nil {
+		return errors.New("email not found")
+	} else {
+		email.size = size
+		if info, err := partsInfo.MarshalJSONZlib(); err != nil {
+			return err
+		} else {
+			email.partsInfo = info
+		}
+		email.subject = subject
+		email.deliveryID = deliveryID
+		email.to = to
+		email.from = from
+	}
+	return nil
+}
+
+// AddChunk implements the ChunkSaverStorage interface
+func (m *ChunkSaverMemory) AddChunk(data []byte, hash []byte) error {
+	var key HashKey
+	if len(hash) != hashByteSize {
+		return errors.New("invalid hash")
+	}
+	key.Pack(hash)
+	if chunk, ok := m.chunks[key]; ok {
+		// chunk already stored: only update the reference count and modification time
+		chunk.referenceCount++
+		chunk.modifiedAt = time.Now()
+	} else {
+		// compress the new chunk before storing it
+		var compressed bytes.Buffer
+		zlibw, err := zlib.NewWriterLevel(&compressed, m.CompressLevel)
+		if err != nil {
+			return err
+		}
+		if _, err := zlibw.Write(data); err != nil {
+			return err
+		}
+		if err := zlibw.Close(); err != nil {
+			return err
+		}
+		// add a new chunk
+		newChunk := chunkSaverMemoryChunk{
+			modifiedAt:     time.Now(),
+			referenceCount: 1,
+			data:           compressed.Bytes(),
+		}
+		m.chunks[key] = &newChunk
+	}
+	return nil
+}
+
+// Initialize implements the ChunkSaverStorage interface
+func (m *ChunkSaverMemory) Initialize(cfg backends.BackendConfig) error {
+	m.IDOffset = 1
+	m.nextID = m.IDOffset
+	m.emails = make([]*chunkSaverMemoryEmail, 0, 100)
+	m.chunks = make(map[HashKey]*chunkSaverMemoryChunk, 1000)
+	// keep any injected/configured compression level; default to no compression
+	if m.CompressLevel < zlib.HuffmanOnly || m.CompressLevel > zlib.BestCompression {
+		m.CompressLevel = zlib.NoCompression
+	}
+	return nil
+}
+
+// Shutdown implements the ChunkSaverStorage interface
+func (m *ChunkSaverMemory) Shutdown() (err error) {
+	m.emails = nil
+	m.chunks = nil
+	return nil
+}
+
+// GetEmail implements the ChunkSaverStorage interface
+func (m *ChunkSaverMemory) GetEmail(mailID uint64) (*ChunkSaverEmail, error) {
+	if i := mailID - m.IDOffset; i >= uint64(len(m.emails)) {
+		return nil, errors.New("mail not found")
+	}
+	email := m.emails[mailID-m.IDOffset]
+	pi := NewPartsInfo()
+	if err := pi.UnmarshalJSONZlib(email.partsInfo); err != nil {
+		return nil, err
+	}
+	return &ChunkSaverEmail{
+		mailID:     email.mailID,
+		createdAt:  email.createdAt,
+		size:       email.size,
+		from:       email.from,
+		to:         email.to,
+		partsInfo:  *pi,
+		helo:       email.helo,
+		subject:    email.subject,
+		deliveryID: email.deliveryID,
+		recipient:  email.recipient,
+		ipv4:       email.ipv4,
+		ipv6:       email.ipv6,
+		returnPath: email.returnPath,
+		isTLS:      email.isTLS,
+	}, nil
+}
+
+// GetChunks implements the ChunkSaverStorage interface
+func (m *ChunkSaverMemory) GetChunks(hash ...HashKey) ([]*ChunkSaverChunk, error) {
+	result := make([]*ChunkSaverChunk, 0, len(hash))
+	var key HashKey
+	for i := range hash {
+		key = hash[i]
+		if c, ok := m.chunks[key]; ok {
+			zwr, err := zlib.NewReader(bytes.NewReader(c.data))
+			if err != nil {
+				return nil, err
+			}
+			result = append(result, &ChunkSaverChunk{
+				modifiedAt:     c.modifiedAt,
+				referenceCount: c.referenceCount,
+				data:           zwr,
+			})
+		}
+	}
+	return result, nil
+}
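
Note: ChunkSaverMemory keeps every chunk zlib-compressed and hands back a decompressing reader from GetChunks. The round trip it relies on is plain standard library, as in this standalone sketch (the payload string is arbitrary):

package main

import (
	"bytes"
	"compress/zlib"
	"fmt"
	"io/ioutil"
)

func main() {
	// compress, as AddChunk does when storing a new chunk
	var compressed bytes.Buffer
	w, err := zlib.NewWriterLevel(&compressed, zlib.BestSpeed)
	if err != nil {
		panic(err)
	}
	if _, err := w.Write([]byte("chunk data")); err != nil {
		panic(err)
	}
	if err := w.Close(); err != nil {
		panic(err)
	}

	// decompress, as GetChunks does when returning the chunk as an io.Reader
	r, err := zlib.NewReader(bytes.NewReader(compressed.Bytes()))
	if err != nil {
		panic(err)
	}
	defer func() { _ = r.Close() }()
	data, err := ioutil.ReadAll(r)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s\n", data) // chunk data
}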

+ 224 - 0
chunk/store_sql.go

@@ -0,0 +1,224 @@
+package chunk
+
+import (
+	"database/sql"
+	"encoding/binary"
+	"encoding/json"
+	"github.com/flashmob/go-guerrilla/backends"
+	"net"
+)
+
+type chunkSaverSQLConfig struct {
+	EmailTable  string `json:"chunksaver_email_table,omitempty"`
+	ChunkTable  string `json:"chunksaver_chunk_table,omitempty"`
+	Driver      string `json:"chunksaver_sql_driver,omitempty"`
+	DSN         string `json:"chunksaver_sql_dsn,omitempty"`
+	PrimaryHost string `json:"chunksaver_primary_mail_host,omitempty"`
+}
+
+// ChunkSaverSQL implements the ChunkSaverStorage interface
+type ChunkSaverSQL struct {
+	config     *chunkSaverSQLConfig
+	statements map[string]*sql.Stmt
+	db         *sql.DB
+}
+
+func (c *ChunkSaverSQL) connect() (*sql.DB, error) {
+	var err error
+	if c.db, err = sql.Open(c.config.Driver, c.config.DSN); err != nil {
+		backends.Log().Error("cannot open database: ", err)
+		return nil, err
+	}
+	// check that we have permission to access the table
+	rows, err := c.db.Query("SELECT mail_id FROM " + c.config.EmailTable + " LIMIT 1")
+	if err != nil {
+		return nil, err
+	}
+	_ = rows.Close()
+	return c.db, err
+}
+
+func (c *ChunkSaverSQL) prepareSql() error {
+	if c.statements == nil {
+		c.statements = make(map[string]*sql.Stmt)
+	}
+
+	// begin inserting an email (before saving chunks)
+	if stmt, err := c.db.Prepare(`INSERT INTO ` +
+		c.config.EmailTable +
+		` (from, helo, recipient, ipv4_addr, ipv6_addr, return_path, is_tls) 
+ VALUES(?, ?, ?, ?, ?, ?, ?)`); err != nil {
+		return err
+	} else {
+		c.statements["insertEmail"] = stmt
+	}
+
+	// insert a chunk of the email's data
+	if stmt, err := c.db.Prepare(`INSERT INTO ` +
+		c.config.ChunkTable +
+		` (data, hash) 
+ VALUES(?, ?)`); err != nil {
+		return err
+	} else {
+		c.statements["insertChunk"] = stmt
+	}
+
+	// finalize the email (called after the connection has closed)
+	if stmt, err := c.db.Prepare(`
+		UPDATE ` + c.config.EmailTable + `
+			SET size = ?, parts_info = ?, subject = ?, delivery_id = ?, to = ?
+		WHERE mail_id = ? `); err != nil {
+		return err
+	} else {
+		c.statements["finalizeEmail"] = stmt
+	}
+
+	// Check the existence of a chunk (the reference_count col is incremented if it exists)
+	// This means we can avoid re-inserting an existing chunk, only update its reference_count
+	if stmt, err := c.db.Prepare(`
+		UPDATE ` + c.config.ChunkTable + ` 
+			SET reference_count=reference_count+1 
+		WHERE hash = ? `); err != nil {
+		return err
+	} else {
+		c.statements["chunkReferenceIncr"] = stmt
+	}
+
+	// If the reference_count is 0 then it means the chunk has been deleted
+	// Chunks are soft-deleted for now, hard-deleted by another sweeper query as they become stale.
+	if stmt, err := c.db.Prepare(`
+		UPDATE ` + c.config.ChunkTable + ` 
+			SET reference_count=reference_count-1 
+		WHERE hash = ? AND reference_count > 0`); err != nil {
+		return err
+	} else {
+		c.statements["chunkReferenceDecr"] = stmt
+	}
+
+	// fetch an email
+	if stmt, err := c.db.Prepare(`
+		SELECT * 
+		from ` + c.config.EmailTable + ` 
+		where mail_id=?`); err != nil {
+		return err
+	} else {
+		c.statements["selectMail"] = stmt
+	}
+
+	// fetch a chunk
+	if stmt, err := c.db.Prepare(`
+		SELECT * 
+		from ` + c.config.ChunkTable + ` 
+		where hash=?`); err != nil {
+		return err
+	} else {
+		c.statements["selectChunk"] = stmt
+	}
+
+	// TODO sweep old chunks
+
+	// TODO sweep incomplete emails
+
+	return nil
+}
+
+// OpenMessage implements the ChunkSaverStorage interface
+func (c *ChunkSaverSQL) OpenMessage(from string, helo string, recipient string, ipAddress net.IPAddr, returnPath string, isTLS bool) (mailID uint64, err error) {
+
+	// if it's ipv4 then we want ipv6 to be 0, and vice-versa
+	var ip4 uint32
+	ip6 := make([]byte, 16)
+	if ip := ipAddress.IP.To4(); ip != nil {
+		ip4 = binary.BigEndian.Uint32(ip)
+	} else {
+		_ = copy(ip6, ipAddress.IP)
+	}
+	r, err := c.statements["insertEmail"].Exec(from, helo, recipient, ip4, ip6, returnPath, isTLS)
+	if err != nil {
+		return 0, err
+	}
+	id, err := r.LastInsertId()
+	if err != nil {
+		return 0, err
+	}
+	return uint64(id), err
+}
+
+// AddChunk implements the ChunkSaverStorage interface
+func (c *ChunkSaverSQL) AddChunk(data []byte, hash []byte) error {
+	// attempt to increment the reference_count (it means the chunk is already in there)
+	r, err := c.statements["chunkReferenceIncr"].Exec(hash)
+	if err != nil {
+		return err
+	}
+	affected, err := r.RowsAffected()
+	if err != nil {
+		return err
+	}
+	if affected == 0 {
+		// chunk isn't in there, let's insert it
+		_, err := c.statements["insertChunk"].Exec(data, hash)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// CloseMessage implements the ChunkSaverStorage interface
+func (c *ChunkSaverSQL) CloseMessage(mailID uint64, size int64, partsInfo *PartsInfo, subject string, deliveryID string, to string, from string) error {
+	partsInfoJson, err := json.Marshal(partsInfo)
+	if err != nil {
+		return err
+	}
+	_, err = c.statements["finalizeEmail"].Exec(size, partsInfoJson, subject, deliveryID, to, mailID)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+// Initialize loads the specific database config, connects to the db, prepares statements
+func (c *ChunkSaverSQL) Initialize(cfg backends.BackendConfig) error {
+	configType := backends.BaseConfig(&chunkSaverSQLConfig{})
+	bcfg, err := backends.Svc.ExtractConfig(cfg, configType)
+	if err != nil {
+		return err
+	}
+	c.config = bcfg.(*chunkSaverSQLConfig)
+	c.db, err = c.connect()
+	if err != nil {
+		return err
+	}
+	err = c.prepareSql()
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+// Shutdown implements the ChunkSaverStorage interface
+func (c *ChunkSaverSQL) Shutdown() (err error) {
+	defer func() {
+		closeErr := c.db.Close()
+		if closeErr != nil {
+			backends.Log().WithError(closeErr).Error("failed to close sql database")
+			err = closeErr
+		}
+	}()
+	for i := range c.statements {
+		if err = c.statements[i].Close(); err != nil {
+			backends.Log().WithError(err).Error("failed to close sql statement")
+		}
+	}
+	return err
+}
+
+// GetEmail implements the ChunkSaverStorage interface
+// TODO: not yet implemented, returns an empty email for now
+func (c *ChunkSaverSQL) GetEmail(mailID uint64) (*ChunkSaverEmail, error) {
+	return &ChunkSaverEmail{}, nil
+}
+
+// GetChunks implements the ChunkSaverStorage interface
+// TODO: not yet implemented, returns an empty result for now
+func (c *ChunkSaverSQL) GetChunks(hash ...HashKey) ([]*ChunkSaverChunk, error) {
+	result := make([]*ChunkSaverChunk, 0, len(hash))
+	return result, nil
+}
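
Note: the prepared statements above reference columns whose DDL is not part of this commit. A hypothetical schema consistent with those statements is sketched below as a Go constant; the table names stand in for the chunksaver_email_table and chunksaver_chunk_table config settings, the column types are guesses, and on MySQL the reserved-word columns (from, to) would also need backtick-quoting.

// chunkSaverSchema is a hypothetical, untested DDL sketch matching prepareSql.
const chunkSaverSchema = `
CREATE TABLE in_emails (
	mail_id     BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
	created_at  TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
	size        BIGINT NOT NULL DEFAULT 0,
	from        VARCHAR(255) NOT NULL,
	to          VARCHAR(255),
	parts_info  BLOB,
	helo        VARCHAR(255),
	subject     VARCHAR(255),
	delivery_id VARCHAR(64),
	recipient   VARCHAR(255),
	ipv4_addr   INT UNSIGNED,
	ipv6_addr   VARBINARY(16),
	return_path VARCHAR(255),
	is_tls      TINYINT(1) NOT NULL DEFAULT 0
);

CREATE TABLE in_emails_chunks (
	hash            VARBINARY(32) NOT NULL PRIMARY KEY,
	data            MEDIUMBLOB NOT NULL,
	reference_count INT UNSIGNED NOT NULL DEFAULT 1,
	modified_at     TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);`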

+ 1 - 1
tests/pidfilex.pid

@@ -1 +1 @@
-12398
+30534

+ 0 - 19
tests/testlog

@@ -1,19 +0,0 @@
-time="2019-08-25T23:36:04+10:00" level=debug msg="making servers"
-time="2019-08-25T23:36:04+10:00" level=info msg="Starting: 127.0.0.1:2525"
-time="2019-08-25T23:36:04+10:00" level=info msg="Listening on TCP 127.0.0.1:2525"
-time="2019-08-25T23:36:04+10:00" level=info msg="processing worker started (#1)"
-time="2019-08-25T23:36:04+10:00" level=debug msg="[127.0.0.1:2525] Waiting for a new client. Next Client ID: 1"
-time="2019-08-25T23:36:04+10:00" level=info msg="main log configured to tests/testlog"
-time="2019-08-25T23:36:04+10:00" level=debug msg="[127.0.0.1:2525] Waiting for a new client. Next Client ID: 2"
-time="2019-08-25T23:36:04+10:00" level=info msg="Handle client [127.0.0.1], id: 1"
-time="2019-08-25T23:36:04+10:00" level=debug msg="Writing response to client: \n220 oberon SMTP Guerrilla(unknown) #1 (1) 2019-08-25T23:36:04+10:00\r\n"
-time="2019-08-25T23:36:04+10:00" level=debug msg="Client sent: HELO maildiranasaurustester"
-time="2019-08-25T23:36:04+10:00" level=debug msg="Writing response to client: \n250 oberon Hello\r\n"
-time="2019-08-25T23:36:04+10:00" level=debug msg="Client sent: MAIL FROM:<[email protected]>"
-time="2019-08-25T23:36:04+10:00" level=debug msg="Writing response to client: \n250 2.1.0 OK\r\n"
-time="2019-08-25T23:36:04+10:00" level=debug msg="Client sent: RCPT TO:<[email protected]>"
-time="2019-08-25T23:36:04+10:00" level=debug msg="Writing response to client: \n250 2.1.5 OK\r\n"
-time="2019-08-25T23:36:04+10:00" level=debug msg="Client sent: DATA"
-time="2019-08-25T23:36:04+10:00" level=debug msg="Writing response to client: \n354 Enter message, ending with '.' on a line by itself\r\n"
-time="2019-08-25T23:36:42+10:00" level=error msg="Backend has timed out while saving email"
-time="2019-08-25T23:36:43+10:00" level=warning msg="Error reading data" error="gateway timeout"