
- chunk storage info compression
- content boundary added to mime & info
- abstracted the hash type (HashKey); see the sketch below
- removed dead code
- mime.go: when splitting on a boundary, copy the boundary string over to the new part

flashmob 6 years ago
parent
commit 0266a63ab7
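
For orientation, a minimal self-contained sketch (not part of the commit) of what the new HashKey abstraction does: it packs a 16-byte MD5 digest into a fixed-size array and renders it as raw, unpadded base64, reproducing the test vector added in s_chunksaver_test.go.

```go
package main

import (
	"crypto/md5"
	"encoding/base64"
	"fmt"
)

// hashByteSize mirrors the constant introduced in s_chunksaver.go.
const hashByteSize = 16

type HashKey [hashByteSize]byte

// Pack copies the first hashByteSize bytes of b into the key.
func (h *HashKey) Pack(b []byte) {
	if len(b) < hashByteSize {
		return
	}
	copy(h[:], b[:hashByteSize])
}

// String renders the key as raw (unpadded) base64: 22 characters
// instead of the 32 that hex would need.
func (h HashKey) String() string {
	return base64.RawStdEncoding.EncodeToString(h[:])
}

func main() {
	var h HashKey
	h.Pack([]byte{222, 23, 3, 128, 1, 23, 3, 128, 1, 23, 3, 255, 1, 23, 3, 128})
	fmt.Println(h) // 3hcDgAEXA4ABFwP/ARcDgA

	sum := md5.Sum([]byte("some chunk data"))
	h.Pack(sum[:])
	fmt.Println(h) // a chunk's storage key, like the "h" values in the JSON sample below
}
```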

+ 0 - 44
backends/gateway_stram.go

@@ -1,44 +0,0 @@
-package backends
-
-import (
-	"github.com/flashmob/go-guerrilla/log"
-	"io"
-)
-
-type StreamBackendGateway struct {
-	BackendGateway
-
-	config *StreamBackendConfig
-
-	pr *io.PipeReader
-	pw *io.PipeWriter
-}
-
-type StreamBackendConfig struct {
-	StreamSaveProcess string `json:"stream_save_process,omitempty"`
-}
-
-func NewStreamBackend(backendConfig BackendConfig, l log.Logger) (Backend, error) {
-	b, err := New(backendConfig, l)
-	if err != nil {
-		return b, err
-	}
-	if bg, ok := b.(*BackendGateway); ok {
-		sb := new(StreamBackendGateway)
-		sb.BackendGateway = *bg
-		return sb, nil
-	}
-	return b, err
-
-}
-
-func (gw *StreamBackendGateway) loadConfig(backendConfig BackendConfig) (err error) {
-	configType := BaseConfig(&StreamBackendConfig{})
-	bcfg, err := Svc.ExtractConfig(backendConfig, configType)
-	if err != nil {
-		return err
-	}
-	m := bcfg.(*StreamBackendConfig)
-	gw.config = m
-	return nil
-}

+ 16 - 0
backends/gateway_stream.go

@@ -0,0 +1,16 @@
+package backends
+
+type StreamBackendGateway struct {
+	BackendGateway
+	config *StreamBackendConfig
+}
+
+type StreamBackendConfig struct {
+	StreamSaveProcess string `json:"stream_save_process,omitempty"`
+}
+
+/*
+
+{"c":6,"tp":0,"hp":-1,"a":true,"p":[{"i":"1","s":87,"h":["ZnIOFj5TN9mANjBj2+C43Q"],"t":"multipart/mixed; boundary=\"D7F------------D7FD5A0B8AB9C65CCDBFA872\"","c":"","e":"","d":"","cb":"D7F------------D7FD5A0B8AB9C65CCDBFA872"},{"i":"1.1","s":180,"h":["WUPYSAuGmo0X2M0dlBPQPQ","60GpUAQjInSlsshIhg3lbg"],"t":"text/plain; charset=\"us-ascii\"","c":"us-ascii","e":"7bit","d":"","cb":"D7F------------D7FD5A0B8AB9C65CCDBFA872"},{"i":"1.2","s":878,"h":["8A9m4qGsTU4wQB1wAgBEVw","jn8wKuYo7bK2+S2Bd6ySVA"],"t":"message/rfc822","c":"","e":"7bit","d":"inline","cb":"D7F------------D7FD5A0B8AB9C65CCDBFA872"},{"i":"1.2.1","s":87,"h":["nLQXv1n/XZgen8SZmcoYnw"],"t":"multipart/mixed; boundary=\"DC8------------DC8638F443D87A7F0726DEF7\"","c":"","e":"","d":"","cb":"DC8------------DC8638F443D87A7F0726DEF7"},{"i":"1.2.1.1","s":469,"h":["XBczIVrikBfu+DLiyFuClA","0OjJJc8xNb2BlqU+1MaSCA"],"t":"text/plain; charset=\"us-ascii\"","c":"us-ascii","e":"7bit","d":"","cb":"DC8------------DC8638F443D87A7F0726DEF7"},{"i":"1.2.1.2","s":633,"h":["TWNp+1kio1xxZZBFMzZ2GA","/oA/Nr7g2e6AoWdmm52v/g"],"t":"image/gif; name=\"map_of_Argentina.gif\"","c":"","e":"base64","d":"attachment; filename=\"map_of_Argentina.gif\"","cb":"DC8------------DC8638F443D87A7F0726DEF7"}]}
+
+*/

+ 127 - 33
backends/s_chunksaver.go

@@ -27,6 +27,7 @@ import (
 	"compress/zlib"
 	"crypto/md5"
 	"database/sql"
+	"encoding/base64"
 	"encoding/binary"
 	"encoding/json"
 	"errors"
@@ -35,6 +36,7 @@ import (
 	"github.com/flashmob/go-guerrilla/mail/mime"
 	"hash"
 	"io"
+	"io/ioutil"
 	"net"
 	"strings"
 	"time"
@@ -54,22 +56,111 @@ func init() {
 	}
 }
 
+const hashByteSize = 16
+
+type HashKey [hashByteSize]byte
+
+// Pack copies the first hashByteSize bytes of b into the HashKey's internal representation
+func (h *HashKey) Pack(b []byte) {
+	if len(b) < hashByteSize {
+		return
+	}
+	copy(h[:], b[0:hashByteSize])
+}
+
+// String implements the Stringer interface from fmt
+func (h HashKey) String() string {
+	return base64.RawStdEncoding.EncodeToString(h[0:hashByteSize])
+}
+
+// UnmarshalJSON implements the Unmarshaler interface from encoding/json
+func (h *HashKey) UnmarshalJSON(b []byte) error {
+	dbuf := make([]byte, base64.RawStdEncoding.DecodedLen(len(b[1:len(b)-1])))
+	_, err := base64.RawStdEncoding.Decode(dbuf, b[1:len(b)-1])
+	if err != nil {
+		return err
+	}
+	h.Pack(dbuf)
+	return nil
+}
+
+// MarshalJSON implements the Marshaler interface from encoding/json
+// The value is marshaled as raw (unpadded) base64 to save some bytes:
+// e.g. instead of the typical hex form de17038001170380011703ff01170380, the value is represented as 3hcDgAEXA4ABFwP/ARcDgA
+func (h *HashKey) MarshalJSON() ([]byte, error) {
+	return []byte(`"` + h.String() + `"`), nil
+}
+
+// PartsInfo describes the mime-parts contained in the email
 type PartsInfo struct {
-	Count      uint32        `json:"c"`  // number of parts
-	TextPart   int           `json:"tp"` // id of the main text part to display
-	HTMLPart   int           `json:"hp"` // id of the main html part to display (if any)
-	HasAttach  bool          `json:"a"`
-	Parts      []chunkedPart `json:"p"`
-	Dictionary []byte        `json:"d"` // zlib dictionary
+	Count       uint32        `json:"c"`  // number of parts
+	TextPart    int           `json:"tp"` // index of the main text part to display
+	HTMLPart    int           `json:"hp"` // index of the main html part to display (if any)
+	HasAttach   bool          `json:"a"`
+	Parts       []ChunkedPart `json:"p"`
+	CBoundaries []string      `json:"cbl"` // content boundaries list
+}
+
+// ChunkedPart contains header information about a mime-part, including the hash keys under which its data chunks are stored
+type ChunkedPart struct {
+	PartId             string    `json:"i"`
+	Size               uint      `json:"s"`
+	ChunkHash          []HashKey `json:"h"` // hash keys the part's data chunks are stored under
+	ContentType        string    `json:"t"`
+	Charset            string    `json:"c"`
+	TransferEncoding   string    `json:"e"`
+	ContentDisposition string    `json:"d"`
+	ContentBoundary    int       `json:"cb"` // index to the CBoundaries list in PartsInfo
+}
+
+// boundary returns the index of cb in the info.CBoundaries slice, appending it first if not already present
+func (info *PartsInfo) boundary(cb string) int {
+	for i := range info.CBoundaries {
+		if info.CBoundaries[i] == cb {
+			return i
+		}
+	}
+	info.CBoundaries = append(info.CBoundaries, cb)
+	return len(info.CBoundaries) - 1
+}
+
+// UnmarshalJSONZlib unmarshals the JSON and decompresses it using zlib
+func (info *PartsInfo) UnmarshalJSONZlib(b []byte) error {
+
+	r, err := zlib.NewReader(bytes.NewReader(b[1 : len(b)-1]))
+	if err != nil {
+		return err
+	}
+	all, err := ioutil.ReadAll(r)
+	if err != nil {
+		return err
+	}
+	err = json.Unmarshal(all, info)
+	if err != nil {
+		return err
+	}
+	return nil
 }
 
-type chunkedPart struct {
-	PartId             string     `json:"i"`
-	ChunkHash          [][16]byte `json:"h"` // sequence of hashes the data is stored at
-	ContentType        string     `json:"t"`
-	Charset            string     `json:"c"`
-	TransferEncoding   string     `json:"e"`
-	ContentDisposition string     `json:"d"`
+// MarshalJSONZlib marshals the info to JSON and compresses it using zlib
+func (info *PartsInfo) MarshalJSONZlib() ([]byte, error) {
+
+	buf, err := json.Marshal(info)
+	if err != nil {
+		return buf, err
+	}
+	var compressed bytes.Buffer
+	zlibw, err := zlib.NewWriterLevel(&compressed, 9)
+	if err != nil {
+		return buf, err
+	}
+	if _, err := zlibw.Write(buf); err != nil {
+		return buf, err
+	}
+	if err := zlibw.Close(); err != nil {
+		return buf, err
+	}
+	return []byte(`"` + compressed.String() + `"`), nil
 }
 
 type flushEvent func() error
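
A hedged usage sketch of the new compressed serialization (roundTripSketch is a hypothetical helper, not in the commit): MarshalJSONZlib wraps the zlib stream in a pair of double quotes, and UnmarshalJSONZlib strips that outer pair by position before inflating, so the two round-trip cleanly.

```go
package backends

// roundTripSketch is illustrative only: it exercises the compressed
// PartsInfo serialization end to end.
func roundTripSketch() (PartsInfo, error) {
	info := PartsInfo{
		TextPart: -1,
		HTMLPart: -1,
		Parts: []ChunkedPart{
			{PartId: "1", Size: 87, ContentType: "text/plain; charset=\"us-ascii\""},
		},
		CBoundaries: []string{"D7F------------D7FD5A0B8AB9C65CCDBFA872"},
	}
	b, err := info.MarshalJSONZlib() // a zlib stream wrapped in double quotes
	if err != nil {
		return PartsInfo{}, err
	}
	var decoded PartsInfo
	// strips the outer quote pair, inflates, then unmarshals the JSON
	err = decoded.UnmarshalJSONZlib(b)
	return decoded, err
}
```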
@@ -159,20 +250,22 @@ func (b *chunkedBytesBufferMime) setDatabase(database ChunkSaverStorage) {
 
 func (b *chunkedBytesBufferMime) onFlush() error {
 	b.md5.Write(b.buf)
-	var chash [16]byte
+	var chash HashKey
 	copy(chash[:], b.md5.Sum([]byte{}))
-
 	if b.current != nil {
 		if size := len(b.info.Parts); size > 0 && b.info.Parts[size-1].PartId == b.current.Node {
 			// existing part, just append the hash
 			lastPart := &b.info.Parts[size-1]
 			lastPart.ChunkHash = append(lastPart.ChunkHash, chash)
 			b.fillInfo(lastPart, size-1)
+			lastPart.Size += uint(len(b.buf))
 		} else {
 			// add it as a new part
-			part := chunkedPart{
-				PartId:    b.current.Node,
-				ChunkHash: [][16]byte{chash},
+			part := ChunkedPart{
+				PartId:          b.current.Node,
+				ChunkHash:       []HashKey{chash},
+				ContentBoundary: b.info.boundary(b.current.ContentBoundary),
+				Size:            uint(len(b.buf)),
 			}
 			b.fillInfo(&part, 0)
 			b.info.Parts = append(b.info.Parts, part)
@@ -185,7 +278,7 @@ func (b *chunkedBytesBufferMime) onFlush() error {
 	return nil
 }
 
-func (b *chunkedBytesBufferMime) fillInfo(cp *chunkedPart, index int) {
+func (b *chunkedBytesBufferMime) fillInfo(cp *ChunkedPart, index int) {
 	if cp.ContentType == "" && b.current.ContentType != nil {
 		cp.ContentType = b.current.ContentType.String()
 	}
@@ -217,7 +310,7 @@ func (b *chunkedBytesBufferMime) Reset() {
 
 func (b *chunkedBytesBufferMime) currentPart(cp *mime.Part) {
 	if b.current == nil {
-		b.info = PartsInfo{Parts: make([]chunkedPart, 0, 3), TextPart: -1, HTMLPart: -1}
+		b.info = PartsInfo{Parts: make([]ChunkedPart, 0, 3), TextPart: -1, HTMLPart: -1}
 	}
 	b.current = cp
 
@@ -229,7 +322,7 @@ type ChunkSaverStorage interface {
 	CloseMessage(mailID uint64, size int64, partsInfo *PartsInfo, subject string, deliveryID string, to string, from string) error
 	AddChunk(data []byte, hash []byte) error
 	GetEmail(mailID uint64) (*ChunkSaverEmail, error)
-	GetChunks(hash ...[16]byte) ([]*ChunkSaverChunk, error)
+	GetChunks(hash ...HashKey) ([]*ChunkSaverChunk, error)
 	Initialize(cfg BackendConfig) error
 	Shutdown() (err error)
 }
@@ -281,7 +374,7 @@ type chunkSaverMemoryChunk struct {
 }
 
 type chunkSaverMemory struct {
-	chunks        map[[16]byte]*chunkSaverMemoryChunk
+	chunks        map[HashKey]*chunkSaverMemoryChunk
 	emails        []*chunkSaverMemoryEmail
 	nextID        uint64
 	IDOffset      uint64
@@ -316,7 +409,7 @@ func (m *chunkSaverMemory) CloseMessage(mailID uint64, size int64, partsInfo *Pa
 		return errors.New("email not found")
 	} else {
 		email.size = size
-		if info, err := json.Marshal(partsInfo); err != nil {
+		if info, err := partsInfo.MarshalJSONZlib(); err != nil {
 			return err
 		} else {
 			email.partsInfo = info
@@ -331,11 +424,11 @@ func (m *chunkSaverMemory) CloseMessage(mailID uint64, size int64, partsInfo *Pa
 }
 
 func (m *chunkSaverMemory) AddChunk(data []byte, hash []byte) error {
-	var key [16]byte
-	if len(hash) != 16 {
+	var key HashKey
+	if len(hash) != hashByteSize {
 		return errors.New("invalid hash")
 	}
-	copy(key[:], hash[0:16])
+	key.Pack(hash)
 	var compressed bytes.Buffer
 	zlibw, err := zlib.NewWriterLevel(&compressed, m.compressLevel)
 	if err != nil {
@@ -367,7 +460,7 @@ func (m *chunkSaverMemory) Initialize(cfg BackendConfig) error {
 	m.IDOffset = 1
 	m.nextID = m.IDOffset
 	m.emails = make([]*chunkSaverMemoryEmail, 0, 100)
-	m.chunks = make(map[[16]byte]*chunkSaverMemoryChunk, 1000)
+	m.chunks = make(map[HashKey]*chunkSaverMemoryChunk, 1000)
 	m.compressLevel = zlib.NoCompression
 	return nil
 }
@@ -384,7 +477,7 @@ func (m *chunkSaverMemory) GetEmail(mailID uint64) (*ChunkSaverEmail, error) {
 	}
 	email := m.emails[mailID-m.IDOffset]
 	pi := &PartsInfo{}
-	if err := json.Unmarshal(email.partsInfo, pi); err != nil {
+	if err := pi.UnmarshalJSONZlib(email.partsInfo); err != nil {
 		return nil, err
 	}
 	return &ChunkSaverEmail{
@@ -405,11 +498,11 @@ func (m *chunkSaverMemory) GetEmail(mailID uint64) (*ChunkSaverEmail, error) {
 	}, nil
 }
 
-func (m *chunkSaverMemory) GetChunks(hash ...[16]byte) ([]*ChunkSaverChunk, error) {
+func (m *chunkSaverMemory) GetChunks(hash ...HashKey) ([]*ChunkSaverChunk, error) {
 	result := make([]*ChunkSaverChunk, 0, len(hash))
-	var key [16]byte
+	var key HashKey
 	for i := range hash {
-		copy(key[:], hash[i][:16])
+		key = hash[i]
 		if c, ok := m.chunks[key]; ok {
 			zwr, err := zlib.NewReader(bytes.NewReader(c.data))
 			if err != nil {
@@ -628,7 +721,7 @@ func (c *chunkSaverSQL) Shutdown() (err error) {
 func (c *chunkSaverSQL) GetEmail(mailID uint64) (*ChunkSaverEmail, error) {
 	return &ChunkSaverEmail{}, nil
 }
-func (c *chunkSaverSQL) GetChunks(hash ...[16]byte) ([]*ChunkSaverChunk, error) {
+func (c *chunkSaverSQL) GetChunks(hash ...HashKey) ([]*ChunkSaverChunk, error) {
 	result := make([]*ChunkSaverChunk, 0, len(hash))
 	return result, nil
 }
@@ -876,7 +969,6 @@ func Chunksaver() *StreamDecorator {
 					for i := progress; i < len(*parts); i++ {
 						part := (*parts)[i]
 
-						chunkBuffer.currentPart(part)
 						// break chunk on new part
 						if part.StartingPos > 0 && part.StartingPos > msgPos {
 							count, _ = chunkBuffer.Write(p[pos : part.StartingPos-offset])
@@ -885,6 +977,7 @@ func Chunksaver() *StreamDecorator {
 							if err != nil {
 								return count, err
 							}
+							chunkBuffer.currentPart(part)
 							fmt.Println("->N")
 							pos += count
 							msgPos = part.StartingPos
@@ -897,6 +990,7 @@ func Chunksaver() *StreamDecorator {
 							if err != nil {
 								return count, err
 							}
+							chunkBuffer.currentPart(part)
 							fmt.Println("->H")
 							pos += count
 							msgPos = part.StartingPosBody
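
For context, a small hedged sketch (boundarySketch is a hypothetical helper, not in the commit) of the content-boundary bookkeeping used above: info.boundary deduplicates boundary strings, so each ChunkedPart stores a small integer index into PartsInfo.CBoundaries rather than repeating the full string.

```go
package backends

import "fmt"

// boundarySketch shows how repeated boundary strings collapse to indexes.
func boundarySketch() {
	info := &PartsInfo{}
	outer := "D7F------------D7FD5A0B8AB9C65CCDBFA872"
	inner := "DC8------------DC8638F443D87A7F0726DEF7"

	fmt.Println(info.boundary(outer)) // 0 (new entry appended)
	fmt.Println(info.boundary(inner)) // 1 (new entry appended)
	fmt.Println(info.boundary(outer)) // 0 (already present)
	// info.CBoundaries is now [outer, inner]; every part delimited by the
	// outer boundary stores ContentBoundary == 0, the inner ones store 1.
}
```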

+ 8 - 0
backends/s_chunksaver_test.go

@@ -142,6 +142,13 @@ U6ZGxseyk8SasGw3J9GRzdTQky1iHNvcPNNI4TLeKdfMvy0vMqLrItvuxfDW8ubjueDtJufz
 
 `
 
+func TestHashBytes(t *testing.T) {
+	var h HashKey
+	h.Pack([]byte{222, 23, 3, 128, 1, 23, 3, 128, 1, 23, 3, 255, 1, 23, 3, 128})
+	if h.String() != "3hcDgAEXA4ABFwP/ARcDgA" {
+		t.Error("expecting 3hcDgAEXA4ABFwP/ARcDgA got", h.String())
+	}
+}
 func TestChunkSaverWrite(t *testing.T) {
 
 	// place the parse result in an envelope
@@ -187,6 +194,7 @@ func TestChunkSaverWrite(t *testing.T) {
 		for _, chunk := range store.chunks {
 			total += len(chunk.data)
 		}
+		// 8A9m4qGsTU4wQB1wAgBEVw==
 		fmt.Println("compressed", total, "saved:", written-int64(total))
 		email, err := store.GetEmail(1)
 		if err != nil {

+ 3 - 3
mail/mime/mime.go

@@ -440,7 +440,6 @@ func (p *Parser) header(mh *Part) (err error) {
 	}()
 
 	for {
-
 		switch state {
 		case 0: // header name
 			if (p.ch >= 33 && p.ch <= 126) && p.ch != ':' {
@@ -476,9 +475,7 @@ func (p *Parser) header(mh *Part) (err error) {
 				p.next()
 				continue
 			}
-
 		case 1: // header value
-
 			if name == contentTypeHeader {
 				var err error
 				contentType, err := p.contentType()
@@ -846,6 +843,7 @@ func (p *Parser) mime(part *Part, cb string) (err error) {
 		part.ContentBoundary != cb { /* content-boundary must be different to previous */
 		var subPart *Part
 		subPart = newPart()
+		subPart.ContentBoundary = part.ContentBoundary
 		for {
 			subPartId := part.Node + dot + strconv.Itoa(count)
 			if end, bErr := p.boundary(part.ContentBoundary); bErr != nil {
@@ -891,10 +889,12 @@ func (p *Parser) mime(part *Part, cb string) (err error) {
 }
 
 func (p *Parser) split(subPart *Part, count int) (*Part, int) {
+	cb := subPart.ContentBoundary
 	subPart = nil
 	count++
 	subPart = newPart()
 	subPart.StartingPos = p.msgPos
+	subPart.ContentBoundary = cb
 	return subPart, count
 }
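
Finally, an illustrative sketch of the message shape the mime.go change addresses (hypothetical message with placeholder OUTER/INNER boundaries, mirroring the JSON sample in gateway_stream.go): when split() allocates the next sibling part, it now carries the current content boundary over, so siblings delimited by the same boundary all record it instead of leaving it empty.

```go
// Hypothetical nested message: parts 1.1 and 1.2 are delimited by OUTER,
// parts 1.2.1.x by INNER, matching how the "cb" values pair up in the
// JSON sample above.
const nestedSketch = "Content-Type: multipart/mixed; boundary=\"OUTER\"\r\n" +
	"\r\n" +
	"--OUTER\r\n" + // begins part 1.1 (boundary OUTER)
	"Content-Type: text/plain\r\n\r\nhello\r\n" +
	"--OUTER\r\n" + // begins part 1.2 (boundary OUTER, carried over by split)
	"Content-Type: message/rfc822\r\n\r\n" +
	"Content-Type: multipart/mixed; boundary=\"INNER\"\r\n\r\n" +
	"--INNER\r\n" + // begins part 1.2.1.1 (boundary INNER)
	"Content-Type: text/plain\r\n\r\nnested\r\n" +
	"--INNER--\r\n" +
	"--OUTER--\r\n"
```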