Browse Source

- prepare queries (without the map)
chunkPrefetchCountDefault configurable (add chunkPrefetchMax)

flashmob 5 years ago
parent
commit
f9053d7f77
7 changed files with 81 additions and 72 deletions
  1. 3 3
      chunk/chunk_test.go
  2. 10 4
      chunk/processor.go
  3. 10 9
      chunk/reader.go
  4. 1 1
      chunk/store.go
  5. 1 1
      chunk/store_memory.go
  6. 55 33
      chunk/store_sql.go
  7. 1 21
      chunk/store_sql_test.go

+ 3 - 3
chunk/chunk_test.go

@@ -423,7 +423,7 @@ func TestTransformer(t *testing.T) {
 		_ = mimeanalyzer.Close()
 		_ = chunksaver.Close()
 
-		email, err := store.GetEmail(1)
+		email, err := store.GetMessage(1)
 		if err != nil {
 			t.Error("email not found")
 			return
@@ -459,7 +459,7 @@ func TestChunkSaverReader(t *testing.T) {
 		_ = mimeanalyzer.Close()
 		_ = chunksaver.Close()
 
-		email, err := store.GetEmail(1)
+		email, err := store.GetMessage(1)
 		if err != nil {
 			t.Error("email not found")
 			return
@@ -558,7 +558,7 @@ func TestChunkSaverWrite(t *testing.T) {
 			total += len(chunk.data)
 		}
 		fmt.Println("compressed", total, "saved:", written-int64(total))
-		email, err := store.GetEmail(1)
+		email, err := store.GetMessage(1)
 		if err != nil {
 			t.Error("email not found")
 			return

+ 10 - 4
chunk/processor.go

@@ -44,8 +44,7 @@ type Config struct {
 	// ChunkMaxBytes controls the maximum buffer size for saving
 	// 16KB default.
 	ChunkMaxBytes int `json:"chunk_size,omitempty"`
-	// ChunkPrefetchCount specifies how many chunks to pre-fetch when reading from storage.
-	// It may reduce the number of trips required to storage
+	// ChunkPrefetchCount specifies how many chunks to pre-fetch when reading from storage. Default: 2, Max: 32
 	ChunkPrefetchCount int `json:"chunk_prefetch_count,omitempty"`
 	// StorageEngine specifies which storage engine to use (see the StorageEngines map)
 	StorageEngine string `json:"storage_engine,omitempty"`
@@ -121,13 +120,20 @@ func Chunksaver() *backends.StreamDecorator {
 		if database == nil {
 			return nil, errors.New("database is nil")
 		}
-		email, err := database.GetEmail(emailID)
+		email, err := database.GetMessage(emailID)
 		if err != nil {
 			return nil, errors.New("email not found")
 
 		}
 		r, err := NewChunkedReader(database, email, 0)
-		r.ChunkPrefetchCount = config.ChunkPrefetchCount
+		if r != nil && config.ChunkPrefetchCount > 0 {
+			// override the default with the configured value
+			r.ChunkPrefetchCount = config.ChunkPrefetchCount
+			if r.ChunkPrefetchCount > chunkPrefetchMax {
+				r.ChunkPrefetchCount = chunkPrefetchMax
+			}
+		}
+
 		return r, err
 	}
 

+ 10 - 9
chunk/reader.go

@@ -6,6 +6,10 @@ import (
 	"io"
 )
 
+// chunkPrefetchCountDefault controls how many chunks to pre-load in the cache
+const chunkPrefetchCountDefault = 2
+const chunkPrefetchMax = 32
+
 type chunkedReader struct {
 	db    Storage
 	email *Email
@@ -35,7 +39,7 @@ func NewChunkedReader(db Storage, email *Email, part int) (*chunkedReader, error
 	r.cache = cachedChunks{
 		db: db,
 	}
-	r.ChunkPrefetchCount = chunkCachePreload
+	r.ChunkPrefetchCount = chunkPrefetchCountDefault
 	return r, nil
 }
 
@@ -59,19 +63,16 @@ func (r *chunkedReader) SeekPart(part int) error {
 type cachedChunks struct {
 	// chunks stores the cached chunks. It stores the latest chunk being read
 	// and the next few chunks that are yet to be read
-	// (see the chunkCachePreload constant)
 	chunks []*Chunk
 	// hashIndex is a look-up table that returns the hash of a given index
-	hashIndex map[int]HashKey
-	db        Storage
+	hashIndex          map[int]HashKey
+	db                 Storage
+	ChunkPrefetchCount int // how many chunks to pre-load
 }
 
-// chunkCachePreload controls how many chunks to pre-load in the cache
-const chunkCachePreload = 2
-
 // warm allocates the chunk cache, and gets the first few and stores them in the cache
 func (c *cachedChunks) warm(preload int, hashes []HashKey) (int, error) {
-
+	c.ChunkPrefetchCount = preload
 	if c.hashIndex == nil {
 		c.hashIndex = make(map[int]HashKey, len(hashes))
 	}
@@ -119,7 +120,7 @@ func (c *cachedChunks) get(i int) (*Chunk, error) {
 			return nil, errors.New(fmt.Sprintf("hash for key [%s] not found", key))
 		}
 		// make a list of chunks to load (extra ones to be pre-loaded)
-		for to := i + 1; to < len(c.chunks) && to < chunkCachePreload+i; to++ {
+		for to := i + 1; to < len(c.chunks) && to < c.ChunkPrefetchCount+i; to++ {
 			if key, ok := c.hashIndex[to]; ok {
 				toGet = append(toGet, key)
 			}

+ 1 - 1
chunk/store.go

@@ -36,7 +36,7 @@ type Storage interface {
 	// AddChunk saves a chunk of bytes to a given hash key
 	AddChunk(data []byte, hash []byte) error
 	// GetEmail returns an email that's been saved
-	GetEmail(mailID uint64) (*Email, error)
+	GetMessage(mailID uint64) (*Email, error)
 	// GetChunks loads in the specified chunks of bytes from storage
 	GetChunks(hash ...HashKey) ([]*Chunk, error)
 	// Initialize is called when the backend is started

+ 1 - 1
chunk/store_memory.go

@@ -176,7 +176,7 @@ func (m *StoreMemory) Shutdown() (err error) {
 }
 
 // GetEmail implements the Storage interface
-func (m *StoreMemory) GetEmail(mailID uint64) (*Email, error) {
+func (m *StoreMemory) GetMessage(mailID uint64) (*Email, error) {
 	if count := len(m.emails); count == 0 {
 		return nil, errors.New("storage is empty")
 	} else if overflow := uint64(count) - m.offset; overflow > mailID-m.offset {

+ 55 - 33
chunk/store_sql.go

@@ -93,9 +93,16 @@ type sqlConfig struct {
 
 // StoreSQL implements the Storage interface
 type StoreSQL struct {
-	config     sqlConfig
-	statements map[string]*sql.Stmt
-	db         *sql.DB
+	config sqlConfig
+	db     *sql.DB
+
+	sqlSelectChunk        []*sql.Stmt
+	sqlInsertEmail        *sql.Stmt
+	sqlInsertChunk        *sql.Stmt
+	sqlFinalizeEmail      *sql.Stmt
+	sqlChunkReferenceIncr *sql.Stmt
+	sqlChunkReferenceDecr *sql.Stmt
+	sqlSelectMail         *sql.Stmt
 }
 
 func (s *StoreSQL) StartWorker() (stop chan bool) {
@@ -150,9 +157,6 @@ func (s *StoreSQL) connect() (*sql.DB, error) {
 }
 
 func (s *StoreSQL) prepareSql() error {
-	if s.statements == nil {
-		s.statements = make(map[string]*sql.Stmt)
-	}
 
 	// begin inserting an email (before saving chunks)
 	if stmt, err := s.db.Prepare(`INSERT INTO ` +
@@ -161,7 +165,7 @@ func (s *StoreSQL) prepareSql() error {
  VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`); err != nil {
 		return err
 	} else {
-		s.statements["insertEmail"] = stmt
+		s.sqlInsertEmail = stmt
 	}
 
 	// insert a chunk of email's data
@@ -171,7 +175,7 @@ func (s *StoreSQL) prepareSql() error {
  VALUES(?, ?)`); err != nil {
 		return err
 	} else {
-		s.statements["insertChunk"] = stmt
+		s.sqlInsertChunk = stmt
 	}
 
 	// finalize the email (the connection closed)
@@ -181,7 +185,7 @@ func (s *StoreSQL) prepareSql() error {
 		WHERE mail_id = ? `); err != nil {
 		return err
 	} else {
-		s.statements["finalizeEmail"] = stmt
+		s.sqlFinalizeEmail = stmt
 	}
 
 	// Check the existence of a chunk (the reference_count col is incremented if it exists)
@@ -193,7 +197,7 @@ func (s *StoreSQL) prepareSql() error {
 		WHERE hash = ? `); err != nil {
 		return err
 	} else {
-		s.statements["chunkReferenceIncr"] = stmt
+		s.sqlChunkReferenceIncr = stmt
 	}
 
 	// If the reference_count is 0 then it means the chunk has been deleted
@@ -204,7 +208,7 @@ func (s *StoreSQL) prepareSql() error {
 		WHERE hash = ? AND reference_count > 0`); err != nil {
 		return err
 	} else {
-		s.statements["chunkReferenceDecr"] = stmt
+		s.sqlChunkReferenceDecr = stmt
 	}
 
 	// fetch an email
@@ -214,17 +218,20 @@ func (s *StoreSQL) prepareSql() error {
 		where mail_id=?`); err != nil {
 		return err
 	} else {
-		s.statements["selectMail"] = stmt
+		s.sqlSelectMail = stmt
 	}
 
-	// fetch a chunk
-	if stmt, err := s.db.Prepare(`
-		SELECT * 
-		from ` + s.config.EmailChunkTable + ` 
-		where hash=?`); err != nil {
-		return err
-	} else {
-		s.statements["selectChunk"] = stmt
+	// fetch a chunk, used in GetChunks
+	// prepare a query for all possible combinations is prepared
+
+	for i := 0; i < chunkPrefetchMax; i++ {
+		if stmt, err := s.db.Prepare(
+			s.getChunksSQL(i + 1),
+		); err != nil {
+			return err
+		} else {
+			s.sqlSelectChunk[i] = stmt
+		}
 	}
 
 	// TODO sweep old chunks
@@ -256,7 +263,7 @@ func (s *StoreSQL) OpenMessage(
 	} else {
 		copy(ip6, ipAddress.IP)
 	}
-	r, err := s.statements["insertEmail"].Exec(
+	r, err := s.sqlInsertEmail.Exec(
 		queuedID.Bytes(),
 		time.Now().Format(mysqlYYYY_m_d_s_H_i_s),
 		from,
@@ -280,7 +287,7 @@ func (s *StoreSQL) OpenMessage(
 // AddChunk implements the Storage interface
 func (s *StoreSQL) AddChunk(data []byte, hash []byte) error {
 	// attempt to increment the reference_count (it means the chunk is already in there)
-	r, err := s.statements["chunkReferenceIncr"].Exec(hash)
+	r, err := s.sqlChunkReferenceIncr.Exec(hash)
 	if err != nil {
 		return err
 	}
@@ -290,7 +297,7 @@ func (s *StoreSQL) AddChunk(data []byte, hash []byte) error {
 	}
 	if affected == 0 {
 		// chunk isn't in there, let's insert it
-		_, err := s.statements["insertChunk"].Exec(data, hash)
+		_, err := s.sqlInsertChunk.Exec(data, hash)
 		if err != nil {
 			return err
 		}
@@ -309,7 +316,7 @@ func (s *StoreSQL) CloseMessage(
 	if err != nil {
 		return err
 	}
-	_, err = s.statements["finalizeEmail"].Exec(size, partsInfoJson, subject, to, from, mailID)
+	_, err = s.sqlFinalizeEmail.Exec(size, partsInfoJson, subject, to, from, mailID)
 	if err != nil {
 		return err
 	}
@@ -332,6 +339,8 @@ func (s *StoreSQL) Initialize(cfg backends.ConfigGroup) error {
 	if s.config.Driver == "" {
 		s.config.Driver = "mysql"
 	}
+	// because it uses an IN(?) query, so we need a different query for each possible ? combination (max chunkPrefetchMax)
+	s.sqlSelectChunk = make([]*sql.Stmt, chunkPrefetchMax)
 
 	s.db, err = s.connect()
 	if err != nil {
@@ -353,8 +362,18 @@ func (s *StoreSQL) Shutdown() (err error) {
 			err = closeErr
 		}
 	}()
-	for i := range s.statements {
-		if err = s.statements[i].Close(); err != nil {
+	toClose := []*sql.Stmt{
+		s.sqlInsertEmail,
+		s.sqlFinalizeEmail,
+		s.sqlInsertChunk,
+		s.sqlChunkReferenceIncr,
+		s.sqlChunkReferenceDecr,
+		s.sqlSelectMail,
+	}
+	toClose = append(toClose, s.sqlSelectChunk...)
+
+	for i := range toClose {
+		if err = toClose[i].Close(); err != nil {
 			backends.Log().WithError(err).Error("failed to close sql statement")
 		}
 	}
@@ -362,13 +381,13 @@ func (s *StoreSQL) Shutdown() (err error) {
 }
 
 // GetEmail implements the Storage interface
-func (s *StoreSQL) GetEmail(mailID uint64) (*Email, error) {
+func (s *StoreSQL) GetMessage(mailID uint64) (*Email, error) {
 
 	email := &Email{}
 	var createdAt mysql.NullTime
 	var transport transportType
 	var protocol protocol
-	err := s.statements["selectMail"].QueryRow(mailID).Scan(
+	err := s.sqlSelectMail.QueryRow(mailID).Scan(
 		&email.mailID,
 		&createdAt,
 		&email.size,
@@ -411,6 +430,13 @@ func (v chunkData) Value() (driver.Value, error) {
 	return v[:], nil
 }
 
+func (s *StoreSQL) getChunksSQL(size int) string {
+	return fmt.Sprintf("SELECT modified_at, reference_count, data, `hash` FROM %s WHERE `hash` in (%s)",
+		s.config.EmailChunkTable,
+		"?"+strings.Repeat(",?", size-1),
+	)
+}
+
 // GetChunks implements the Storage interface
 func (s *StoreSQL) GetChunks(hash ...HashKey) ([]*Chunk, error) {
 	result := make([]*Chunk, len(hash))
@@ -419,11 +445,7 @@ func (s *StoreSQL) GetChunks(hash ...HashKey) ([]*Chunk, error) {
 	for i := range hash {
 		args[i] = &hash[i]
 	}
-	query := fmt.Sprintf("SELECT modified_at, reference_count, data, `hash` FROM %s WHERE `hash` in (%s)",
-		s.config.EmailChunkTable,
-		"?"+strings.Repeat(",?", len(hash)-1),
-	)
-	rows, err := s.db.Query(query, args...)
+	rows, err := s.sqlSelectChunk[len(args)-1].Query(args...)
 	defer func() {
 		if rows != nil {
 			_ = rows.Close()

+ 1 - 21
chunk/store_sql_test.go

@@ -60,28 +60,8 @@ func TestSQLStore(t *testing.T) {
 		_ = chunksaver.Close()
 
 		fmt.Println("written:", written)
-		/*
-			total := 0
-			for _, chunk := range storeMemory.chunks {
-				total += len(chunk.data)
-			}
-			fmt.Println("compressed", total, "saved:", written-int64(total))
-		*/
-
-		/*
-
-			part 5 (gif)
-			5a94c939c7101636fc19f266f701968b
-			45c5d2a84119b3a21b0306a9524b361a
-			74eb56d4dd331e3d8c76a373556d6bcb
-
-			hash 5a94c939c7101636fc19f266f701968b h 45c5d2a84119b3a21b0306a9524b361a i 0
-			hash 45c5d2a84119b3a21b0306a9524b361a h 5a94c939c7101636fc19f266f701968b i 1
-
-			hash 74eb56d4dd331e3d8c76a373556d6bcb h 74eb56d4dd331e3d8c76a373556d6bcb i 0
-		*/
 
-		email, err := storeSql.GetEmail(e.MessageID)
+		email, err := storeSql.GetMessage(e.MessageID)
 
 		if err != nil {
 			t.Error("email not found")