Browse Source

storage engines follow the configuration pattern
stream processor decorators have a new Shutdown function, switched to use this instead of Svc
start developing background processing

flashmob 5 years ago
parent
commit
a3f6d7c6c0
12 changed files with 277 additions and 121 deletions
  1. 84 9
      api_test.go
  2. 1 0
      backends/backend.go
  3. 14 6
      backends/decorate_stream.go
  4. 35 6
      backends/gateway.go
  5. 21 17
      backends/s_mimeanalyzer.go
  6. 2 1
      chunk/buffer.go
  7. 6 5
      chunk/chunk_test.go
  8. 61 64
      chunk/processor.go
  9. 12 1
      chunk/store.go
  10. 22 2
      chunk/store_memory.go
  11. 16 10
      chunk/store_sql.go
  12. 3 0
      server.go

+ 84 - 9
api_test.go

@@ -962,14 +962,14 @@ func TestStreamProcessorConfig(t *testing.T) {
 		BackendConfig: backends.BackendConfig{
 		BackendConfig: backends.BackendConfig{
 			"stream_procEssoRs": { // note mixed case
 			"stream_procEssoRs": { // note mixed case
 				"chunkSaver": { // note mixed case
 				"chunkSaver": { // note mixed case
-					"chunksaver_chunk_size":     8000,
-					"chunksaver_storage_engine": "memory",
-					"chunksaver_compress_level": 0,
+					"chunk_size":     8000,
+					"storage_engine": "memory",
+					"compress_level": 0,
 				},
 				},
 				"test:chunksaver": {
 				"test:chunksaver": {
-					"chunksaver_chunk_size":     8000,
-					"chunksaver_storage_engine": "memory",
-					"chunksaver_compress_level": 0,
+					"chunk_size":     8000,
+					"storage_engine": "memory",
+					"compress_level": 0,
 				},
 				},
 				"debug": {
 				"debug": {
 					"sleep_seconds": 2,
 					"sleep_seconds": 2,
@@ -1012,6 +1012,81 @@ func TestStreamProcessor(t *testing.T) {
 				"default": {
 				"default": {
 					"save_process":        "HeadersParser|Debugger",
 					"save_process":        "HeadersParser|Debugger",
 					"stream_save_process": "Header|headersparser|compress|Decompress|debug",
 					"stream_save_process": "Header|headersparser|compress|Decompress|debug",
+					"post_process":        "Header|headersparser|compress|Decompress|debug",
+				},
+			},
+		},
+	}
+	d := Daemon{Config: cfg}
+
+	if err := d.Start(); err != nil {
+		t.Error(err)
+	}
+	body := "Subject: Test subject\r\n" +
+		//"\r\n" +
+		"A an email body.\r\n" +
+		"Header|headersparser|compress|Decompress|debug Header|headersparser|compress|Decompress|debug.\r\n" +
+		"Header|headersparser|compress|Decompress|debug Header|headersparser|compress|Decompress|debug.\r\n" +
+		"Header|headersparser|compress|Decompress|debug Header|headersparser|compress|Decompress|debug.\r\n" +
+		"Header|headersparser|compress|Decompress|debug Header|headersparser|compress|Decompress|debug.\r\n" +
+		"Header|headersparser|compress|Decompress|debug Header|headersparser|compress|Decompress|debug.\r\n" +
+		"Header|headersparser|compress|Decompress|debug Header|headersparser|compress|Decompress|debug.\r\n" +
+		"Header|headersparser|compress|Decompress|debug Header|headersparser|compress|Decompress|debug.\r\n" +
+		"Header|headersparser|compress|Decompress|debug Header|headersparser|compress|Decompress|debug.\r\n" +
+		"Header|headersparser|compress|Decompress|debug Header|headersparser|compress|Decompress|debug.\r\n" +
+		"Header|headersparser|compress|Decompress|debug Header|headersparser|compress|Decompress|debug.\r\n" +
+		"Header|headersparser|compress|Decompress|debug Header|headersparser|compress|Decompress|debug.\r\n" +
+		"Header|headersparser|compress|Decompress|debug Header|headersparser|compress|Decompress|debug.\r\n"
+
+	// lets have a talk with the server
+	if err := talkToServer("127.0.0.1:2525", body); err != nil {
+		t.Error(err)
+	}
+
+	d.Shutdown()
+
+	b, err := ioutil.ReadFile("tests/testlog")
+	if err != nil {
+		t.Error("could not read logfile")
+		return
+	}
+
+	// lets check for fingerprints
+	if strings.Index(string(b), "Debug stream") < 0 {
+		t.Error("did not log: Debug stream")
+	}
+
+	if strings.Index(string(b), "Error") != -1 {
+		t.Error("There was an error", string(b))
+	}
+
+}
+
+func TestStreamProcessorBackground(t *testing.T) {
+
+	if err := os.Truncate("tests/testlog", 0); err != nil {
+		t.Error(err)
+	}
+	cfg := &AppConfig{
+		LogFile:      "tests/testlog",
+		AllowedHosts: []string{"grr.la"},
+		BackendConfig: backends.BackendConfig{
+			"stream_processors": {
+				"debug": {
+					"log_reads": true,
+				},
+				"chunksaver": {
+					"chunk_size":     8000,
+					"storage_engine": "memory",
+					"compress_level": 0,
+				},
+			},
+			"gateways": {
+				"default": {
+					"save_process":          "",
+					"stream_save_process":   "mimeanalyzer|chunksaver",
+					"post_process_consumer": "Header|headersparser|compress|Decompress|debug",
+					"post_process_producer": "chunksaver",
 				},
 				},
 			},
 			},
 		},
 		},
@@ -1470,9 +1545,9 @@ func TestStreamChunkSaver(t *testing.T) {
 		BackendConfig: backends.BackendConfig{
 		BackendConfig: backends.BackendConfig{
 			"stream_processors": {
 			"stream_processors": {
 				"chunksaver": {
 				"chunksaver": {
-					"chunksaver_chunk_size":     1024 * 32,
-					"stream_buffer_size":        1024 * 16,
-					"chunksaver_storage_engine": "memory",
+					"chunk_size":         1024 * 32,
+					"stream_buffer_size": 1024 * 16,
+					"storage_engine":     "memory",
 				},
 				},
 			},
 			},
 			"gateways": {
 			"gateways": {

+ 1 - 0
backends/backend.go

@@ -44,6 +44,7 @@ type Backend interface {
 	Process(*mail.Envelope) Result
 	Process(*mail.Envelope) Result
 	// ValidateRcpt validates the last recipient that was pushed to the mail envelope
 	// ValidateRcpt validates the last recipient that was pushed to the mail envelope
 	ValidateRcpt(e *mail.Envelope) RcptError
 	ValidateRcpt(e *mail.Envelope) RcptError
+	ProcessBackground(e *mail.Envelope)
 	// ProcessStream is the alternative for Process, a stream is read from io.Reader
 	// ProcessStream is the alternative for Process, a stream is read from io.Reader
 	ProcessStream(r io.Reader, e *mail.Envelope) (Result, error)
 	ProcessStream(r io.Reader, e *mail.Envelope) (Result, error)
 	// StreamOn signals if ProcessStream can be used
 	// StreamOn signals if ProcessStream can be used

+ 14 - 6
backends/decorate_stream.go

@@ -6,20 +6,28 @@ import (
 )
 )
 
 
 type streamOpenWith func(e *mail.Envelope) error
 type streamOpenWith func(e *mail.Envelope) error
-
 type streamCloseWith func() error
 type streamCloseWith func() error
-
 type streamConfigureWith func(cfg ConfigGroup) error
 type streamConfigureWith func(cfg ConfigGroup) error
+type streamShutdownWith func() error
 
 
 // We define what a decorator to our processor will look like
 // We define what a decorator to our processor will look like
 // StreamProcessor argument is the underlying processor that we're decorating
 // StreamProcessor argument is the underlying processor that we're decorating
 // the additional ...interface argument is not needed, but can be useful for dependency injection
 // the additional ...interface argument is not needed, but can be useful for dependency injection
 type StreamDecorator struct {
 type StreamDecorator struct {
-	Decorate  func(StreamProcessor, ...interface{}) StreamProcessor
-	e         *mail.Envelope
-	Close     streamCloseWith
-	Open      streamOpenWith
+	// Decorate is called first. The StreamProcessor will be the next processor called
+	// after this one finished.
+	Decorate func(StreamProcessor, ...interface{}) StreamProcessor
+	e        *mail.Envelope
+	// Open is called at the start of each email
+	Open streamOpenWith
+	// Close is called when the email finished writing
+	Close streamCloseWith
+	// Configure is always called after Decorate, only once for the entire lifetime
+	// it can open database connections, test file permissions, etc
 	Configure streamConfigureWith
 	Configure streamConfigureWith
+	// Shutdown is called to release any resources before StreamDecorator is destroyed
+	// typically to close any database connections, cleanup any files, etc
+	Shutdown streamShutdownWith
 }
 }
 
 
 func (s StreamDecorator) ExtractConfig(cfg ConfigGroup, i interface{}) error {
 func (s StreamDecorator) ExtractConfig(cfg ConfigGroup, i interface{}) error {

+ 35 - 6
backends/gateway.go

@@ -111,6 +111,22 @@ func (s *streamer) close() error {
 	return err
 	return err
 }
 }
 
 
+func (s *streamer) shutdown() error {
+	var err Errors
+	// shutdown in reverse order
+	for i := len(s.d) - 1; i >= 0; i-- {
+		if s.d[i].Shutdown != nil {
+			if e := s.d[i].Shutdown(); e != nil {
+				err = append(err, e)
+			}
+		}
+	}
+	if len(err) == 0 {
+		return nil
+	}
+	return err
+}
+
 type backendState int
 type backendState int
 
 
 // possible values for state
 // possible values for state
@@ -276,6 +292,10 @@ func (gw *BackendGateway) StreamOn() bool {
 	return len(gw.gwConfig.StreamSaveProcess) != 0
 	return len(gw.gwConfig.StreamSaveProcess) != 0
 }
 }
 
 
+func (gw *BackendGateway) ProcessBackground(e *mail.Envelope) {
+
+}
+
 func (gw *BackendGateway) ProcessStream(r io.Reader, e *mail.Envelope) (Result, error) {
 func (gw *BackendGateway) ProcessStream(r io.Reader, e *mail.Envelope) (Result, error) {
 	res := response.Canned
 	res := response.Canned
 	if gw.State != BackendStateRunning {
 	if gw.State != BackendStateRunning {
@@ -340,6 +360,12 @@ func (gw *BackendGateway) Shutdown() error {
 		gw.stopWorkers()
 		gw.stopWorkers()
 		// wait for workers to stop
 		// wait for workers to stop
 		gw.wg.Wait()
 		gw.wg.Wait()
+		for stream := range gw.streamers {
+			err := gw.streamers[stream].shutdown()
+			if err != nil {
+				Log().Fields("error", err, "gateway", gw.name).Error("failed shutting down stream")
+			}
+		}
 		// call shutdown on all processor shutdowners
 		// call shutdown on all processor shutdowners
 		if err := Svc.shutdown(); err != nil {
 		if err := Svc.shutdown(); err != nil {
 			return err
 			return err
@@ -390,18 +416,21 @@ func (gw *BackendGateway) newStack(stackConfig string) (Processor, error) {
 func (gw *BackendGateway) newStreamStack(stackConfig string) (streamer, error) {
 func (gw *BackendGateway) newStreamStack(stackConfig string) (streamer, error) {
 	var decorators []*StreamDecorator
 	var decorators []*StreamDecorator
 	noop := streamer{NoopStreamProcessor{}, decorators}
 	noop := streamer{NoopStreamProcessor{}, decorators}
-	c := newStackStreamProcessorConfig(stackConfig, newAliasMap(gw.config[ConfigStreamProcessors.String()]))
+	configKey := ConfigStreamProcessors.String()
+	c := newStackStreamProcessorConfig(stackConfig, newAliasMap(gw.config[configKey]))
 	if len(c.list) == 0 {
 	if len(c.list) == 0 {
 		return noop, nil
 		return noop, nil
 	}
 	}
 	for i := range c.list {
 	for i := range c.list {
 		if makeFunc, ok := Streamers[c.list[i].name]; ok {
 		if makeFunc, ok := Streamers[c.list[i].name]; ok {
 			d := makeFunc()
 			d := makeFunc()
-			if config := gw.config.group(ConfigStreamProcessors.String(), c.list[i].String()); config != nil {
-				if d.Configure != nil {
-					if err := d.Configure(config); err != nil {
-						return noop, err
-					}
+			config := gw.config.group(configKey, c.list[i].String())
+			if config == nil {
+				config = ConfigGroup{}
+			}
+			if d.Configure != nil {
+				if err := d.Configure(config); err != nil {
+					return noop, err
 				}
 				}
 			}
 			}
 			decorators = append(decorators, d)
 			decorators = append(decorators, d)

+ 21 - 17
backends/s_mimeanalyzer.go

@@ -28,26 +28,30 @@ func init() {
 func StreamMimeAnalyzer() *StreamDecorator {
 func StreamMimeAnalyzer() *StreamDecorator {
 
 
 	sd := &StreamDecorator{}
 	sd := &StreamDecorator{}
+	var (
+		envelope *mail.Envelope
+		parseErr error
+		parser   *mimeparse.Parser
+	)
+	sd.Configure = func(cfg ConfigGroup) error {
+		parser = mimeparse.NewMimeParser()
+		return nil
+	}
+	sd.Shutdown = func() error {
+		var err error
+		defer func() {
+			parser = nil
+
+		}()
+		if err = parser.Close(); err != nil {
+			Log().WithError(err).Error("error when closing parser in mimeanalyzer")
+			return err
+		}
+		return nil
+	}
 
 
 	sd.Decorate =
 	sd.Decorate =
 		func(sp StreamProcessor, a ...interface{}) StreamProcessor {
 		func(sp StreamProcessor, a ...interface{}) StreamProcessor {
-			var (
-				envelope *mail.Envelope
-				parseErr error
-				parser   *mimeparse.Parser
-			)
-			Svc.AddInitializer(InitializeWith(func(backendConfig BackendConfig) error {
-				parser = mimeparse.NewMimeParser()
-				return nil
-			}))
-
-			Svc.AddShutdowner(ShutdownWith(func() error {
-				if err := parser.Close(); err != nil {
-					Log().WithError(err).Error("error when closing parser in mimeanalyzer")
-				}
-				parser = nil
-				return nil
-			}))
 
 
 			sd.Open = func(e *mail.Envelope) error {
 			sd.Open = func(e *mail.Envelope) error {
 				parser.Open()
 				parser.Open()

+ 2 - 1
chunk/buffer.go

@@ -71,7 +71,8 @@ func (c *chunkingBuffer) CapTo(n int) {
 	c.buf = make([]byte, 0, n)
 	c.buf = make([]byte, 0, n)
 }
 }
 
 
-// ChunkingBufferMime decorates chunkingBuffer, specifying that to do when a flush event is triggered
+// ChunkingBufferMime decorates chunkingBuffer, defining what to do when a flush event is triggered
+// in other words,
 type ChunkingBufferMime struct {
 type ChunkingBufferMime struct {
 	chunkingBuffer
 	chunkingBuffer
 	current  *mimeparse.Part
 	current  *mimeparse.Part

+ 6 - 5
chunk/chunk_test.go

@@ -644,15 +644,16 @@ func initTestStream(transform bool) (*StoreMemory, *backends.StreamDecorator, *b
 	bc := backends.BackendConfig{
 	bc := backends.BackendConfig{
 		"stream_processors": {
 		"stream_processors": {
 			"chunksaver": {
 			"chunksaver": {
-				"chunksaver_chunk_size":     8000,
-				"chunksaver_storage_engine": "memory",
-				"chunksaver_compress_level": 0,
+				"chunk_size":     8000,
+				"storage_engine": "memory",
+				"compress_level": 9,
 			},
 			},
 		},
 		},
 	}
 	}
 
 
-	_ = backends.Svc.Initialize(bc)
-
+	//_ = backends.Svc.Initialize(bc)
+	_ = chunksaver.Configure(bc["stream_processors"]["chunksaver"])
+	_ = mimeanalyzer.Configure(backends.ConfigGroup{})
 	// give it the envelope with the parse results
 	// give it the envelope with the parse results
 	_ = chunksaver.Open(e)
 	_ = chunksaver.Open(e)
 	_ = mimeanalyzer.Open(e)
 	_ = mimeanalyzer.Open(e)

+ 61 - 64
chunk/processor.go

@@ -2,6 +2,7 @@ package chunk
 
 
 import (
 import (
 	"errors"
 	"errors"
+	"fmt"
 	"net"
 	"net"
 
 
 	"github.com/flashmob/go-guerrilla/backends"
 	"github.com/flashmob/go-guerrilla/backends"
@@ -13,7 +14,7 @@ import (
 // Processor Name: ChunkSaver
 // Processor Name: ChunkSaver
 // ----------------------------------------------------------------------------------
 // ----------------------------------------------------------------------------------
 // Description   : Takes the stream and saves it in chunks. Chunks are split on the
 // Description   : Takes the stream and saves it in chunks. Chunks are split on the
-//               : chunksaver_chunk_size config setting, and also at the end of MIME parts,
+//               : chunk_size config setting, and also at the end of MIME parts,
 //               : and after a header. This allows for basic de-duplication: we can take a
 //               : and after a header. This allows for basic de-duplication: we can take a
 //               : hash of each chunk, then check the database to see if we have it already.
 //               : hash of each chunk, then check the database to see if we have it already.
 //               : We don't need to write it to the database, but take the reference of the
 //               : We don't need to write it to the database, but take the reference of the
@@ -24,7 +25,7 @@ import (
 // ----------------------------------------------------------------------------------
 // ----------------------------------------------------------------------------------
 // Requires      : "mimeanalyzer" stream processor to be enabled before it
 // Requires      : "mimeanalyzer" stream processor to be enabled before it
 // ----------------------------------------------------------------------------------
 // ----------------------------------------------------------------------------------
-// Config Options: chunksaver_chunk_size - maximum chunk size, in bytes
+// Config Options: chunk_size - maximum chunk size, in bytes
 // --------------:-------------------------------------------------------------------
 // --------------:-------------------------------------------------------------------
 // Input         : e.MimeParts Which is of type *mime.Parts, as populated by "mimeanalyzer"
 // Input         : e.MimeParts Which is of type *mime.Parts, as populated by "mimeanalyzer"
 // ----------------------------------------------------------------------------------
 // ----------------------------------------------------------------------------------
@@ -41,12 +42,11 @@ func init() {
 type Config struct {
 type Config struct {
 	// ChunkMaxBytes controls the maximum buffer size for saving
 	// ChunkMaxBytes controls the maximum buffer size for saving
 	// 16KB default.
 	// 16KB default.
-	ChunkMaxBytes int    `json:"chunksaver_chunk_size,omitempty"`
-	StorageEngine string `json:"chunksaver_storage_engine,omitempty"`
-	CompressLevel int    `json:"chunksaver_compress_level,omitempty"`
+	ChunkMaxBytes int    `json:"chunk_size,omitempty"`
+	StorageEngine string `json:"storage_engine,omitempty"`
 }
 }
 
 
-//const chunkMaxBytes = 1024 * 16 // 16Kb is the default, change using chunksaver_chunk_size config setting
+//const chunkMaxBytes = 1024 * 16 // 16Kb is the default, change using chunk_size config setting
 /**
 /**
 *
 *
  * A chunk ends ether:
  * A chunk ends ether:
@@ -57,26 +57,63 @@ type Config struct {
  *
  *
 */
 */
 func Chunksaver() *backends.StreamDecorator {
 func Chunksaver() *backends.StreamDecorator {
+	var (
+		config Config
 
 
-	sd := &backends.StreamDecorator{}
-	sd.Decorate =
-		func(sp backends.StreamProcessor, a ...interface{}) backends.StreamProcessor {
-			var (
-				envelope    *mail.Envelope
-				chunkBuffer *ChunkingBufferMime
-				msgPos      uint
-				database    Storage
-				written     int64
+		envelope    *mail.Envelope
+		chunkBuffer *ChunkingBufferMime
+		msgPos      uint
+		database    Storage
+		written     int64
 
 
-				// just some headers from the first mime-part
-				subject string
-				to      string
-				from    string
+		// just some headers from the first mime-part
+		subject string
+		to      string
+		from    string
 
 
-				progress int // tracks which mime parts were processed
-			)
+		progress int // tracks which mime parts were processed
+	)
+	sd := &backends.StreamDecorator{}
+	sd.Configure = func(cfg backends.ConfigGroup) error {
+		err := sd.ExtractConfig(cfg, &config)
+		if err != nil {
+			return err
+		}
+		if chunkBuffer == nil {
+			chunkBuffer = NewChunkedBytesBufferMime()
+		}
+		// database could be injected when Decorate is called
+		if database == nil {
+			// configure storage if none was injected
+			if config.StorageEngine == "" {
+				return errors.New("storage_engine setting not configured")
+			}
+			if makerFn, ok := StorageEngines[config.StorageEngine]; ok {
+				database = makerFn()
+			} else {
+				return fmt.Errorf("storage engine does not exist [%s]", config.StorageEngine)
+			}
+		}
+		err = database.Initialize(cfg)
+		if err != nil {
+			return err
+		}
+		// configure the chunks buffer
+		if config.ChunkMaxBytes > 0 {
+			chunkBuffer.CapTo(config.ChunkMaxBytes)
+		} else {
+			chunkBuffer.CapTo(chunkMaxBytes)
+		}
+		return nil
+	}
+
+	sd.Shutdown = func() error {
+		err := database.Shutdown()
+		return err
+	}
 
 
-			var config *Config
+	sd.Decorate =
+		func(sp backends.StreamProcessor, a ...interface{}) backends.StreamProcessor {
 			// optional dependency injection (you can pass your own instance of Storage or ChunkingBufferMime)
 			// optional dependency injection (you can pass your own instance of Storage or ChunkingBufferMime)
 			for i := range a {
 			for i := range a {
 				if db, ok := a[i].(Storage); ok {
 				if db, ok := a[i].(Storage); ok {
@@ -86,49 +123,9 @@ func Chunksaver() *backends.StreamDecorator {
 					chunkBuffer = buff
 					chunkBuffer = buff
 				}
 				}
 			}
 			}
-
-			backends.Svc.AddInitializer(backends.InitializeWith(func(backendConfig backends.BackendConfig) error {
-
-				configType := backends.BaseConfig(&Config{})
-				bcfg, err := backends.Svc.ExtractConfig(
-					backends.ConfigStreamProcessors, "chunksaver", backendConfig, configType)
-				if err != nil {
-					return err
-				}
-				config = bcfg.(*Config)
-				if chunkBuffer == nil {
-					chunkBuffer = NewChunkedBytesBufferMime()
-				}
-				// configure storage if none was injected
-				if database == nil {
-					if config.StorageEngine == "memory" {
-						db := new(StoreMemory)
-						db.CompressLevel = config.CompressLevel
-						database = db
-					} else {
-						db := new(StoreSQL)
-						database = db
-					}
-				}
-				err = database.Initialize(backendConfig)
-				if err != nil {
-					return err
-				}
-				// configure the chunks buffer
-				if config.ChunkMaxBytes > 0 {
-					chunkBuffer.CapTo(config.ChunkMaxBytes)
-				} else {
-					chunkBuffer.CapTo(chunkMaxBytes)
-				}
+			if database != nil {
 				chunkBuffer.SetDatabase(database)
 				chunkBuffer.SetDatabase(database)
-
-				return nil
-			}))
-
-			backends.Svc.AddShutdowner(backends.ShutdownWith(func() error {
-				err := database.Shutdown()
-				return err
-			}))
+			}
 
 
 			var writeTo uint
 			var writeTo uint
 			var pos int
 			var pos int

+ 12 - 1
chunk/store.go

@@ -8,6 +8,10 @@ import (
 	"time"
 	"time"
 )
 )
 
 
+func init() {
+	StorageEngines = make(map[string]StorageEngineConstructor)
+}
+
 // Storage defines an interface to the storage layer (the database)
 // Storage defines an interface to the storage layer (the database)
 type Storage interface {
 type Storage interface {
 	// OpenMessage is used to begin saving an email. An email id is returned and used to call CloseMessage later
 	// OpenMessage is used to begin saving an email. An email id is returned and used to call CloseMessage later
@@ -21,11 +25,18 @@ type Storage interface {
 	// GetChunks loads in the specified chunks of bytes from storage
 	// GetChunks loads in the specified chunks of bytes from storage
 	GetChunks(hash ...HashKey) ([]*Chunk, error)
 	GetChunks(hash ...HashKey) ([]*Chunk, error)
 	// Initialize is called when the backend is started
 	// Initialize is called when the backend is started
-	Initialize(cfg backends.BackendConfig) error
+	Initialize(cfg backends.ConfigGroup) error
 	// Shutdown is called when the backend gets shutdown.
 	// Shutdown is called when the backend gets shutdown.
 	Shutdown() (err error)
 	Shutdown() (err error)
 }
 }
 
 
+// StorageEngines contains the constructors for creating instances that implement Storage
+// To add your own Storage, create your own Storage struct, then add your constructor to
+// this `StorageEngines` map. Enable it via the configuration (the `storage_engine` setting)
+var StorageEngines map[string]StorageEngineConstructor
+
+type StorageEngineConstructor func() Storage
+
 // Email represents an email
 // Email represents an email
 type Email struct {
 type Email struct {
 	mailID     uint64
 	mailID     uint64

+ 22 - 2
chunk/store_memory.go

@@ -10,12 +10,23 @@ import (
 	"time"
 	"time"
 )
 )
 
 
+func init() {
+	StorageEngines["memory"] = func() Storage {
+		return new(StoreMemory)
+	}
+}
+
+type storeMemoryConfig struct {
+	CompressLevel int `json:"compress_level,omitempty"`
+}
+
 type StoreMemory struct {
 type StoreMemory struct {
 	chunks        map[HashKey]*memoryChunk
 	chunks        map[HashKey]*memoryChunk
 	emails        []*memoryEmail
 	emails        []*memoryEmail
 	nextID        uint64
 	nextID        uint64
 	offset        uint64
 	offset        uint64
 	CompressLevel int
 	CompressLevel int
+	config        storeMemoryConfig
 }
 }
 
 
 type memoryEmail struct {
 type memoryEmail struct {
@@ -122,12 +133,21 @@ func (m *StoreMemory) AddChunk(data []byte, hash []byte) error {
 }
 }
 
 
 // Initialize implements the Storage interface
 // Initialize implements the Storage interface
-func (m *StoreMemory) Initialize(cfg backends.BackendConfig) error {
+func (m *StoreMemory) Initialize(cfg backends.ConfigGroup) error {
+
+	sd := backends.StreamDecorator{}
+	err := sd.ExtractConfig(cfg, &m.config)
+	if err != nil {
+		return err
+	}
 	m.offset = 1
 	m.offset = 1
 	m.nextID = m.offset
 	m.nextID = m.offset
 	m.emails = make([]*memoryEmail, 0, 100)
 	m.emails = make([]*memoryEmail, 0, 100)
 	m.chunks = make(map[HashKey]*memoryChunk, 1000)
 	m.chunks = make(map[HashKey]*memoryChunk, 1000)
-	m.CompressLevel = zlib.NoCompression
+	if m.config.CompressLevel > 9 || m.config.CompressLevel < 0 {
+		m.config.CompressLevel = zlib.BestCompression
+	}
+	m.CompressLevel = m.config.CompressLevel
 	return nil
 	return nil
 }
 }
 
 

+ 16 - 10
chunk/store_sql.go

@@ -9,17 +9,24 @@ import (
 	"net"
 	"net"
 )
 )
 
 
+func init() {
+	StorageEngines["sql"] = func() Storage {
+		return new(StoreSQL)
+	}
+}
+
 type sqlConfig struct {
 type sqlConfig struct {
-	EmailTable  string `json:"chunksaver_email_table,omitempty"`
-	ChunkTable  string `json:"chunksaver_chunk_table,omitempty"`
-	Driver      string `json:"chunksaver_sql_driver,omitempty"`
-	DSN         string `json:"chunksaver_sql_dsn,omitempty"`
-	PrimaryHost string `json:"chunksaver_primary_mail_host,omitempty"`
+	EmailTable    string `json:"email_table,omitempty"`
+	ChunkTable    string `json:"chunk_table,omitempty"`
+	Driver        string `json:"sql_driver,omitempty"`
+	DSN           string `json:"sql_dsn,omitempty"`
+	PrimaryHost   string `json:"primary_mail_host,omitempty"`
+	CompressLevel int    `json:"compress_level,omitempty"`
 }
 }
 
 
 // StoreSQL implements the Storage interface
 // StoreSQL implements the Storage interface
 type StoreSQL struct {
 type StoreSQL struct {
-	config     *sqlConfig
+	config     sqlConfig
 	statements map[string]*sql.Stmt
 	statements map[string]*sql.Stmt
 	db         *sql.DB
 	db         *sql.DB
 }
 }
@@ -180,13 +187,12 @@ func (s *StoreSQL) CloseMessage(mailID uint64, size int64, partsInfo *PartsInfo,
 }
 }
 
 
 // Initialize loads the specific database config, connects to the db, prepares statements
 // Initialize loads the specific database config, connects to the db, prepares statements
-func (s *StoreSQL) Initialize(cfg backends.BackendConfig) error {
-	configType := backends.BaseConfig(&sqlConfig{})
-	bcfg, err := backends.Svc.ExtractConfig(backends.ConfigStreamProcessors, "chunksaver", cfg, configType)
+func (s *StoreSQL) Initialize(cfg backends.ConfigGroup) error {
+	sd := backends.StreamDecorator{}
+	err := sd.ExtractConfig(cfg, &s.config)
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
-	s.config = bcfg.(*sqlConfig)
 	s.db, err = s.connect()
 	s.db, err = s.connect()
 	if err != nil {
 	if err != nil {
 		return err
 		return err

+ 3 - 0
server.go

@@ -595,6 +595,9 @@ func (s *server) handleClient(client *client) {
 			if be.StreamOn() {
 			if be.StreamOn() {
 				// process the message as a stream
 				// process the message as a stream
 				res, err = be.ProcessStream(client.smtpReader.DotReader(), client.Envelope)
 				res, err = be.ProcessStream(client.smtpReader.DotReader(), client.Envelope)
+				if err == nil && res.Code() < 300 {
+					be.ProcessBackground(client.Envelope)
+				}
 			} else {
 			} else {
 				// or buffer the entire message (parse headers & mime structure as we go along)
 				// or buffer the entire message (parse headers & mime structure as we go along)
 				n, err = client.Data.ReadFrom(client.smtpReader)
 				n, err = client.Data.ReadFrom(client.smtpReader)