Quellcode durchsuchen

- commenting
- fix a bug where the header of the first part wasn't included
- small refactoring

flashmob vor 6 Jahren
Ursprung
Commit
7875164c37
2 geänderte Dateien mit 105 neuen und 69 gelöschten Zeilen
  1. 96 66
      backends/s_chunksaver.go
  2. 9 3
      backends/s_chunksaver_test.go

+ 96 - 66
backends/s_chunksaver.go

@@ -124,7 +124,7 @@ func (info *PartsInfo) boundary(cb string) int {
 	return len(info.CBoundaries) - 1
 	return len(info.CBoundaries) - 1
 }
 }
 
 
-// UnmarshalJSON unmarchals the JSON and decompresses using zlib
+// UnmarshalJSON unmarshals the JSON and decompresses using zlib
 func (info *PartsInfo) UnmarshalJSONZlib(b []byte) error {
 func (info *PartsInfo) UnmarshalJSONZlib(b []byte) error {
 
 
 	r, err := zlib.NewReader(bytes.NewReader(b[1 : len(b)-1]))
 	r, err := zlib.NewReader(bytes.NewReader(b[1 : len(b)-1]))
@@ -142,7 +142,7 @@ func (info *PartsInfo) UnmarshalJSONZlib(b []byte) error {
 	return nil
 	return nil
 }
 }
 
 
-// MarshalJSONZlib marchals and compresses the bytes using zlib
+// MarshalJSONZlib marshals and compresses the bytes using zlib
 func (info *PartsInfo) MarshalJSONZlib() ([]byte, error) {
 func (info *PartsInfo) MarshalJSONZlib() ([]byte, error) {
 
 
 	buf, err := json.Marshal(info)
 	buf, err := json.Marshal(info)
@@ -226,6 +226,7 @@ func (c *chunkedBytesBuffer) capTo(n int) {
 	c.buf = make([]byte, 0, n)
 	c.buf = make([]byte, 0, n)
 }
 }
 
 
+// chunkedBytesBufferMime decorates chunkedBytesBuffer, specifying that to do when a flush event is triggered
 type chunkedBytesBufferMime struct {
 type chunkedBytesBufferMime struct {
 	chunkedBytesBuffer
 	chunkedBytesBuffer
 	current  *mime.Part
 	current  *mime.Part
@@ -248,32 +249,36 @@ func (b *chunkedBytesBufferMime) setDatabase(database ChunkSaverStorage) {
 	b.database = database
 	b.database = database
 }
 }
 
 
+// onFlush is called whenever the flush event fires.
+// - It saves the chunk to disk and adds the chunk's hash to the list.
+// - It builds the b.info.Parts structure
 func (b *chunkedBytesBufferMime) onFlush() error {
 func (b *chunkedBytesBufferMime) onFlush() error {
 	b.md5.Write(b.buf)
 	b.md5.Write(b.buf)
 	var chash HashKey
 	var chash HashKey
 	copy(chash[:], b.md5.Sum([]byte{}))
 	copy(chash[:], b.md5.Sum([]byte{}))
-	if b.current != nil {
-		if size := len(b.info.Parts); size > 0 && b.info.Parts[size-1].PartId == b.current.Node {
-			// existing part, just append the hash
-			lastPart := &b.info.Parts[size-1]
-			lastPart.ChunkHash = append(lastPart.ChunkHash, chash)
-			b.fillInfo(lastPart, size-1)
-			lastPart.Size += uint(len(b.buf))
-		} else {
-			// add it as a new part
-			part := ChunkedPart{
-				PartId:          b.current.Node,
-				ChunkHash:       []HashKey{chash},
-				ContentBoundary: b.info.boundary(b.current.ContentBoundary),
-				Size:            uint(len(b.buf)),
-			}
-			b.fillInfo(&part, 0)
-			b.info.Parts = append(b.info.Parts, part)
-			b.info.Count++
-		}
-		if err := b.database.AddChunk(b.buf, chash[:]); err != nil {
-			return err
+	if b.current == nil {
+		return errors.New("b.current part is nil")
+	}
+	if size := len(b.info.Parts); size > 0 && b.info.Parts[size-1].PartId == b.current.Node {
+		// existing part, just append the hash
+		lastPart := &b.info.Parts[size-1]
+		lastPart.ChunkHash = append(lastPart.ChunkHash, chash)
+		b.fillInfo(lastPart, size-1)
+		lastPart.Size += uint(len(b.buf))
+	} else {
+		// add it as a new part
+		part := ChunkedPart{
+			PartId:          b.current.Node,
+			ChunkHash:       []HashKey{chash},
+			ContentBoundary: b.info.boundary(b.current.ContentBoundary),
+			Size:            uint(len(b.buf)),
 		}
 		}
+		b.fillInfo(&part, 0)
+		b.info.Parts = append(b.info.Parts, part)
+		b.info.Count++
+	}
+	if err := b.database.AddChunk(b.buf, chash[:]); err != nil {
+		return err
 	}
 	}
 	return nil
 	return nil
 }
 }
@@ -303,6 +308,7 @@ func (b *chunkedBytesBufferMime) fillInfo(cp *ChunkedPart, index int) {
 	}
 	}
 }
 }
 
 
