Browse Source

looks like parts info is pointing to the wrong chunks

flashmob 6 years ago
parent
commit
c247b2991e
2 changed files with 120 additions and 45 deletions
  1. 103 42
      backends/s_chunksaver.go
  2. 17 3
      backends/s_chunksaver_test.go

+ 103 - 42
backends/s_chunksaver.go

@@ -34,6 +34,7 @@ import (
 	"github.com/flashmob/go-guerrilla/mail"
 	"github.com/flashmob/go-guerrilla/mail/mime"
 	"hash"
+	"io"
 	"net"
 	"strings"
 	"time"
@@ -124,7 +125,6 @@ func (c *chunkedBytesBuffer) Write(p []byte) (i int, err error) {
 			}
 		}
 	}
-
 }
 
 // capTo caps the internal buffer to specified number of bytes, sets the length back to 0
@@ -137,24 +137,19 @@ func (c *chunkedBytesBuffer) capTo(n int) {
 
 type chunkedBytesBufferMime struct {
 	chunkedBytesBuffer
-	current       *mime.Part
-	info          PartsInfo
-	md5           hash.Hash
-	database      ChunkSaverStorage
-	compressLevel int
+	current  *mime.Part
+	info     PartsInfo
+	md5      hash.Hash
+	database ChunkSaverStorage
 }
 
-func newChunkedBytesBufferMime(compressLevel int) *chunkedBytesBufferMime {
+func newChunkedBytesBufferMime() *chunkedBytesBufferMime {
 	b := new(chunkedBytesBufferMime)
 	b.chunkedBytesBuffer.flushTrigger = func() error {
 		return b.onFlush()
 	}
 	b.md5 = md5.New()
 	b.buf = make([]byte, 0, chunkMaxBytes)
-	if compressLevel > 9 {
-		compressLevel = 9
-	}
-	b.compressLevel = compressLevel
 	return b
 }
 
@@ -166,11 +161,7 @@ func (b *chunkedBytesBufferMime) onFlush() error {
 	b.md5.Write(b.buf)
 	var chash [16]byte
 	copy(chash[:], b.md5.Sum([]byte{}))
-	var compressed bytes.Buffer
-	zlibw, err := zlib.NewWriterLevel(&compressed, b.compressLevel)
-	if err != nil {
-		return err
-	}
+
 	if b.current != nil {
 		if size := len(b.info.Parts); size > 0 && b.info.Parts[size-1].PartId == b.current.Node {
 			// existing part, just append the hash
@@ -187,13 +178,7 @@ func (b *chunkedBytesBufferMime) onFlush() error {
 			b.info.Parts = append(b.info.Parts, part)
 			b.info.Count++
 		}
-		if _, err := zlibw.Write(b.buf); err != nil {
-			return err
-		}
-		if err := zlibw.Close(); err != nil {
-			return err
-		}
-		if err := b.database.AddChunk(compressed.Bytes(), chash[:]); err != nil {
+		if err := b.database.AddChunk(b.buf, chash[:]); err != nil {
 			return err
 		}
 	}
@@ -244,7 +229,7 @@ type ChunkSaverStorage interface {
 	CloseMessage(mailID uint64, size int64, partsInfo *PartsInfo, subject string, deliveryID string, to string, from string) error
 	AddChunk(data []byte, hash []byte) error
 	GetEmail(mailID uint64) (*ChunkSaverEmail, error)
-	GetChunks(hash ...[]byte) ([]*ChunkSaverChunk, error)
+	GetChunks(hash ...[16]byte) ([]*ChunkSaverChunk, error)
 	Initialize(cfg BackendConfig) error
 	Shutdown() (err error)
 }
@@ -269,7 +254,7 @@ type ChunkSaverEmail struct {
 type ChunkSaverChunk struct {
 	modifiedAt     time.Time
 	referenceCount uint
-	data           []byte
+	data           io.Reader
 }
 
 type chunkSaverMemoryEmail struct {
@@ -296,10 +281,11 @@ type chunkSaverMemoryChunk struct {
 }
 
 type chunkSaverMemory struct {
-	chunks   map[[16]byte]*chunkSaverMemoryChunk
-	emails   []*chunkSaverMemoryEmail
-	nextID   uint64
-	IDOffset uint64
+	chunks        map[[16]byte]*chunkSaverMemoryChunk
+	emails        []*chunkSaverMemoryEmail
+	nextID        uint64
+	IDOffset      uint64
+	compressLevel int
 }
 
 func (m *chunkSaverMemory) OpenMessage(from string, helo string, recipient string, ipAddress net.IPAddr, returnPath string, isTLS bool) (mailID uint64, err error) {
@@ -350,18 +336,28 @@ func (m *chunkSaverMemory) AddChunk(data []byte, hash []byte) error {
 		return errors.New("invalid hash")
 	}
 	copy(key[:], hash[0:16])
+	var compressed bytes.Buffer
+	zlibw, err := zlib.NewWriterLevel(&compressed, m.compressLevel)
+	if err != nil {
+		return err
+	}
 	if chunk, ok := m.chunks[key]; ok {
 		// only update the counters and update time
 		chunk.referenceCount++
 		chunk.modifiedAt = time.Now()
 	} else {
+		if _, err := zlibw.Write(data); err != nil {
+			return err
+		}
+		if err := zlibw.Close(); err != nil {
+			return err
+		}
 		// add a new chunk
 		newChunk := chunkSaverMemoryChunk{
 			modifiedAt:     time.Now(),
 			referenceCount: 1,
+			data:           compressed.Bytes(),
 		}
-		newChunk.data = make([]byte, len(data))
-		copy(newChunk.data, data)
 		m.chunks[key] = &newChunk
 	}
 	return nil
@@ -372,6 +368,7 @@ func (m *chunkSaverMemory) Initialize(cfg BackendConfig) error {
 	m.nextID = m.IDOffset
 	m.emails = make([]*chunkSaverMemoryEmail, 0, 100)
 	m.chunks = make(map[[16]byte]*chunkSaverMemoryChunk, 1000)
+	m.compressLevel = zlib.NoCompression
 	return nil
 }
 
@@ -408,16 +405,20 @@ func (m *chunkSaverMemory) GetEmail(mailID uint64) (*ChunkSaverEmail, error) {
 	}, nil
 }
 
-func (m *chunkSaverMemory) GetChunks(hash ...[]byte) ([]*ChunkSaverChunk, error) {
+func (m *chunkSaverMemory) GetChunks(hash ...[16]byte) ([]*ChunkSaverChunk, error) {
 	result := make([]*ChunkSaverChunk, 0, len(hash))
 	var key [16]byte
 	for i := range hash {
 		copy(key[:], hash[i][:16])
 		if c, ok := m.chunks[key]; ok {
+			zwr, err := zlib.NewReader(bytes.NewReader(c.data))
+			if err != nil {
+				return nil, err
+			}
 			result = append(result, &ChunkSaverChunk{
 				modifiedAt:     c.modifiedAt,
 				referenceCount: c.referenceCount,
-				data:           c.data,
+				data:           zwr,
 			})
 		}
 	}
@@ -624,24 +625,81 @@ func (c *chunkSaverSQL) Shutdown() (err error) {
 	return err
 }
 
-func (m *chunkSaverSQL) GetEmail(mailID uint64) (*ChunkSaverEmail, error) {
+func (c *chunkSaverSQL) GetEmail(mailID uint64) (*ChunkSaverEmail, error) {
 	return &ChunkSaverEmail{}, nil
 }
-func (m *chunkSaverSQL) GetChunks(hash ...[]byte) ([]*ChunkSaverChunk, error) {
+func (c *chunkSaverSQL) GetChunks(hash ...[16]byte) ([]*ChunkSaverChunk, error) {
 	result := make([]*ChunkSaverChunk, 0, len(hash))
 	return result, nil
 }
 
 type chunkMailReader struct {
-	db ChunkSaverStorage
+	db    ChunkSaverStorage
+	email *ChunkSaverEmail
+	part  int
+	i, j  int
+}
+
+// NewChunkMailReader loads the email and selects which mime-part Read will return using the part argument
+// if part is -1, Read will read in the entire message
+func NewChunkMailReader(db ChunkSaverStorage, email *ChunkSaverEmail, part int) (*chunkMailReader, error) {
+	r := new(chunkMailReader)
+	r.db = db
+	r.part = part
+	if email == nil {
+		return nil, errors.New("nil email")
+	} else {
+		r.email = email
+	}
+	if err := r.SeekPart(part); err != nil {
+		return nil, err
+	}
+
+	return r, nil
 }
 
-func (r *chunkMailReader) Info(mailID uint64) (*PartsInfo, error) {
-	return &PartsInfo{}, nil
+func (r *chunkMailReader) SeekPart(part int) error {
+	if parts := len(r.email.partsInfo.Parts); parts == 0 {
+		return errors.New("email has mime parts missing")
+	} else if part > parts {
+		return errors.New("no such part available")
+	}
+	r.i = part
+	r.j = 0
+	return nil
 }
 
-func (r *chunkMailReader) Read(p []byte) (int, error) {
-	return 1, nil
+func (r *chunkMailReader) Read(p []byte) (n int, err error) {
+	var chunks []*ChunkSaverChunk
+	if r.part < 90 {
+		for ; r.i < len(r.email.partsInfo.Parts); r.i++ {
+			chunks, err = r.db.GetChunks(r.email.partsInfo.Parts[r.i].ChunkHash...)
+			if err != nil {
+				return
+			}
+			var nRead int
+			for r.j < len(chunks) {
+				nRead, err = chunks[r.j].data.Read(p)
+				if err == io.EOF {
+					r.j++ // advance to the next chunk
+					err = nil
+				}
+				if r.j == len(chunks) { // last chunk in a part?
+					r.j = 0 // reset chunk index
+					r.i++   // advance to the next part
+					if r.i == len(r.email.partsInfo.Parts) || r.part > 0 {
+						// there are no more parts to return
+						err = io.EOF
+					}
+				}
+				// unless there's an error, the next time this function will be
+				// called, it will read the next chunk
+				return nRead, err
+			}
+		}
+		err = io.EOF
+	}
+	return n, err
 }
 
 const chunkMaxBytes = 1024 * 16 // 16Kb is the default, change using chunksaver_chunk_size config setting
@@ -692,12 +750,13 @@ func Chunksaver() *StreamDecorator {
 				}
 				config = bcfg.(*chunkSaverConfig)
 				if chunkBuffer == nil {
-					chunkBuffer = newChunkedBytesBufferMime(config.CompressLevel)
+					chunkBuffer = newChunkedBytesBufferMime()
 				}
 				// configure storage if none was injected
 				if database == nil {
 					if config.StorageEngine == "memory" {
 						db := new(chunkSaverMemory)
+						db.compressLevel = config.CompressLevel
 						database = db
 					} else {
 						db := new(chunkSaverSQL)
@@ -808,7 +867,9 @@ func Chunksaver() *StreamDecorator {
 						progress int
 					)
 					if len(*parts) > 2 {
-						progress = len(*parts) - 2 // skip to 2nd last part, assume previous parts are already out
+						// todo: progress is a bit buggy
+						// todo: do not flush empty buffer
+						//progress = len(*parts) - 2 // skip to 2nd last part, assume previous parts are already out
 					}
 					subject, to, from = fillVars(parts, subject, to, from)
 					offset := msgPos

+ 17 - 3
backends/s_chunksaver_test.go

@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"github.com/flashmob/go-guerrilla/mail"
 	"io"
+	"os"
 	"testing"
 )
 
@@ -147,9 +148,10 @@ func TestChunkSaverWrite(t *testing.T) {
 	e := mail.NewEnvelope("127.0.0.1", 1)
 	to, _ := mail.NewAddress("[email protected]")
 	e.RcptTo = append(e.RcptTo, to)
+	e.MailFrom, _ = mail.NewAddress("[email protected]")
 
 	store := new(chunkSaverMemory)
-	chunkBuffer := newChunkedBytesBufferMime(9)
+	chunkBuffer := newChunkedBytesBufferMime()
 	//chunkBuffer.setDatabase(store)
 	// instantiate the chunk saver
 	chunksaver := streamers["chunksaver"]()
@@ -165,9 +167,9 @@ func TestChunkSaverWrite(t *testing.T) {
 
 	// configure the buffer cap
 	bc := BackendConfig{}
-	bc["chunksaver_chunk_size"] = 1024
+	bc["chunksaver_chunk_size"] = 8000
 	bc["chunksaver_storage_engine"] = "memory"
-	bc["chunksaver_compress_level"] = 9
+	bc["chunksaver_compress_level"] = 0
 	_ = Svc.initialize(bc)
 
 	// give it the envelope with the parse results
@@ -186,5 +188,17 @@ func TestChunkSaverWrite(t *testing.T) {
 			total += len(chunk.data)
 		}
 		fmt.Println("compressed", total, "saved:", written-int64(total))
+		email, err := store.GetEmail(1)
+		if err != nil {
+			t.Error("email not found")
+			return
+		}
+		r, err := NewChunkMailReader(store, email, 1)
+		for i := 0; i < len(email.partsInfo.Parts); i++ {
+			fmt.Println("seeking to", i+1)
+			r.SeekPart(i)
+			io.Copy(os.Stdout, r)
+		}
+
 	}
 }