Browse Source

- pool the buffer for compressing the partsinfo json
- config json, add omitempty
- remove old chunks

flashmob 6 years ago
parent
commit
23c8cf55ee
1 changed files with 50 additions and 21 deletions
  1. 50 21
      backends/s_chunksaver.go

+ 50 - 21
backends/s_chunksaver.go

@@ -39,6 +39,7 @@ import (
 	"io/ioutil"
 	"net"
 	"strings"
+	"sync"
 	"time"
 )
 
@@ -93,12 +94,14 @@ func (h *HashKey) MarshalJSON() ([]byte, error) {
 
 // PartsInfo describes the mime-parts contained in the email
 type PartsInfo struct {
-	Count       uint32        `json:"c"`  // number of parts
-	TextPart    int           `json:"tp"` // index of the main text part to display
-	HTMLPart    int           `json:"hp"` // index of the main html part to display (if any)
-	HasAttach   bool          `json:"a"`
-	Parts       []ChunkedPart `json:"p"`
+	Count       uint32        `json:"c"`   // number of parts
+	TextPart    int           `json:"tp"`  // index of the main text part to display
+	HTMLPart    int           `json:"hp"`  // index of the main html part to display (if any)
+	HasAttach   bool          `json:"a"`   // is there an attachment?
+	Parts       []ChunkedPart `json:"p"`   // info describing a mime-part
 	CBoundaries []string      `json:"cbl"` // content boundaries list
+
+	bp sync.Pool // bytes.buffer pool
 }
 
 // ChunkedPart contains header information about a mime-part, including keys pointing to where the data is stored at
@@ -113,6 +116,18 @@ type ChunkedPart struct {
 	ContentBoundary    int       `json:"cb"` // index to the CBoundaries list in PartsInfo
 }
 
+func NewPartsInfo() *PartsInfo {
+	pi := new(PartsInfo)
+	pi.bp = sync.Pool{
+		// if not available, then create a new one
+		New: func() interface{} {
+			var b bytes.Buffer
+			return &b
+		},
+	}
+	return pi
+}
+
 // boundary takes a string and returns the index of the string in the info.CBoundaries slice
 func (info *PartsInfo) boundary(cb string) int {
 	for i := range info.CBoundaries {
@@ -149,8 +164,15 @@ func (info *PartsInfo) MarshalJSONZlib() ([]byte, error) {
 	if err != nil {
 		return buf, err
 	}
-	var compressed bytes.Buffer
-	zlibw, err := zlib.NewWriterLevel(&compressed, 9)
+	// borrow a buffer form the pool
+	compressed := info.bp.Get().(*bytes.Buffer)
+	// put back in the pool
+	defer func() {
+		compressed.Reset()
+		info.bp.Put(b)
+	}()
+
+	zlibw, err := zlib.NewWriterLevel(compressed, 9)
 	if err != nil {
 		return buf, err
 	}
@@ -203,7 +225,7 @@ func (c *chunkedBytesBuffer) Write(p []byte) (i int, err error) {
 			i += remaining
 			return
 		} else {
-			// warm the buffer to the 'brim' with a slice from p
+			// fill the buffer to the 'brim' with a slice from p
 			c.buf = append(c.buf, p[i:i+free]...)
 			remaining -= free
 			i += free
@@ -316,7 +338,10 @@ func (b *chunkedBytesBufferMime) Reset() {
 
 func (b *chunkedBytesBufferMime) currentPart(cp *mime.Part) {
 	if b.current == nil {
-		b.info = PartsInfo{Parts: make([]ChunkedPart, 0, 3), TextPart: -1, HTMLPart: -1}
+		b.info = *NewPartsInfo()
+		b.info.Parts = make([]ChunkedPart, 0, 3)
+		b.info.TextPart = -1
+		b.info.HTMLPart = -1
 	}
 	b.current = cp
 }
@@ -495,7 +520,7 @@ func (m *chunkSaverMemory) GetEmail(mailID uint64) (*ChunkSaverEmail, error) {
 		return nil, errors.New("mail not found")
 	}
 	email := m.emails[mailID-m.IDOffset]
-	pi := &PartsInfo{}
+	pi := NewPartsInfo()
 	if err := pi.UnmarshalJSONZlib(email.partsInfo); err != nil {
 		return nil, err
 	}
@@ -539,11 +564,11 @@ func (m *chunkSaverMemory) GetChunks(hash ...HashKey) ([]*ChunkSaverChunk, error
 }
 
 type chunkSaverSQLConfig struct {
-	EmailTable  string `json:"email_table"`
-	ChunkTable  string `json:"chunk_table"`
-	Driver      string `json:"sql_driver"`
-	DSN         string `json:"sql_dsn"`
-	PrimaryHost string `json:"primary_mail_host"`
+	EmailTable  string `json:"chunksaver_email_table,omitempty"`
+	ChunkTable  string `json:"chunksaver_chunk_table,omitempty"`
+	Driver      string `json:"chunksaver_sql_driver,omitempty"`
+	DSN         string `json:"chunksaver_sql_dsn,omitempty"`
+	PrimaryHost string `json:"chunksaver_primary_mail_host,omitempty"`
 }
 
 // chunkSaverSQL implements the ChunkSaverStorage interface
@@ -777,11 +802,9 @@ func NewChunkMailReader(db ChunkSaverStorage, email *ChunkSaverEmail, part int)
 	if err := r.SeekPart(part); err != nil {
 		return nil, err
 	}
-
 	r.cache = cachedChunks{
 		db: db,
 	}
-
 	return r, nil
 }
 
@@ -832,6 +855,7 @@ func (c *cachedChunks) warm(hashes ...HashKey) (int, error) {
 			if i < preload {
 				c.chunks = append(c.chunks, chunks[i])
 			} else {
+				// don't pre-load
 				c.chunks = append(c.chunks, nil) // nil will be a placeholder for our chunk
 			}
 		}
@@ -869,7 +893,14 @@ func (c *cachedChunks) get(i int) (*ChunkSaverChunk, error) {
 				c.chunks[j] = chunks[j-i]
 				c.hashIndex[j] = toGet[j-i]
 			}
-			// todo remove any old ones
+			// remove any old ones (walk back)
+			for j := i; j > -1; j-- {
+				if c.chunks[j] != nil {
+					c.chunks[j] = nil
+				} else {
+					break
+				}
+			}
 
 			// return the chunk asked for
 			return chunks[0], nil
@@ -980,9 +1011,7 @@ func (r *chunkPartDecoder) Read(p []byte) (n int, err error) {
 				// the last char is a \n so next call to Read will check if it starts with a matching \n
 				r.state = decoderStateMatchNL
 			}
-
 		case decoderStateMatchNL:
-
 			if r.buf[0] == '\n' {
 				// found the header
 				start = 1
@@ -1036,7 +1065,7 @@ func Chunksaver() *StreamDecorator {
 				to      string
 				from    string
 
-				progress int
+				progress int // tracks which mime parts were processed
 			)
 
 			var config *chunkSaverConfig