+// Reset decorates the Reset method of the chunkedBytesBuffer
 func (b *chunkedBytesBufferMime) Reset() {
 func (b *chunkedBytesBufferMime) Reset() {
 	b.md5.Reset()
 	b.md5.Reset()
 	b.chunkedBytesBuffer.Reset()
 	b.chunkedBytesBuffer.Reset()
@@ -313,40 +319,47 @@ func (b *chunkedBytesBufferMime) currentPart(cp *mime.Part) {
 		b.info = PartsInfo{Parts: make([]ChunkedPart, 0, 3), TextPart: -1, HTMLPart: -1}
 		b.info = PartsInfo{Parts: make([]ChunkedPart, 0, 3), TextPart: -1, HTMLPart: -1}
 	}
 	}
 	b.current = cp
 	b.current = cp
-
 }
 }
 
 
 // ChunkSaverStorage defines an interface to the storage layer (the database)
 // ChunkSaverStorage defines an interface to the storage layer (the database)
 type ChunkSaverStorage interface {
 type ChunkSaverStorage interface {
+	// OpenMessage is used to begin saving an email. An email id is returned and used to call CloseMessage later
 	OpenMessage(from string, helo string, recipient string, ipAddress net.IPAddr, returnPath string, isTLS bool) (mailID uint64, err error)
 	OpenMessage(from string, helo string, recipient string, ipAddress net.IPAddr, returnPath string, isTLS bool) (mailID uint64, err error)
+	// CloseMessage finalizes the writing of an email. Additional data collected while parsing the email is saved
 	CloseMessage(mailID uint64, size int64, partsInfo *PartsInfo, subject string, deliveryID string, to string, from string) error
 	CloseMessage(mailID uint64, size int64, partsInfo *PartsInfo, subject string, deliveryID string, to string, from string) error
+	// AddChunk saves a chunk of bytes to a given hash key
 	AddChunk(data []byte, hash []byte) error
 	AddChunk(data []byte, hash []byte) error
+	// GetEmail returns an email that's been saved
 	GetEmail(mailID uint64) (*ChunkSaverEmail, error)
 	GetEmail(mailID uint64) (*ChunkSaverEmail, error)
+	// GetChunks loads in the specified chunks of bytes from storage
 	GetChunks(hash ...HashKey) ([]*ChunkSaverChunk, error)
 	GetChunks(hash ...HashKey) ([]*ChunkSaverChunk, error)
+	// Initialize is called when the backend is started
 	Initialize(cfg BackendConfig) error
 	Initialize(cfg BackendConfig) error
+	// Shutdown is called when the backend gets shutdown.
 	Shutdown() (err error)
 	Shutdown() (err error)
 }
 }
 
 
