flashmob 6 years ago
parent
commit
6424c21449
2 changed files with 104 additions and 28 deletions
  1. 65 26
      backends/s_chunksaver.go
  2. 39 2
      backends/s_chunksaver_test.go

+ 65 - 26
backends/s_chunksaver.go

@@ -50,7 +50,7 @@ func init() {
 	}
 	}
 }
 }
 
 
-type partsInfo struct {
+type PartsInfo struct {
 	Count     uint32        `json:"c"`  // number of parts
 	Count     uint32        `json:"c"`  // number of parts
 	TextPart  int           `json:"tp"` // id of the main text part to display
 	TextPart  int           `json:"tp"` // id of the main text part to display
 	HTMLPart  int           `json:"hp"` // id of the main html part to display (if any)
 	HTMLPart  int           `json:"hp"` // id of the main html part to display (if any)
@@ -74,7 +74,7 @@ type chunkedBytesBuffer struct {
 	flushTrigger flushEvent
 	flushTrigger flushEvent
 }
 }
 
 
-// flush signals that it's time to write the buffer out to disk
+// flush signals that it's time to write the buffer out to storage
 func (c *chunkedBytesBuffer) flush() error {
 func (c *chunkedBytesBuffer) flush() error {
 	if len(c.buf) == 0 {
 	if len(c.buf) == 0 {
 		return nil
 		return nil
@@ -134,18 +134,18 @@ func (c *chunkedBytesBuffer) capTo(n int) {
 type chunkedBytesBufferMime struct {
 type chunkedBytesBufferMime struct {
 	chunkedBytesBuffer
 	chunkedBytesBuffer
 	current  *mime.Part
 	current  *mime.Part
-	info     partsInfo
+	info     PartsInfo
 	md5      hash.Hash
 	md5      hash.Hash
 	database ChunkSaverStorage
 	database ChunkSaverStorage
 }
 }
 
 
 func newChunkedBytesBufferMime() *chunkedBytesBufferMime {
 func newChunkedBytesBufferMime() *chunkedBytesBufferMime {
 	b := new(chunkedBytesBufferMime)
 	b := new(chunkedBytesBufferMime)
-
 	b.chunkedBytesBuffer.flushTrigger = func() error {
 	b.chunkedBytesBuffer.flushTrigger = func() error {
 		return b.onFlush()
 		return b.onFlush()
 	}
 	}
 	b.md5 = md5.New()
 	b.md5 = md5.New()
+	b.buf = make([]byte, 0, chunkMaxBytes)
 	return b
 	return b
 }
 }
 
 
@@ -212,7 +212,7 @@ func (b *chunkedBytesBufferMime) Reset() {
 
 
 func (b *chunkedBytesBufferMime) currentPart(cp *mime.Part) {
 func (b *chunkedBytesBufferMime) currentPart(cp *mime.Part) {
 	if b.current == nil {
 	if b.current == nil {
-		b.info = partsInfo{Parts: make([]chunkedPart, 0, 3), TextPart: -1, HTMLPart: -1}
+		b.info = PartsInfo{Parts: make([]chunkedPart, 0, 3), TextPart: -1, HTMLPart: -1}
 	}
 	}
 	b.current = cp
 	b.current = cp
 
 
@@ -221,10 +221,10 @@ func (b *chunkedBytesBufferMime) currentPart(cp *mime.Part) {
 // ChunkSaverStorage defines an interface to the storage layer (the database)
 // ChunkSaverStorage defines an interface to the storage layer (the database)
 type ChunkSaverStorage interface {
 type ChunkSaverStorage interface {
 	OpenMessage(from string, helo string, recipient string, ipAddress net.IPAddr, returnPath string, isTLS bool) (mailID uint64, err error)
 	OpenMessage(from string, helo string, recipient string, ipAddress net.IPAddr, returnPath string, isTLS bool) (mailID uint64, err error)
-	CloseMessage(mailID uint64, size uint, partsInfo *partsInfo, subject string, deliveryID string, to string, from string) error
+	CloseMessage(mailID uint64, size int64, partsInfo *PartsInfo, subject string, deliveryID string, to string, from string) error
 	AddChunk(data []byte, hash []byte) error
 	AddChunk(data []byte, hash []byte) error
 	GetEmail(mailID uint64) (*ChunkSaverEmail, error)
 	GetEmail(mailID uint64) (*ChunkSaverEmail, error)
-	GetChunk(hash []byte) (*ChunkSaverChunk, error)
+	GetChunks(hash ...[]byte) ([]*ChunkSaverChunk, error)
 	Initialize(cfg BackendConfig) error
 	Initialize(cfg BackendConfig) error
 	Shutdown() (err error)
 	Shutdown() (err error)
 }
 }
@@ -232,10 +232,10 @@ type ChunkSaverStorage interface {
 type ChunkSaverEmail struct {
 type ChunkSaverEmail struct {
 	mailID     uint64
 	mailID     uint64
 	createdAt  time.Time
 	createdAt  time.Time
-	size       uint
+	size       int64
 	from       string
 	from       string
 	to         string
 	to         string
-	partsInfo  []byte
+	partsInfo  PartsInfo
 	helo       string
 	helo       string
 	subject    string
 	subject    string
 	deliveryID string
 	deliveryID string
@@ -255,7 +255,7 @@ type ChunkSaverChunk struct {
 type chunkSaverMemoryEmail struct {
 type chunkSaverMemoryEmail struct {
 	mailID     uint64
 	mailID     uint64
 	createdAt  time.Time
 	createdAt  time.Time
-	size       uint
+	size       int64
 	from       string
 	from       string
 	to         string
 	to         string
 	partsInfo  []byte
 	partsInfo  []byte
@@ -305,7 +305,7 @@ func (m *chunkSaverMemory) OpenMessage(from string, helo string, recipient strin
 	return email.mailID, nil
 	return email.mailID, nil
 }
 }
 
 
-func (m *chunkSaverMemory) CloseMessage(mailID uint64, size uint, partsInfo *partsInfo, subject string, deliveryID string, to string, from string) error {
+func (m *chunkSaverMemory) CloseMessage(mailID uint64, size int64, partsInfo *PartsInfo, subject string, deliveryID string, to string, from string) error {
 	if email := m.emails[mailID-m.IDOffset]; email == nil {
 	if email := m.emails[mailID-m.IDOffset]; email == nil {
 		return errors.New("email not found")
 		return errors.New("email not found")
 	} else {
 	} else {
@@ -329,7 +329,7 @@ func (m *chunkSaverMemory) AddChunk(data []byte, hash []byte) error {
 	if len(hash) != 16 {
 	if len(hash) != 16 {
 		return errors.New("invalid hash")
 		return errors.New("invalid hash")
 	}
 	}
-	copy(key[:], hash[0:15])
+	copy(key[:], hash[0:16])
 	if chunk, ok := m.chunks[key]; ok {
 	if chunk, ok := m.chunks[key]; ok {
 		// only update the counters and update time
 		// only update the counters and update time
 		chunk.referenceCount++
 		chunk.referenceCount++
@@ -339,8 +339,10 @@ func (m *chunkSaverMemory) AddChunk(data []byte, hash []byte) error {
 		newChunk := chunkSaverMemoryChunk{
 		newChunk := chunkSaverMemoryChunk{
 			modifiedAt:     time.Now(),
 			modifiedAt:     time.Now(),
 			referenceCount: 1,
 			referenceCount: 1,
-			data:           data,
+			//	data:           data,
 		}
 		}
+		newChunk.data = make([]byte, len(data))
+		copy(newChunk.data, data)
 		m.chunks[key] = &newChunk
 		m.chunks[key] = &newChunk
 	}
 	}
 	return nil
 	return nil
@@ -361,10 +363,46 @@ func (m *chunkSaverMemory) Shutdown() (err error) {
 }
 }
 
 
 func (m *chunkSaverMemory) GetEmail(mailID uint64) (*ChunkSaverEmail, error) {
 func (m *chunkSaverMemory) GetEmail(mailID uint64) (*ChunkSaverEmail, error) {
-	return &ChunkSaverEmail{}, nil
-}
-func (m *chunkSaverMemory) GetChunk(hash []byte) (*ChunkSaverChunk, error) {
-	return &ChunkSaverChunk{}, nil
+	if size := uint64(len(m.emails)) - m.IDOffset; size > mailID-m.IDOffset {
+		return nil, errors.New("mail not found")
+	}
+	email := m.emails[mailID-m.IDOffset]
+	pi := &PartsInfo{}
+	if err := json.Unmarshal(email.partsInfo, pi); err != nil {
+		return nil, err
+	}
+	return &ChunkSaverEmail{
+		mailID:     email.mailID,
+		createdAt:  email.createdAt,
+		size:       email.size,
+		from:       email.from,
+		to:         email.to,
+		partsInfo:  *pi,
+		helo:       email.helo,
+		subject:    email.subject,
+		deliveryID: email.deliveryID,
+		recipient:  email.recipient,
+		ipv4:       email.ipv4,
+		ipv6:       email.ipv6,
+		returnPath: email.returnPath,
+		isTLS:      email.isTLS,
+	}, nil
+}
+
+func (m *chunkSaverMemory) GetChunks(hash ...[]byte) ([]*ChunkSaverChunk, error) {
+	result := make([]*ChunkSaverChunk, 0, len(hash))
+	var key [16]byte
+	for i := range hash {
+		copy(key[:], hash[i][:16])
+		if c, ok := m.chunks[key]; ok {
+			result = append(result, &ChunkSaverChunk{
+				modifiedAt:     c.modifiedAt,
+				referenceCount: c.referenceCount,
+				data:           c.data,
+			})
+		}
+	}
+	return result, nil
 }
 }
 
 
 type chunkSaverSQLConfig struct {
 type chunkSaverSQLConfig struct {
@@ -520,7 +558,7 @@ func (c *chunkSaverSQL) AddChunk(data []byte, hash []byte) error {
 	return nil
 	return nil
 }
 }
 
 
-func (c *chunkSaverSQL) CloseMessage(mailID uint64, size uint, partsInfo *partsInfo, subject string, deliveryID string, to string, from string) error {
+func (c *chunkSaverSQL) CloseMessage(mailID uint64, size int64, partsInfo *PartsInfo, subject string, deliveryID string, to string, from string) error {
 	partsInfoJson, err := json.Marshal(partsInfo)
 	partsInfoJson, err := json.Marshal(partsInfo)
 	if err != nil {
 	if err != nil {
 		return err
 		return err
@@ -570,16 +608,17 @@ func (c *chunkSaverSQL) Shutdown() (err error) {
 func (m *chunkSaverSQL) GetEmail(mailID uint64) (*ChunkSaverEmail, error) {
 func (m *chunkSaverSQL) GetEmail(mailID uint64) (*ChunkSaverEmail, error) {
 	return &ChunkSaverEmail{}, nil
 	return &ChunkSaverEmail{}, nil
 }
 }
-func (m *chunkSaverSQL) GetChunk(hash []byte) (*ChunkSaverChunk, error) {
-	return &ChunkSaverChunk{}, nil
+func (m *chunkSaverSQL) GetChunks(hash ...[]byte) ([]*ChunkSaverChunk, error) {
+	result := make([]*ChunkSaverChunk, 0, len(hash))
+	return result, nil
 }
 }
 
 
 type chunkMailReader struct {
 type chunkMailReader struct {
 	db ChunkSaverStorage
 	db ChunkSaverStorage
 }
 }
 
 
-func (r *chunkMailReader) Info(mailID uint64) (*partsInfo, error) {
-	return &partsInfo{}, nil
+func (r *chunkMailReader) Info(mailID uint64) (*PartsInfo, error) {
+	return &PartsInfo{}, nil
 }
 }
 
 
 func (r *chunkMailReader) Read(p []byte) (int, error) {
 func (r *chunkMailReader) Read(p []byte) (int, error) {
@@ -607,7 +646,7 @@ func Chunksaver() *StreamDecorator {
 				chunkBuffer *chunkedBytesBufferMime
 				chunkBuffer *chunkedBytesBufferMime
 				msgPos      uint
 				msgPos      uint
 				database    ChunkSaverStorage
 				database    ChunkSaverStorage
-				written     uint
+				written     int64
 
 
 				// just some headers from the first mime-part
 				// just some headers from the first mime-part
 				subject string
 				subject string
@@ -747,7 +786,7 @@ func Chunksaver() *StreamDecorator {
 						// break chunk on new part
 						// break chunk on new part
 						if part.StartingPos > 0 && part.StartingPos > msgPos {
 						if part.StartingPos > 0 && part.StartingPos > msgPos {
 							count, _ = chunkBuffer.Write(p[pos : part.StartingPos-offset])
 							count, _ = chunkBuffer.Write(p[pos : part.StartingPos-offset])
-							written += uint(count)
+							written += int64(count)
 							err = chunkBuffer.flush()
 							err = chunkBuffer.flush()
 							if err != nil {
 							if err != nil {
 								return count, err
 								return count, err
@@ -759,7 +798,7 @@ func Chunksaver() *StreamDecorator {
 						// break chunk on header
 						// break chunk on header
 						if part.StartingPosBody > 0 && part.StartingPosBody >= msgPos {
 						if part.StartingPosBody > 0 && part.StartingPosBody >= msgPos {
 							count, _ = chunkBuffer.Write(p[pos : part.StartingPosBody-offset])
 							count, _ = chunkBuffer.Write(p[pos : part.StartingPosBody-offset])
-							written += uint(count)
+							written += int64(count)
 							err = chunkBuffer.flush()
 							err = chunkBuffer.flush()
 							if err != nil {
 							if err != nil {
 								return count, err
 								return count, err
@@ -771,7 +810,7 @@ func Chunksaver() *StreamDecorator {
 						// if on the latest (last) part, and yet there is still data to be written out
 						// if on the latest (last) part, and yet there is still data to be written out
 						if len(*parts)-1 == i && len(p)-1 > pos {
 						if len(*parts)-1 == i && len(p)-1 > pos {
 							count, _ = chunkBuffer.Write(p[pos:])
 							count, _ = chunkBuffer.Write(p[pos:])
-							written += uint(count)
+							written += int64(count)
 							pos += count
 							pos += count
 							msgPos += uint(count)
 							msgPos += uint(count)
 						}
 						}

+ 39 - 2
backends/s_chunksaver_test.go

@@ -1,8 +1,12 @@
 package backends
 package backends
 
 
 import (
 import (
+	"bytes"
+	"fmt"
 	"github.com/flashmob/go-guerrilla/mail"
 	"github.com/flashmob/go-guerrilla/mail"
 	"github.com/flashmob/go-guerrilla/mail/mime"
 	"github.com/flashmob/go-guerrilla/mail/mime"
+	"io"
+	"net"
 	"testing"
 	"testing"
 )
 )
 
 
@@ -187,10 +191,9 @@ func writeIt(parser *mime.Parser, t *testing.T, stream StreamProcessor, size int
 			t.Error(err)
 			t.Error(err)
 			t.Fail()
 			t.Fail()
 		}
 		}
-		// todo: close parser on last chunk! (and finalize save)
 		cut := msgPos + size
 		cut := msgPos + size
 		if cut > len(email) {
 		if cut > len(email) {
-			// the last chunk make be shorter than size
+			// the last chunk may be shorter than size
 			cut -= cut - len(email)
 			cut -= cut - len(email)
 		}
 		}
 		i, _ := stream.Write([]byte(email)[msgPos:cut])
 		i, _ := stream.Write([]byte(email)[msgPos:cut])
@@ -200,3 +203,37 @@ func writeIt(parser *mime.Parser, t *testing.T, stream StreamProcessor, size int
 		t.Error("short write, total is", total, "but len(email) is", len(email))
 		t.Error("short write, total is", total, "but len(email) is", len(email))
 	}
 	}
 }
 }
+
+func TestMemDB(t *testing.T) {
+
+	m := new(chunkSaverMemory)
+
+	from := "[email protected]"
+	helo := "home-host"
+	recipient := "[email protected]"
+	ipAddress := net.IPAddr{IP: net.ParseIP("127.0.0.1")}
+	returnPath := "[email protected]"
+	isTLS := false
+	_ = m.Initialize(nil)
+	mailID, err := m.OpenMessage(from, helo, recipient, ipAddress, returnPath, isTLS)
+	if err != nil {
+		t.Error(err)
+	}
+	buff := newChunkedBytesBufferMime()
+	buff.capTo(64)
+	buff.setDatabase(m)
+	written, err := io.Copy(buff, bytes.NewBuffer([]byte(email)))
+	if err != nil {
+		t.Error(err, "written:", written)
+	} else {
+		err = m.CloseMessage(mailID, written, &PartsInfo{}, "a subject", "1234abc", "[email protected]", "[email protected]")
+		if err != nil {
+			t.Error(err, "close message:", written)
+		}
+		email, _ := m.GetEmail(mailID)
+		fmt.Println(email)
+		//_ = m.GetChunks()
+		_ = m.Shutdown()
+	}
+
+}