@@ -23,6 +23,8 @@ package backends
 // ----------------------------------------------------------------------------------
 
 import (
+	"bytes"
+	"compress/zlib"
 	"crypto/md5"
 	"database/sql"
 	"encoding/binary"
@@ -42,6 +44,7 @@ type chunkSaverConfig struct {
 	// 16KB default.
 	ChunkMaxBytes int    `json:"chunksaver_chunk_size"`
 	StorageEngine string `json:"chunksaver_storage_engine"`
+	CompressLevel int    `json:"chunksaver_compress_level,omitempty"`
 }
 
 func init() {
@@ -51,11 +54,12 @@ func init() {
 }
 
 type PartsInfo struct {
-	Count     uint32        `json:"c"`  // number of parts
-	TextPart  int           `json:"tp"` // id of the main text part to display
-	HTMLPart  int           `json:"hp"` // id of the main html part to display (if any)
-	HasAttach bool          `json:"a"`
-	Parts     []chunkedPart `json:"p"`
+	Count      uint32        `json:"c"`  // number of parts
+	TextPart   int           `json:"tp"` // id of the main text part to display
+	HTMLPart   int           `json:"hp"` // id of the main html part to display (if any)
+	HasAttach  bool          `json:"a"`
+	Parts      []chunkedPart `json:"p"`
+	Dictionary []byte        `json:"d"` // zlib dictionary
 }
 
 type chunkedPart struct {
@@ -133,19 +137,24 @@ func (c *chunkedBytesBuffer) capTo(n int) {
 
 type chunkedBytesBufferMime struct {
 	chunkedBytesBuffer
-	current  *mime.Part
-	info     PartsInfo
-	md5      hash.Hash
-	database ChunkSaverStorage
+	current       *mime.Part
+	info          PartsInfo
+	md5           hash.Hash
+	database      ChunkSaverStorage
+	compressLevel int
 }
 
-func newChunkedBytesBufferMime() *chunkedBytesBufferMime {
+func newChunkedBytesBufferMime(compressLevel int) *chunkedBytesBufferMime {
 	b := new(chunkedBytesBufferMime)
 	b.chunkedBytesBuffer.flushTrigger = func() error {
 		return b.onFlush()
 	}
 	b.md5 = md5.New()
 	b.buf = make([]byte, 0, chunkMaxBytes)
+	if compressLevel > 9 {
+		compressLevel = 9
+	}
+	b.compressLevel = compressLevel
 	return b
 }
 
@@ -157,6 +166,11 @@ func (b *chunkedBytesBufferMime) onFlush() error {
 	b.md5.Write(b.buf)
 	var chash [16]byte
 	copy(chash[:], b.md5.Sum([]byte{}))
+	var compressed bytes.Buffer
+	zlibw, err := zlib.NewWriterLevel(&compressed, b.compressLevel)
+	if err != nil {
+		return err
+	}
 	if b.current != nil {
 		if size := len(b.info.Parts); size > 0 && b.info.Parts[size-1].PartId == b.current.Node {
 			// existing part, just append the hash
@@ -173,7 +187,13 @@ func (b *chunkedBytesBufferMime) onFlush() error {
 			b.info.Parts = append(b.info.Parts, part)
 			b.info.Count++
 		}
-		if err := b.database.AddChunk(b.buf, chash[:]); err != nil {
+		if _, err := zlibw.Write(b.buf); err != nil {
+			return err
+		}
+		if err := zlibw.Close(); err != nil {
+			return err
+		}
+		if err := b.database.AddChunk(compressed.Bytes(), chash[:]); err != nil {
 			return err
 		}
 	}
@@ -339,7 +359,6 @@ func (m *chunkSaverMemory) AddChunk(data []byte, hash []byte) error {
 	newChunk := chunkSaverMemoryChunk{
 		modifiedAt:     time.Now(),
 		referenceCount: 1,
-		// data: data,
 	}
 	newChunk.data = make([]byte, len(data))
 	copy(newChunk.data, data)
@@ -665,15 +684,16 @@ func Chunksaver() *StreamDecorator {
 	}
 
 	Svc.AddInitializer(InitializeWith(func(backendConfig BackendConfig) error {
-		if chunkBuffer == nil {
-			chunkBuffer = newChunkedBytesBufferMime()
-		}
+
 		configType := BaseConfig(&chunkSaverConfig{})
 		bcfg, err := Svc.ExtractConfig(backendConfig, configType)
 		if err != nil {
 			return err
 		}
 		config = bcfg.(*chunkSaverConfig)
+		if chunkBuffer == nil {
+			chunkBuffer = newChunkedBytesBufferMime(config.CompressLevel)
+		}
 		// configure storage if none was injected
 		if database == nil {
 			if config.StorageEngine == "memory" {
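
Note on the read path (editor's sketch, not part of the patch): the hunks above only cover writes. Each flushed chunk is zlib-compressed before it reaches AddChunk, with the level taken from chunksaver_compress_level (capped at 9), so whatever later fetches a chunk from storage has to inflate it before handing the bytes back to the MIME decoder. A minimal sketch, assuming a hypothetical decompressChunk helper and plain zlib.NewReader; if the writer is later switched to zlib.NewWriterLevelDict using PartsInfo.Dictionary, the reader would need zlib.NewReaderDict with the same dictionary bytes:

// Editor's sketch of the matching read side for the compression added in
// onFlush. decompressChunk is a hypothetical helper, not part of this diff.
package backends

import (
	"bytes"
	"compress/zlib"
	"io"
)

// decompressChunk inflates one stored chunk that was written by onFlush.
func decompressChunk(stored []byte) ([]byte, error) {
	zr, err := zlib.NewReader(bytes.NewReader(stored))
	if err != nil {
		return nil, err
	}
	defer zr.Close()
	return io.ReadAll(zr)
}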
|