Browse Source

decoder wip (now reading using the cache)

flashmob 6 years ago
parent
commit
ab9b189221
2 changed files with 190 additions and 48 deletions
  1. backends/s_chunksaver.go +167 -48
  2. backends/s_chunksaver_test.go +23 -0

+ 167 - 48
backends/s_chunksaver.go

@@ -203,7 +203,7 @@ func (c *chunkedBytesBuffer) Write(p []byte) (i int, err error) {
 			i += remaining
 			return
 		} else {
-			// fill the buffer to the 'brim' with a slice from p
+			// warm the buffer to the 'brim' with a slice from p
 			c.buf = append(c.buf, p[i:i+free]...)
 			remaining -= free
 			i += free
@@ -759,6 +759,8 @@ type chunkMailReader struct {
 	// part requests a part. If 0, all the parts are read sequentially
 	part int
 	i, j int
+
+	cache cachedChunks // read-through cache over db, pre-loads chunks ahead of Read
 }
 
 // NewChunkMailReader loads the email and selects which mime-part Read will read, starting from 1
@@ -776,6 +778,10 @@ func NewChunkMailReader(db ChunkSaverStorage, email *ChunkSaverEmail, part int)
 		return nil, err
 	}
 
+	r.cache = cachedChunks{
+		db: db,
+	}
+
 	return r, nil
 }
 
@@ -792,27 +798,129 @@ func (r *chunkMailReader) SeekPart(part int) error {
 	return nil
 }
 
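+// cachedChunks is a read-through cache over ChunkSaverStorage: chunks are
+// fetched in small batches and kept until the reader has moved past them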
+type cachedChunks struct {
+	chunks    []*ChunkSaverChunk
+	hashIndex map[int]HashKey
+	db        ChunkSaverStorage
+}
+
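+// chunkCachePreload is the number of chunks fetched ahead of the one being read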
+const chunkCachePreload = 2
+
+// warm allocates the chunk cache and pre-loads the first few chunks into it
+func (c *cachedChunks) warm(hashes ...HashKey) (int, error) {
+
+	if c.hashIndex == nil {
+		c.hashIndex = make(map[int]HashKey, len(hashes))
+	}
+	if c.chunks == nil {
+		c.chunks = make([]*ChunkSaverChunk, 0, 100)
+	}
+	if len(c.chunks) > 0 {
+		// already been filled
+		return len(c.chunks), nil
+	}
+	// pre-load the chunks for the first few hashes
+	preload := chunkCachePreload
+	if len(hashes) < preload {
+		preload = len(hashes)
+	}
+	if chunks, err := c.db.GetChunks(hashes[0:preload]...); err != nil {
+		return 0, err
+	} else {
+		for i := range hashes {
+			c.hashIndex[i] = hashes[i]
+			if i < preload {
+				c.chunks = append(c.chunks, chunks[i])
+			} else {
+				c.chunks = append(c.chunks, nil) // nil will be a placeholder for our chunk
+			}
+		}
+	}
+	return len(c.chunks), nil
+}
+
+// get returns chunk i. If the chunk isn't cached, it fetches it and pre-loads the
+// next few; it should also evict earlier chunks that have become stale (todo)
+func (c *cachedChunks) get(i int) (*ChunkSaverChunk, error) {
+	if i >= len(c.chunks) {
+		return nil, errors.New("not enough chunks")
+	}
+	if c.chunks[i] != nil {
+		// cache hit!
+		return c.chunks[i], nil
+	} else {
+		var toGet []HashKey
+		if key, ok := c.hashIndex[i]; ok {
+			toGet = append(toGet, key)
+		} else {
+			return nil, fmt.Errorf("hash for chunk index [%d] not found", i)
+		}
+		// make a list of chunks to fetch (the requested one plus a few to pre-load)
+		for to := i + 1; to < len(c.chunks) && to <= i+chunkCachePreload; to++ {
+			if key, ok := c.hashIndex[to]; ok {
+				toGet = append(toGet, key)
+			}
+		}
+		if chunks, err := c.db.GetChunks(toGet...); err != nil {
+			return nil, err
+		} else {
+			// cache the fetched chunks, starting at position i
+			for j := 0; j < len(chunks); j++ {
+				c.chunks[i+j] = chunks[j]
+			}
+			// todo remove any old ones
+
+			// return the chunk asked for
+			return chunks[0], nil
+		}
+	}
+
+}
+
+// purgeChunks removes any chunks cached before index i (todo: not implemented yet)
+func (c *cachedChunks) purgeChunks(i int) {
+
+}
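+
+// empty clears the cache, dropping chunk references so they can be garbage collected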
+func (c *cachedChunks) empty() {
+
+	for i := range c.chunks {
+		c.chunks[i] = nil
+	}
+	c.chunks = c.chunks[:0] // set len to 0, keep capacity
+
+	for key := range c.hashIndex {
+		delete(c.hashIndex, key)
+	}
+
+}
+
 // Read implements the io.Reader interface
 func (r *chunkMailReader) Read(p []byte) (n int, err error) {
-	var chunks []*ChunkSaverChunk
+	var length int
 	for ; r.i < len(r.email.partsInfo.Parts); r.i++ {
-		chunks, err = r.db.GetChunks(r.email.partsInfo.Parts[r.i].ChunkHash...)
+		length, err = r.cache.warm(r.email.partsInfo.Parts[r.i].ChunkHash...)
 		if err != nil {
 			return
 		}
 		var nRead int
-		for r.j < len(chunks) {
-			nRead, err = chunks[r.j].data.Read(p)
+		for r.j < length {
+			var chunk *ChunkSaverChunk
+			chunk, err = r.cache.get(r.j)
+			if err != nil {
+				return nRead, err
+			}
+			nRead, err = chunk.data.Read(p)
 			if err == io.EOF {
 				r.j++ // advance to the next chunk
 				err = nil
 			}
-			if r.j == len(chunks) { // last chunk in a part?
+			if r.j == length { // last chunk in a part?
 				r.j = 0 // reset chunk index
 				r.i++   // advance to the next part
 				if r.i == len(r.email.partsInfo.Parts) || r.part > 0 {
 					// there are no more parts to return
 					err = io.EOF
+					r.cache.empty()
 				}
 			}
 			// unless there's an error, the next time this function will be
@@ -843,51 +951,62 @@ func NewChunkPartDecoder(db ChunkSaverStorage, email *ChunkSaverEmail, part int)
 
 const chunkSaverNL = '\n'
 
+const (
+	// scanning for the blank line ("\n\n") that terminates the part headers
+	decoderStateFindHeader int = iota
+	// the previous buffer ended with '\n'; check if the next one starts with '\n'
+	decoderStateMatchNL
+	// headers have been skipped; emit (and eventually decode) the part body
+	decoderStateDecode
+)
+
 func (r *chunkPartDecoder) Read(p []byte) (n int, err error) {
 	var part *ChunkedPart
 	//if cap(p) != cap(r.buf) {
 	r.buf = make([]byte, len(p), cap(p))
-	//} else {
-	//	r.buf = r.buf[:0] // length back to 0
-	//}
+	var start, buffered int
 	part = &r.email.partsInfo.Parts[r.part]
 	_ = part
-	var offset int
-
+	buffered, err = r.chunkMailReader.Read(r.buf)
+	if buffered == 0 {
+		return
+	}
 	for {
-		n, err = r.chunkMailReader.Read(r.buf)
-		if n == 0 {
-			return
-		}
 		switch r.state {
-		case 0:
+		case decoderStateFindHeader:
 			// finding the start of the header
-			if i := bytes.IndexByte(r.buf, chunkSaverNL); i != -1 {
-				if i+1 < len(r.buf) {
-					if r.buf[i+1] == chunkSaverNL {
-						r.state = 3 // found the header
-					}
-				}
-				r.state = 1
+			if start = bytes.Index(r.buf[:buffered], []byte{chunkSaverNL, chunkSaverNL}); start != -1 {
+				start += 2                   // skip the \n\n
+				r.state = decoderStateDecode // found the end of the header
+				continue                     // continue scanning
+			} else if r.buf[buffered-1] == chunkSaverNL {
+				// the last buffered byte is a \n; the next read will check for a matching \n at the start
+				r.state = decoderStateMatchNL
 			}
-			// a new []byte will be loaded on next iteration
 
-		case 1:
+		case decoderStateMatchNL:
 
-			if i := bytes.Index(r.buf, []byte("\n")); i != -1 {
-				r.state = 2
+			if r.buf[0] == chunkSaverNL {
+				// found the end of the header
+				start = 1
+				r.state = decoderStateDecode
+				continue
+			} else {
+				r.state = decoderStateFindHeader
+				continue
 			}
+
+		case decoderStateDecode:
+			if start < buffered {
+				// todo decode here (q-printable, base64, charset)
+				n += copy(p, r.buf[start:buffered])
+			}
+			return
 		}
-		//offset++
-		if offset > len(p) {
-			break
+
+		buffered, err = r.chunkMailReader.Read(r.buf)
+		if buffered == 0 {
+			return
 		}
 	}
 
-	return
-
-	//if r.email.partsInfo.Parts[r.part].
-
 }
 
 const chunkMaxBytes = 1024 * 16 // 16Kb is the default, change using chunksaver_chunk_size config setting
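
The three decoder states exist because the header-terminating "\n\n" can be split across two successive fills of r.buf. A standalone sketch of the transition logic (a hypothetical helper for illustration, not part of this commit; it assumes the backends package, the constants above, and a non-empty buf):

	// scanForBody reports the next decoder state and, once the end of the
	// header is found, the offset at which the body starts in buf (-1 otherwise).
	func scanForBody(state int, buf []byte) (next int, bodyStart int) {
		if state == decoderStateMatchNL {
			// the previous buffer ended with '\n'; a leading '\n' completes "\n\n"
			if buf[0] == chunkSaverNL {
				return decoderStateDecode, 1
			}
			state = decoderStateFindHeader // no match, re-scan this buffer
		}
		if i := bytes.Index(buf, []byte{chunkSaverNL, chunkSaverNL}); i != -1 {
			return decoderStateDecode, i + 2 // skip over the "\n\n" itself
		}
		if buf[len(buf)-1] == chunkSaverNL {
			return decoderStateMatchNL, -1 // "\n\n" may straddle the buffer boundary
		}
		return decoderStateFindHeader, -1
	}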
@@ -916,6 +1035,8 @@ func Chunksaver() *StreamDecorator {
 				subject string
 				to      string
 				from    string
+
+				progress int
 			)
 
 			var config *chunkSaverConfig
@@ -974,6 +1095,7 @@ func Chunksaver() *StreamDecorator {
 			sd.Open = func(e *mail.Envelope) error {
 				// create a new entry & grab the id
 				written = 0
+				progress = 0
 				var ip net.IPAddr
 				if ret := net.ParseIP(e.RemoteIP); ret != nil {
 					ip = net.IPAddr{IP: ret}
@@ -1049,18 +1171,12 @@ func Chunksaver() *StreamDecorator {
 				if envelope.Values == nil {
 					return count, errors.New("no message headers found")
 				}
-				if parts, ok := envelope.Values["MimeParts"].(*[]*mime.Part); ok {
-					var (
-						pos      int
-						progress int
-					)
-					if len(*parts) > 2 {
-						// todo: progress is a bit buggy
-						// todo: do not flush empty buffer
-						//progress = len(*parts) - 2 // skip to 2nd last part, assume previous parts are already out
-					}
+				if parts, ok := envelope.Values["MimeParts"].(*[]*mime.Part); ok && len(*parts) > 0 {
+					var pos int
+
 					subject, to, from = fillVars(parts, subject, to, from)
 					offset := msgPos
+					chunkBuffer.currentPart((*parts)[0])
 					for i := progress; i < len(*parts); i++ {
 						part := (*parts)[i]
 
@@ -1068,12 +1184,12 @@ func Chunksaver() *StreamDecorator {
 						if part.StartingPos > 0 && part.StartingPos > msgPos {
 							count, _ = chunkBuffer.Write(p[pos : part.StartingPos-offset])
 							written += int64(count)
-							chunkBuffer.currentPart(part)
+
 							err = chunkBuffer.flush()
 							if err != nil {
 								return count, err
 							}
-
+							chunkBuffer.currentPart(part)
 							fmt.Println("->N")
 							pos += count
 							msgPos = part.StartingPos
@@ -1082,12 +1198,12 @@ func Chunksaver() *StreamDecorator {
 						if part.StartingPosBody > 0 && part.StartingPosBody >= msgPos {
 							count, _ = chunkBuffer.Write(p[pos : part.StartingPosBody-offset])
 							written += int64(count)
-							chunkBuffer.currentPart(part)
+
 							err = chunkBuffer.flush()
 							if err != nil {
 								return count, err
 							}
-
+							chunkBuffer.currentPart(part)
 							fmt.Println("->H")
 							pos += count
 							msgPos = part.StartingPosBody
@@ -1104,6 +1220,9 @@ func Chunksaver() *StreamDecorator {
 							break
 						}
 					}
+					if len(*parts) > 2 {
+						progress = len(*parts) - 2 // skip to 2nd last part, assume previous parts are already processed
+					}
 				}
 				return sp.Write(p)
 			})
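
For context, a minimal sketch of reading a saved message back through the new cache-backed reader (db and email are placeholders: db is assumed to implement ChunkSaverStorage, and email is a *ChunkSaverEmail previously written by the chunksaver; imports are bytes and io):

	func readSavedMail(db ChunkSaverStorage, email *ChunkSaverEmail) (string, error) {
		// part 0 reads all mime-parts sequentially; part > 0 selects a single part
		r, err := NewChunkMailReader(db, email, 0)
		if err != nil {
			return "", err
		}
		// each Read warms the cache for the current part, then serves bytes chunk
		// by chunk while cachedChunks pre-loads a couple of chunks ahead
		var out bytes.Buffer
		if _, err := io.Copy(&out, r); err != nil {
			return "", err
		}
		return out.String(), nil
	}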

+ 23 - 0
backends/s_chunksaver_test.go

The file diff is not shown because it is too large

Some files were not shown because too many files changed in this diff