+// ChunkSaverEmail represents an email
 type ChunkSaverEmail struct {
 type ChunkSaverEmail struct {
 	mailID     uint64
 	mailID     uint64
 	createdAt  time.Time
 	createdAt  time.Time
 	size       int64
 	size       int64
-	from       string
-	to         string
+	from       string // from stores the email address found in the "From" header field
+	to         string // to stores the email address found in the "From" header field
 	partsInfo  PartsInfo
 	partsInfo  PartsInfo
-	helo       string
-	subject    string
+	helo       string // helo message given by the client when the message was transmitted
+	subject    string // subject stores the value from the first "Subject" header field
 	deliveryID string
 	deliveryID string
-	recipient  string
-	ipv4       net.IPAddr
-	ipv6       net.IPAddr
-	returnPath string
-	isTLS      bool
+	recipient  string     // recipient is the email address that the server received from the RCPT TO command
+	ipv4       net.IPAddr // set to a value if client connected via ipv4
+	ipv6       net.IPAddr // set to a value if client connected via ipv6
+	returnPath string     // returnPath is the email address that the server received from the MAIL FROM command
+	isTLS      bool       // isTLS is true when TLS was used to connect
 }
 }
 
 
 type ChunkSaverChunk struct {
 type ChunkSaverChunk struct {
 	modifiedAt     time.Time
 	modifiedAt     time.Time
-	referenceCount uint
+	referenceCount uint // referenceCount counts how many emails reference this chunk
 	data           io.Reader
 	data           io.Reader
 }
 }
 
 
@@ -381,6 +394,7 @@ type chunkSaverMemory struct {
 	compressLevel int
 	compressLevel int
 }
 }
 
 
