|
@@ -2,6 +2,7 @@ package chunk
|
|
|
|
|
|
import (
|
|
import (
|
|
"bytes"
|
|
"bytes"
|
|
|
|
+ "compress/zlib"
|
|
"database/sql"
|
|
"database/sql"
|
|
"database/sql/driver"
|
|
"database/sql/driver"
|
|
"encoding/binary"
|
|
"encoding/binary"
|
|
@@ -14,6 +15,7 @@ import (
|
|
"github.com/go-sql-driver/mysql"
|
|
"github.com/go-sql-driver/mysql"
|
|
"net"
|
|
"net"
|
|
"strings"
|
|
"strings"
|
|
|
|
+ "sync"
|
|
"time"
|
|
"time"
|
|
)
|
|
)
|
|
|
|
|
|
@@ -103,6 +105,8 @@ type StoreSQL struct {
|
|
sqlChunkReferenceIncr *sql.Stmt
|
|
sqlChunkReferenceIncr *sql.Stmt
|
|
sqlChunkReferenceDecr *sql.Stmt
|
|
sqlChunkReferenceDecr *sql.Stmt
|
|
sqlSelectMail *sql.Stmt
|
|
sqlSelectMail *sql.Stmt
|
|
|
|
+
|
|
|
|
+ bufferPool sync.Pool
|
|
}
|
|
}
|
|
|
|
|
|
func (s *StoreSQL) StartWorker() (stop chan bool) {
|
|
func (s *StoreSQL) StartWorker() (stop chan bool) {
|
|
@@ -297,7 +301,23 @@ func (s *StoreSQL) AddChunk(data []byte, hash []byte) error {
|
|
}
|
|
}
|
|
if affected == 0 {
|
|
if affected == 0 {
|
|
// chunk isn't in there, let's insert it
|
|
// chunk isn't in there, let's insert it
|
|
- _, err := s.sqlInsertChunk.Exec(data, hash)
|
|
|
|
|
|
+
|
|
|
|
+ compressed := s.bufferPool.Get().(*bytes.Buffer)
|
|
|
|
+ defer func() {
|
|
|
|
+ compressed.Reset()
|
|
|
|
+ s.bufferPool.Put(compressed)
|
|
|
|
+ }()
|
|
|
|
+ zlibw, err := zlib.NewWriterLevel(compressed, s.config.CompressLevel)
|
|
|
|
+ if err != nil {
|
|
|
|
+ return err
|
|
|
|
+ }
|
|
|
|
+ if _, err := zlibw.Write(data); err != nil {
|
|
|
|
+ return err
|
|
|
|
+ }
|
|
|
|
+ if err := zlibw.Close(); err != nil {
|
|
|
|
+ return err
|
|
|
|
+ }
|
|
|
|
+ _, err = s.sqlInsertChunk.Exec(compressed.Bytes(), hash)
|
|
if err != nil {
|
|
if err != nil {
|
|
return err
|
|
return err
|
|
}
|
|
}
|
|
@@ -339,6 +359,15 @@ func (s *StoreSQL) Initialize(cfg backends.ConfigGroup) error {
|
|
if s.config.Driver == "" {
|
|
if s.config.Driver == "" {
|
|
s.config.Driver = "mysql"
|
|
s.config.Driver = "mysql"
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ s.bufferPool = sync.Pool{
|
|
|
|
+ // if not available, then create a new one
|
|
|
|
+ New: func() interface{} {
|
|
|
|
+ var b bytes.Buffer
|
|
|
|
+ return &b
|
|
|
|
+ },
|
|
|
|
+ }
|
|
|
|
+
|
|
// because it uses an IN(?) query, so we need a different query for each possible ? combination (max chunkPrefetchMax)
|
|
// because it uses an IN(?) query, so we need a different query for each possible ? combination (max chunkPrefetchMax)
|
|
s.sqlSelectChunk = make([]*sql.Stmt, chunkPrefetchMax)
|
|
s.sqlSelectChunk = make([]*sql.Stmt, chunkPrefetchMax)
|
|
|
|
|
|
@@ -472,7 +501,10 @@ func (s *StoreSQL) GetChunks(hash ...HashKey) ([]*Chunk, error) {
|
|
); err != nil {
|
|
); err != nil {
|
|
return result, err
|
|
return result, err
|
|
}
|
|
}
|
|
- c.data = bytes.NewBuffer(data)
|
|
|
|
|
|
+ c.data, err = zlib.NewReader(bytes.NewReader(data))
|
|
|
|
+ if err != nil {
|
|
|
|
+ return nil, err
|
|
|
|
+ }
|
|
c.modifiedAt = createdAt.Time
|
|
c.modifiedAt = createdAt.Time
|
|
temp[h] = &c
|
|
temp[h] = &c
|
|
i++
|
|
i++
|