+// OpenMessage implements the ChunkSaverStorage interface
 func (m *chunkSaverMemory) OpenMessage(from string, helo string, recipient string, ipAddress net.IPAddr, returnPath string, isTLS bool) (mailID uint64, err error) {
 func (m *chunkSaverMemory) OpenMessage(from string, helo string, recipient string, ipAddress net.IPAddr, returnPath string, isTLS bool) (mailID uint64, err error) {
 	var ip4, ip6 net.IPAddr
 	var ip4, ip6 net.IPAddr
 	if ip := ipAddress.IP.To4(); ip != nil {
 	if ip := ipAddress.IP.To4(); ip != nil {
@@ -404,6 +418,7 @@ func (m *chunkSaverMemory) OpenMessage(from string, helo string, recipient strin
 	return email.mailID, nil
 	return email.mailID, nil
 }
 }
 
 
+// CloseMessage implements the ChunkSaverStorage interface
 func (m *chunkSaverMemory) CloseMessage(mailID uint64, size int64, partsInfo *PartsInfo, subject string, deliveryID string, to string, from string) error {
 func (m *chunkSaverMemory) CloseMessage(mailID uint64, size int64, partsInfo *PartsInfo, subject string, deliveryID string, to string, from string) error {
 	if email := m.emails[mailID-m.IDOffset]; email == nil {
 	if email := m.emails[mailID-m.IDOffset]; email == nil {
 		return errors.New("email not found")
 		return errors.New("email not found")
@@ -423,6 +438,7 @@ func (m *chunkSaverMemory) CloseMessage(mailID uint64, size int64, partsInfo *Pa
 	return nil
 	return nil
 }
 }
 
 
+// AddChunk implements the ChunkSaverStorage interface
 func (m *chunkSaverMemory) AddChunk(data []byte, hash []byte) error {
 func (m *chunkSaverMemory) AddChunk(data []byte, hash []byte) error {
 	var key HashKey
 	var key HashKey
 	if len(hash) != hashByteSize {
 	if len(hash) != hashByteSize {
@@ -456,6 +472,7 @@ func (m *chunkSaverMemory) AddChunk(data []byte, hash []byte) error {
 	return nil
 	return nil
 }
 }
 
 
+// Initialize implements the ChunkSaverStorage interface
 func (m *chunkSaverMemory) Initialize(cfg BackendConfig) error {
 func (m *chunkSaverMemory) Initialize(cfg BackendConfig) error {
 	m.IDOffset = 1
 	m.IDOffset = 1
 	m.nextID = m.IDOffset
 	m.nextID = m.IDOffset
@@ -465,12 +482,14 @@ func (m *chunkSaverMemory) Initialize(cfg BackendConfig) error {
 	return nil
 	return nil
 }
 }
 
 
+// Shutdown implements the ChunkSaverStorage interface
 func (m *chunkSaverMemory) Shutdown() (err error) {
 func (m *chunkSaverMemory) Shutdown() (err error) {
 	m.emails = nil
 	m.emails = nil
 	m.chunks = nil
 	m.chunks = nil
 	return nil
 	return nil
 }
 }
 
 
+// GetEmail implements the ChunkSaverStorage interface
 func (m *chunkSaverMemory) GetEmail(mailID uint64) (*ChunkSaverEmail, error) {
 func (m *chunkSaverMemory) GetEmail(mailID uint64) (*ChunkSaverEmail, error) {
 	if size := uint64(len(m.emails)) - m.IDOffset; size > mailID-m.IDOffset {
 	if size := uint64(len(m.emails)) - m.IDOffset; size > mailID-m.IDOffset {
 		return nil, errors.New("mail not found")
 		return nil, errors.New("mail not found")
@@ -498,6 +517,7 @@ func (m *chunkSaverMemory) GetEmail(mailID uint64) (*ChunkSaverEmail, error) {
 	}, nil
 	}, nil
 }
 }
 
 
+// GetChunk implements the ChunkSaverStorage interface
 func (m *chunkSaverMemory) GetChunks(hash ...HashKey) ([]*ChunkSaverChunk, error) {
 func (m *chunkSaverMemory) GetChunks(hash ...HashKey) ([]*ChunkSaverChunk, error) {
 	result := make([]*ChunkSaverChunk, 0, len(hash))
 	result := make([]*ChunkSaverChunk, 0, len(hash))
 	var key HashKey
 	var key HashKey
@@ -623,13 +643,14 @@ func (c *chunkSaverSQL) prepareSql() error {
 		c.statements["selectChunk"] = stmt
 		c.statements["selectChunk"] = stmt
 	}
 	}
 
 
-	// sweep old chunks
+	// TODO sweep old chunks
 
 
-	// sweep incomplete emails
+	// TODO sweep incomplete emails
 
 
 	return nil
 	return nil
 }
 }
 
 
+// OpenMessage implements the ChunkSaverStorage interface
 func (c *chunkSaverSQL) OpenMessage(from string, helo string, recipient string, ipAddress net.IPAddr, returnPath string, isTLS bool) (mailID uint64, err error) {
 func (c *chunkSaverSQL) OpenMessage(from string, helo string, recipient string, ipAddress net.IPAddr, returnPath string, isTLS bool) (mailID uint64, err error) {
 
 
 	// if it's ipv4 then we want ipv6 to be 0, and vice-versa
 	// if it's ipv4 then we want ipv6 to be 0, and vice-versa
@@ -651,6 +672,7 @@ func (c *chunkSaverSQL) OpenMessage(from string, helo string, recipient string,
 	return uint64(id), err
 	return uint64(id), err
 }
 }
 
 
+// AddChunk implements the ChunkSaverStorage interface
 func (c *chunkSaverSQL) AddChunk(data []byte, hash []byte) error {
 func (c *chunkSaverSQL) AddChunk(data []byte, hash []byte) error {
 	// attempt to increment the reference_count (it means the chunk is already in there)
 	// attempt to increment the reference_count (it means the chunk is already in there)
 	r, err := c.statements["chunkReferenceIncr"].Exec(hash)
 	r, err := c.statements["chunkReferenceIncr"].Exec(hash)
@@ -671,6 +693,7 @@ func (c *chunkSaverSQL) AddChunk(data []byte, hash []byte) error {
 	return nil
 	return nil
 }
 }
 
 
+// CloseMessage implements the ChunkSaverStorage interface
 func (c *chunkSaverSQL) CloseMessage(mailID uint64, size int64, partsInfo *PartsInfo, subject string, deliveryID string, to string, from string) error {
 func (c *chunkSaverSQL) CloseMessage(mailID uint64, size int64, partsInfo *PartsInfo, subject string, deliveryID string, to string, from string) error {
 	partsInfoJson, err := json.Marshal(partsInfo)
 	partsInfoJson, err := json.Marshal(partsInfo)
 	if err != nil {
 	if err != nil {
@@ -702,6 +725,7 @@ func (c *chunkSaverSQL) Initialize(cfg BackendConfig) error {
 	return nil
 	return nil
 }
 }
 
 
+// Shutdown implements the ChunkSaverStorage interface
 func (c *chunkSaverSQL) Shutdown() (err error) {
 func (c *chunkSaverSQL) Shutdown() (err error) {
 	defer func() {
 	defer func() {
 		closeErr := c.db.Close()
 		closeErr := c.db.Close()
@@ -718,9 +742,12 @@ func (c *chunkSaverSQL) Shutdown() (err error) {
 	return err
 	return err
 }
 }
 
 
+// GetEmail implements the ChunkSaverStorage interface
 func (c *chunkSaverSQL) GetEmail(mailID uint64) (*ChunkSaverEmail, error) {
 func (c *chunkSaverSQL) GetEmail(mailID uint64) (*ChunkSaverEmail, error) {
 	return &ChunkSaverEmail{}, nil
 	return &ChunkSaverEmail{}, nil
 }
 }
+
+// GetChunk implements the ChunkSaverStorage interface
 func (c *chunkSaverSQL) GetChunks(hash ...HashKey) ([]*ChunkSaverChunk, error) {
 func (c *chunkSaverSQL) GetChunks(hash ...HashKey) ([]*ChunkSaverChunk, error) {
 	result := make([]*ChunkSaverChunk, 0, len(hash))
 	result := make([]*ChunkSaverChunk, 0, len(hash))
 	return result, nil
 	return result, nil
@@ -729,12 +756,13 @@ func (c *chunkSaverSQL) GetChunks(hash ...HashKey) ([]*ChunkSaverChunk, error) {
 type chunkMailReader struct {
 type chunkMailReader struct {
 	db    ChunkSaverStorage
 	db    ChunkSaverStorage
 	email *ChunkSaverEmail
 	email *ChunkSaverEmail
-	part  int
-	i, j  int
+	// part requests a part. If 0, all the parts are read sequentially
+	part int
+	i, j int
 }
 }
 
 
-// NewChunkMailReader loads the email and selects which mime-part Read will return using the part argument
-// if part is -1, Read will read in the entire message
+// NewChunkMailReader loads the email and selects which mime-part Read will read, starting from 1
+// if part is 0, Read will read in the entire message. 1 selects the first part, 2 2nd, and so on..
 func NewChunkMailReader(db ChunkSaverStorage, email *ChunkSaverEmail, part int) (*chunkMailReader, error) {
 func NewChunkMailReader(db ChunkSaverStorage, email *ChunkSaverEmail, part int) (*chunkMailReader, error) {
 	r := new(chunkMailReader)
 	r := new(chunkMailReader)
 	r.db = db
 	r.db = db
@@ -751,6 +779,8 @@ func NewChunkMailReader(db ChunkSaverStorage, email *ChunkSaverEmail, part int)
 	return r, nil
 	return r, nil
 }
 }
 
 
+// SeekPart resets the reader. The part argument chooses which part Read will read in
+// If part is 0, Read will return the entire message
 func (r *chunkMailReader) SeekPart(part int) error {
 func (r *chunkMailReader) SeekPart(part int) error {
 	if parts := len(r.email.partsInfo.Parts); parts == 0 {
 	if parts := len(r.email.partsInfo.Parts); parts == 0 {
 		return errors.New("email has mime parts missing")
 		return errors.New("email has mime parts missing")
@@ -762,36 +792,35 @@ func (r *chunkMailReader) SeekPart(part int) error {
 	return nil
 	return nil
 }
 }
 
 
+// Read implements the io.Reader interface
 func (r *chunkMailReader) Read(p []byte) (n int, err error) {
 func (r *chunkMailReader) Read(p []byte) (n int, err error) {
 	var chunks []*ChunkSaverChunk
 	var chunks []*ChunkSaverChunk
-	if r.part < 90 {
-		for ; r.i < len(r.email.partsInfo.Parts); r.i++ {
-			chunks, err = r.db.GetChunks(r.email.partsInfo.Parts[r.i].ChunkHash...)
-			if err != nil {
-				return
+	for ; r.i < len(r.email.partsInfo.Parts); r.i++ {
+		chunks, err = r.db.GetChunks(r.email.partsInfo.Parts[r.i].ChunkHash...)
+		if err != nil {
+			return
+		}
+		var nRead int
+		for r.j < len(chunks) {
+			nRead, err = chunks[r.j].data.Read(p)
+			if err == io.EOF {
+				r.j++ // advance to the next chunk
+				err = nil
 			}
 			}
-			var nRead int
-			for r.j < len(chunks) {
-				nRead, err = chunks[r.j].data.Read(p)
-				if err == io.EOF {
-					r.j++ // advance to the next chunk
-					err = nil
+			if r.j == len(chunks) { // last chunk in a part?
+				r.j = 0 // reset chunk index
+				r.i++   // advance to the next part
+				if r.i == len(r.email.partsInfo.Parts) || r.part > 0 {
+					// there are no more parts to return
+					err = io.EOF
 				}
 				}
-				if r.j == len(chunks) { // last chunk in a part?
-					r.j = 0 // reset chunk index
-					r.i++   // advance to the next part
-					if r.i == len(r.email.partsInfo.Parts) || r.part > 0 {
-						// there are no more parts to return
-						err = io.EOF
-					}
-				}
-				// unless there's an error, the next time this function will be
-				// called, it will read the next chunk
-				return nRead, err
 			}
 			}
+			// unless there's an error, the next time this function will be
+			// called, it will read the next chunk
+			return nRead, err
 		}
 		}
-		err = io.EOF
 	}
 	}
+	err = io.EOF
 	return n, err
 	return n, err
 }
 }
 
 
@@ -973,11 +1002,12 @@ func Chunksaver() *StreamDecorator {
 						if part.StartingPos > 0 && part.StartingPos > msgPos {
 						if part.StartingPos > 0 && part.StartingPos > msgPos {
 							count, _ = chunkBuffer.Write(p[pos : part.StartingPos-offset])
 							count, _ = chunkBuffer.Write(p[pos : part.StartingPos-offset])
 							written += int64(count)
 							written += int64(count)
+							chunkBuffer.currentPart(part)
 							err = chunkBuffer.flush()
 							err = chunkBuffer.flush()
 							if err != nil {
 							if err != nil {
 								return count, err
 								return count, err
 							}
 							}
-							chunkBuffer.currentPart(part)
+
 							fmt.Println("->N")
 							fmt.Println("->N")
 							pos += count
 							pos += count
 							msgPos = part.StartingPos
 							msgPos = part.StartingPos
@@ -986,11 +1016,12 @@ func Chunksaver() *StreamDecorator {
 						if part.StartingPosBody > 0 && part.StartingPosBody >= msgPos {
 						if part.StartingPosBody > 0 && part.StartingPosBody >= msgPos {
 							count, _ = chunkBuffer.Write(p[pos : part.StartingPosBody-offset])
 							count, _ = chunkBuffer.Write(p[pos : part.StartingPosBody-offset])
 							written += int64(count)
 							written += int64(count)
+							chunkBuffer.currentPart(part)
 							err = chunkBuffer.flush()
 							err = chunkBuffer.flush()
 							if err != nil {
 							if err != nil {
 								return count, err
 								return count, err
 							}
 							}
-							chunkBuffer.currentPart(part)
+
 							fmt.Println("->H")
 							fmt.Println("->H")
 							pos += count
 							pos += count
 							msgPos = part.StartingPosBody
 							msgPos = part.StartingPosBody
@@ -1011,6 +1042,5 @@ func Chunksaver() *StreamDecorator {
 				return sp.Write(p)
 				return sp.Write(p)
 			})
 			})
 		}
 		}
-
 	return sd
 	return sd
 }
 }

+ 9 - 3
backends/s_chunksaver_test.go

@@ -201,9 +201,15 @@ func TestChunkSaverWrite(t *testing.T) {
 			t.Error("email not found")
 			t.Error("email not found")
 			return
 			return
 		}
 		}
-		r, err := NewChunkMailReader(store, email, 1)
-		for i := 0; i < len(email.partsInfo.Parts); i++ {
-			fmt.Println("seeking to", i+1)
+
+		// this should read all parts
+		r, err := NewChunkMailReader(store, email, 0)
+		io.Copy(os.Stdout, r)
+
+		// test the seek feature
+		r, err = NewChunkMailReader(store, email, 1)
+		for i := 1; i < len(email.partsInfo.Parts); i++ {
+			fmt.Println("seeking to", i)
 			r.SeekPart(i)
 			r.SeekPart(i)
 			io.Copy(os.Stdout, r)
 			io.Copy(os.Stdout, r)
 		}
 		}