flashmob 6 years ago
parent
commit
cf7c68c5ed

+ 41 - 0
api_test.go

@@ -709,3 +709,44 @@ func TestCustomBackendResult(t *testing.T) {
 	}
 
 }
+
+func TestStreamProcessor(t *testing.T) {
+	if err := os.Truncate("tests/testlog", 0); err != nil {
+		t.Error(err)
+	}
+	cfg := &AppConfig{
+		LogFile:      "tests/testlog",
+		AllowedHosts: []string{"grr.la"},
+		BackendConfig: backends.BackendConfig{
+			"save_process":        "HeadersParser|Debugger",
+			"stream_save_process": "Header",
+		},
+	}
+	d := Daemon{Config: cfg}
+
+	if err := d.Start(); err != nil {
+		t.Error(err)
+	}
+	// let's have a talk with the server
+	if err := talkToServer("127.0.0.1:2525"); err != nil {
+		t.Error(err)
+	}
+
+	d.Shutdown()
+
+	b, err := ioutil.ReadFile("tests/testlog")
+	if err != nil {
+		t.Error("could not read logfile")
+		return
+	}
+	// let's check for fingerprints
+	if !strings.Contains(string(b), "451 4.3.0 Error") {
+		t.Error("did not log: 451 4.3.0 Error")
+	}
+
+	if !strings.Contains(string(b), "system shock") {
+		t.Error("did not log: system shock")
+	}
+
+}

+ 23 - 7
backends/backend.go

@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"github.com/flashmob/go-guerrilla/log"
 	"github.com/flashmob/go-guerrilla/mail"
+	"io"
 	"reflect"
 	"strconv"
 	"strings"
@@ -24,6 +25,7 @@ var (
 func init() {
 	Svc = &service{}
 	processors = make(map[string]ProcessorConstructor)
+	streamers = make(map[string]StreamProcessorConstructor)
 }
 
 type ProcessorConstructor func() Decorator
@@ -37,6 +39,10 @@ type Backend interface {
 	Process(*mail.Envelope) Result
 	// ValidateRcpt validates the last recipient that was pushed to the mail envelope
 	ValidateRcpt(e *mail.Envelope) RcptError
+	// ProcessStream is the streaming alternative to Process: the message data is read from an io.Reader
+	ProcessStream(r io.Reader, e *mail.Envelope) (Result, error)
+	// StreamOn signals if ProcessStream can be used
+	StreamOn() bool
 	// Initializes the backend, eg. creates folders, sets-up database connections
 	Initialize(BackendConfig) error
 	// Initializes the backend after it was Shutdown()
@@ -91,19 +97,19 @@ func (r *result) Code() int {
 	return code
 }
 
-func NewResult(r ...interface{}) Result {
-	buf := new(result)
-	for _, item := range r {
+func NewResult(param ...interface{}) Result {
+	r := new(result)
+	for _, item := range param {
 		switch v := item.(type) {
 		case error:
-			_, _ = buf.WriteString(v.Error())
+			_, _ = r.WriteString(v.Error())
 		case fmt.Stringer:
-			_, _ = buf.WriteString(v.String())
+			_, _ = r.WriteString(v.String())
 		case string:
-			_, _ = buf.WriteString(v)
+			_, _ = r.WriteString(v)
 		}
 	}
-	return buf
+	return r
 }
 
 type processorInitializer interface {
@@ -240,6 +246,16 @@ func (s *service) AddProcessor(name string, p ProcessorConstructor) {
 	processors[strings.ToLower(name)] = c
 }
 
+func (s *service) AddStreamProcessor(name string, p StreamProcessorConstructor) {
+	// wrap in a constructor since we want to defer calling it
+	var c StreamProcessorConstructor
+	c = func() *StreamDecorator {
+		return p()
+	}
+	// add to our processors list
+	streamers[strings.ToLower(name)] = c
+}
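+
+// Example (sketch): registering a stream processor from within this
+// package. Names are matched case-insensitively when the
+// "stream_save_process" stack is built; a daemon-level hook analogous
+// to Daemon.AddProcessor is not part of this change:
+//
+//	Svc.AddStreamProcessor("MyFilter", func() *StreamDecorator {
+//		sd := &StreamDecorator{}
+//		sd.p = func(sp StreamProcessor) StreamProcessor {
+//			return StreamProcessWith(func(p []byte) (int, error) {
+//				// inspect or transform p, then hand it down the chain
+//				return sp.Write(p)
+//			})
+//		}
+//		return sd
+//	})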
+
 // extractConfig loads the backend config. It has already been unmarshalled
 // configData contains data from the main config file's "backend_config" value
 // configType is a Processor's specific config value.

+ 165 - 6
backends/gateway.go

@@ -3,6 +3,7 @@ package backends
 import (
 	"errors"
 	"fmt"
+	"io"
 	"strconv"
 	"sync"
 	"time"
@@ -29,6 +30,7 @@ type BackendGateway struct {
 	workStoppers []chan bool
 	processors   []Processor
 	validators   []Processor
+	streamers    []streamer
 
 	// controls access to state
 	sync.Mutex
@@ -48,6 +50,8 @@ type GatewayConfig struct {
 	TimeoutSave string `json:"gw_save_timeout,omitempty"`
 	// TimeoutValidateRcpt duration before timeout when validating a recipient, eg "1s"
 	TimeoutValidateRcpt string `json:"gw_val_rcpt_timeout,omitempty"`
+	// StreamSaveProcess is like SaveProcess, except the processors stream the email data from an io.Reader
+	StreamSaveProcess string `json:"stream_save_process,omitempty"`
 }
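+
+// An example (sketch) of a "backend_config" that enables the streamed
+// pipeline alongside the buffered one, using processor names that are
+// registered in this package:
+//
+//	"backend_config": {
+//	    "save_process": "HeadersParser|Debugger",
+//	    "stream_save_process": "Header|Compress"
+//	}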
 
 // workerMsg is what get placed on the BackendGateway.saveMailChan channel
@@ -58,6 +62,43 @@ type workerMsg struct {
 	notifyMe chan *notifyMsg
 	// select the task type
 	task SelectTask
+	// io.Reader for streamed processor
+	r io.Reader
+}
+
+type streamer struct {
+	// sp is the head of the decorated StreamProcessor chain
+	sp StreamProcessor
+	// d keeps the decorators so that Open and Close can be called per message
+	d []*StreamDecorator
+}
+
+func (s streamer) Write(p []byte) (n int, err error) {
+	return s.sp.Write(p)
+}
+
+func (s *streamer) open(e *mail.Envelope) Errors {
+	var err Errors
+	for i := range s.d {
+		if s.d[i].Open != nil {
+			if e := s.d[i].Open(e); e != nil {
+				err = append(err, e)
+			}
+		}
+	}
+	return err
+}
+
+func (s *streamer) close() Errors {
+	var err Errors
+	for i := range s.d {
+		if s.d[i].Close != nil {
+			if e := s.d[i].Close(); e != nil {
+				err = append(err, e)
+			}
+		}
+	}
+	return err
 }
 
 type backendState int
@@ -132,6 +173,7 @@ func (gw *BackendGateway) Process(e *mail.Envelope) Result {
 	}
 	// borrow a workerMsg from the pool
 	workerMsg := workerMsgPool.Get().(*workerMsg)
 	workerMsg.reset(e, TaskSaveMail)
 	// place on the channel so that one of the save mail workers can pick it up
 	gw.conveyor <- workerMsg
@@ -172,7 +214,6 @@ func (gw *BackendGateway) Process(e *mail.Envelope) Result {
 			// keep waiting for the backend to finish processing
 			<-workerMsg.notifyMe
 			e.Unlock()
 			workerMsgPool.Put(workerMsg)
 		}()
 		return NewResult(response.Canned.FailBackendTimeout)
 	}
@@ -190,13 +231,13 @@ func (gw *BackendGateway) ValidateRcpt(e *mail.Envelope) RcptError {
 	}
 	// place on the channel so that one of the save mail workers can pick it up
 	workerMsg := workerMsgPool.Get().(*workerMsg)
 	workerMsg.reset(e, TaskValidateRcpt)
 	gw.conveyor <- workerMsg
 	// wait for the validation to complete
 	// or timeout
 	select {
 	case status := <-workerMsg.notifyMe:
 		workerMsgPool.Put(workerMsg)
 		if status.err != nil {
 			return status.err
 		}
@@ -207,13 +248,71 @@ func (gw *BackendGateway) ValidateRcpt(e *mail.Envelope) RcptError {
 		go func() {
 			<-workerMsg.notifyMe
 			e.Unlock()
 			workerMsgPool.Put(workerMsg)
 			Log().Error("Backend has timed out while validating rcpt")
 		}()
 		return StorageTimeout
 	}
 }
 
+// StreamOn reports whether a "stream_save_process" stack is configured,
+// meaning ProcessStream can be used in place of Process
+func (gw *BackendGateway) StreamOn() bool {
+	return len(gw.gwConfig.StreamSaveProcess) != 0
+}
+
+// ProcessStream runs the stream processors on a message read from r,
+// returning the result of the save transaction
+func (gw *BackendGateway) ProcessStream(r io.Reader, e *mail.Envelope) (Result, error) {
+	res := response.Canned
+	if gw.State != BackendStateRunning {
+		return NewResult(res.FailBackendNotRunning, response.SP, gw.State), errors.New(res.FailBackendNotRunning.String())
+	}
+	// borrow a workerMsg from the pool
+	workerMsg := workerMsgPool.Get().(*workerMsg)
+	workerMsg.reset(e, TaskSaveMailStream)
+	workerMsg.r = r
+	// place on the channel so that one of the save mail workers can pick it up
+	gw.conveyor <- workerMsg
+	// wait for the save to complete
+	// or timeout
+	select {
+	case status := <-workerMsg.notifyMe:
+		workerMsgPool.Put(workerMsg)
+		// email saving transaction completed
+		if status.result == BackendResultOK && status.queuedID != "" {
+			return NewResult(res.SuccessMessageQueued, response.SP, status.queuedID), status.err
+		}
+
+		// A custom result, there was probably an error, if so, log it
+		if status.result != nil {
+			if status.err != nil {
+				Log().Error(status.err)
+			}
+			return status.result, status.err
+		}
+
+		// if there was no result, but there's an error, then make a new result from the error
+		if status.err != nil {
+			// if the error message starts with a 3-digit SMTP code, use it verbatim
+			if s := status.err.Error(); len(s) >= 3 {
+				if _, convErr := strconv.Atoi(s[:3]); convErr == nil {
+					return NewResult(status.err), status.err
+				}
+			}
+			return NewResult(res.FailBackendTransaction, response.SP, status.err), status.err
+		}
+
+		// both result & error are nil (should not happen)
+		err := errors.New("no response from backend - processor did not return a result or an error")
+		Log().Error(err)
+		return NewResult(res.FailBackendTransaction, response.SP, err), err
+
+	case <-time.After(gw.saveTimeout()):
+		Log().Error("Backend has timed out while saving email")
+		e.Lock() // lock the envelope - it's still processing here, we don't want the server to recycle it
+		go func() {
+			// keep waiting for the backend to finish processing
+			<-workerMsg.notifyMe
+			e.Unlock()
+			workerMsgPool.Put(workerMsg)
+		}()
+		return NewResult(res.FailBackendTimeout), errors.New("gateway timeout")
+	}
+}
+
 // Shutdown shuts down the backend and leaves it in BackendStateShuttered state
 func (gw *BackendGateway) Shutdown() error {
 	gw.Lock()
@@ -257,7 +356,6 @@ func (gw *BackendGateway) newStack(stackConfig string) (Processor, error) {
 	var decorators []Decorator
 	cfg := strings.ToLower(strings.TrimSpace(stackConfig))
 	if len(cfg) == 0 {
-		//cfg = strings.ToLower(defaultProcessor)
 		return NoopProcessor{}, nil
 	}
 	items := strings.Split(cfg, "|")
@@ -275,6 +373,30 @@ func (gw *BackendGateway) newStack(stackConfig string) (Processor, error) {
 	return p, nil
 }
 
+func (gw *BackendGateway) newStreamStack(stackConfig string) (streamer, error) {
+	var decorators []*StreamDecorator
+	cfg := strings.ToLower(strings.TrimSpace(stackConfig))
+	if len(cfg) == 0 {
+		return streamer{NoopStreamProcessor{}, decorators}, nil
+	}
+	items := strings.Split(cfg, "|")
+	for i := range items {
+		name := items[len(items)-1-i] // reverse order, since decorators are stacked
+		if makeFunc, ok := streamers[name]; ok {
+			decorators = append(decorators, makeFunc())
+		} else {
+			ErrProcessorNotFound = fmt.Errorf("stream processor [%s] not found", name)
+			return streamer{nil, decorators}, ErrProcessorNotFound
+		}
+	}
+
+	// build the call-stack of decorators
+	sp := DecorateStream(DefaultStreamProcessor{}, decorators...)
+	return streamer{sp, decorators}, nil
+}
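+
+// For example, a "stream_save_process" of "header|compress" builds
+// header(compress(DefaultStreamProcessor{})): incoming bytes pass
+// through the header decorator first and the compressor last.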
+
 // loadConfig loads the config for the GatewayConfig
 func (gw *BackendGateway) loadConfig(cfg BackendConfig) error {
 	configType := BaseConfig(&GatewayConfig{})
@@ -308,6 +430,7 @@ func (gw *BackendGateway) Initialize(cfg BackendConfig) error {
 	}
 	gw.processors = make([]Processor, 0)
 	gw.validators = make([]Processor, 0)
+	gw.streamers = make([]streamer, 0)
 	for i := 0; i < workersSize; i++ {
 		p, err := gw.newStack(gw.gwConfig.SaveProcess)
 		if err != nil {
@@ -322,6 +445,14 @@ func (gw *BackendGateway) Initialize(cfg BackendConfig) error {
 			return err
 		}
 		gw.validators = append(gw.validators, v)
+
+		s, err := gw.newStreamStack(gw.gwConfig.StreamSaveProcess)
+		if err != nil {
+			gw.State = BackendStateError
+			return err
+		}
+
+		gw.streamers = append(gw.streamers, s)
 	}
 	// initialize processors
 	if err := Svc.initialize(cfg); err != nil {
@@ -357,6 +488,7 @@ func (gw *BackendGateway) Start() error {
 						gw.conveyor,
 						gw.processors[workerId],
 						gw.validators[workerId],
+						gw.streamers[workerId],
 						workerId+1,
 						stop)
 					// keep running after panic
@@ -422,6 +554,7 @@ func (gw *BackendGateway) workDispatcher(
 	workIn chan *workerMsg,
 	save Processor,
 	validate Processor,
+	stream streamer,
 	workerId int,
 	stop chan bool) (state dispatcherState) {
 
@@ -455,11 +588,37 @@ func (gw *BackendGateway) workDispatcher(
 			return
 		case msg = <-workIn:
 			state = dispatcherStateWorking // recovers from panic if in this state
-			result, err := save.Process(msg.e, msg.task)
-			state = dispatcherStateNotify
 			if msg.task == TaskSaveMail {
+				result, err := save.Process(msg.e, msg.task)
+				state = dispatcherStateNotify
+				msg.notifyMe <- &notifyMsg{err: err, result: result, queuedID: msg.e.QueuedId}
+			} else if msg.task == TaskSaveMailStream {
+				errs := stream.open(msg.e)
+				if errs == nil {
+					N, copyErr := io.Copy(stream, msg.r)
+					if copyErr != nil {
+						errs = append(errs, copyErr)
+					}
+					msg.e.Values["size"] = N
+
+					if closeErr := stream.close(); closeErr != nil {
+						errs = append(errs, closeErr)
+					}
+				}
+
+				state = dispatcherStateNotify
+				// convert a nil Errors slice to a nil error interface,
+				// otherwise a successful save would look like a failure
+				var err error
+				if len(errs) > 0 {
+					err = errs
+				}
+				var result Result
+				if err == nil {
+					result = NewResult(response.Canned.SuccessMessageQueued, response.SP, msg.e.QueuedId)
+				} else {
+					result = NewResult(response.Canned.FailBackendTransaction, err)
+				}
+				msg.notifyMe <- &notifyMsg{err: err, result: result, queuedID: msg.e.QueuedId}
 			} else {
+				result, err := validate.Process(msg.e, msg.task)
+				state = dispatcherStateNotify
 				msg.notifyMe <- &notifyMsg{err: err, result: result}
 			}
 		}

+ 44 - 0
backends/gateway_stream.go

@@ -0,0 +1,44 @@
+package backends
+
+import (
+	"github.com/flashmob/go-guerrilla/log"
+	"io"
+)
+
+type StreamBackendGateway struct {
+	BackendGateway
+
+	config *StreamBackendConfig
+
+	pr *io.PipeReader
+	pw *io.PipeWriter
+}
+
+type StreamBackendConfig struct {
+	StreamSaveProcess string `json:"stream_save_process,omitempty"`
+}
+
+func NewStreamBackend(backendConfig BackendConfig, l log.Logger) (Backend, error) {
+	b, err := New(backendConfig, l)
+	if err != nil {
+		return b, err
+	}
+	if bg, ok := b.(*BackendGateway); ok {
+		sb := new(StreamBackendGateway)
+		sb.BackendGateway = *bg
+		return sb, nil
+	}
+	return b, nil
+
+}
+
+func (gw *StreamBackendGateway) loadConfig(backendConfig BackendConfig) (err error) {
+	configType := BaseConfig(&StreamBackendConfig{})
+	bcfg, err := Svc.ExtractConfig(backendConfig, configType)
+	if err != nil {
+		return err
+	}
+	m := bcfg.(*StreamBackendConfig)
+	gw.config = m
+	return nil
+}

+ 23 - 0
backends/processor.go

@@ -2,6 +2,7 @@ package backends
 
 import (
 	"github.com/flashmob/go-guerrilla/mail"
+	"io"
 )
 
 type SelectTask int
@@ -9,6 +10,7 @@ type SelectTask int
 const (
 	TaskSaveMail SelectTask = iota
 	TaskValidateRcpt
+	TaskSaveMailStream
 )
 
 func (o SelectTask) String() string {
@@ -17,6 +19,8 @@ func (o SelectTask) String() string {
 		return "save mail"
 	case TaskValidateRcpt:
 		return "validate recipient"
+	case TaskSaveMailStream:
+		return "save mail stream"
 	}
 	return "[unnamed task]"
 }
@@ -49,3 +53,22 @@ func (w DefaultProcessor) Process(e *mail.Envelope, task SelectTask) (Result, er
 
 // if no processors specified, skip operation
 type NoopProcessor struct{ DefaultProcessor }
+
+type StreamProcessor interface {
+	io.Writer
+}
+
+type StreamProcessWith func(p []byte) (n int, err error)
+
+func (f StreamProcessWith) Write(p []byte) (n int, err error) {
+	// delegate to the anonymous function
+	return f(p)
+}
+
+type DefaultStreamProcessor struct{}
+
+func (w DefaultStreamProcessor) Write(p []byte) (n int, err error) {
+	// discard the data, but report it as consumed so that io.Copy and
+	// upstream writers don't fail with io.ErrShortWrite
+	return len(p), nil
+}
+
+type NoopStreamProcessor struct{ DefaultStreamProcessor }
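+
+// Example (sketch): an inline terminal StreamProcessor built from a
+// plain function, here counting the bytes that reach the end of a chain:
+//
+//	var total int64
+//	counter := StreamProcessWith(func(p []byte) (int, error) {
+//		total += int64(len(p))
+//		return len(p), nil
+//	})
+//	sp := DecorateStream(counter, decorators...)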

+ 43 - 0
backends/s_compress.go

@@ -0,0 +1,43 @@
+package backends
+
+import (
+	"compress/zlib"
+	"github.com/flashmob/go-guerrilla/mail"
+	"io"
+)
+
+func init() {
+	streamers["compress"] = func() StreamDecorator {
+		return StreamCompress()
+	}
+}
+
+func StreamCompress() *StreamDecorator {
+	sd := &StreamDecorator{}
+	sd.p =
+		func(sp StreamProcessor) StreamProcessor {
+			var zw io.WriteCloser
+			sd.Open = func(e *mail.Envelope) error {
+				var err error
+				zw, err = zlib.NewWriterLevel(sp, zlib.BestSpeed)
+				return err
+			}
+
+			sd.Close = func() error {
+				return zw.Close()
+			}
+
+			// zw is only assigned once Open runs, so resolve it at write
+			// time; binding zw.Write here would capture a nil interface
+			return StreamProcessWith(func(p []byte) (n int, err error) {
+				return zw.Write(p)
+			})
+		}
+	return sd
+}
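+
+// Note that data written through this decorator reaches the downstream
+// processor zlib-compressed, so whatever stores or consumes it needs a
+// matching zlib reader, such as the decompressing stream in stream.go.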

+ 98 - 0
backends/s_header.go

@@ -0,0 +1,98 @@
+package backends
+
+import (
+	"github.com/flashmob/go-guerrilla/mail"
+	"io"
+	"strings"
+	"time"
+)
+
+func init() {
+	streamers["header"] = func() StreamDecorator {
+		return *StreamHeader()
+	}
+}
+
+type streamHeader struct {
+	addHead []byte
+	w       io.Writer
+	i       int
+}
+
+func newStreamHeader(w io.Writer) *streamHeader {
+	sc := new(streamHeader)
+	sc.w = w
+	return sc
+}
+
+func (sh *streamHeader) addHeader(e *mail.Envelope, config HeaderConfig) {
+	// guard against an empty RcptTo list
+	to := "unknown@" + config.PrimaryHost
+	if len(e.RcptTo) > 0 {
+		to = strings.TrimSpace(e.RcptTo[0].User) + "@" + config.PrimaryHost
+	}
+	hash := "unknown"
+	if len(e.Hashes) > 0 {
+		hash = e.Hashes[0]
+	}
+	var addHead string
+	addHead += "Delivered-To: " + to + "\n"
+	addHead += "Received: from " + e.Helo + " (" + e.Helo + "  [" + e.RemoteIP + "])\n"
+	if len(e.RcptTo) > 0 {
+		addHead += "	by " + e.RcptTo[0].Host + " with SMTP id " + hash + "@" + e.RcptTo[0].Host + ";\n"
+	}
+	addHead += "	" + time.Now().Format(time.RFC1123Z) + "\n"
+
+	sh.addHead = []byte(addHead)
+}
+
+// Write sends the generated headers before the first chunk of the
+// message, then passes the rest of the stream through
+func (sh *streamHeader) Write(p []byte) (n int, err error) {
+	for sh.i < len(sh.addHead) {
+		N, err := sh.w.Write(sh.addHead[sh.i:])
+		sh.i += N
+		if err != nil {
+			// none of p has been written yet, only header bytes
+			return 0, err
+		}
+	}
+	return sh.w.Write(p)
+}
+
+func StreamHeader() *StreamDecorator {
+	sd := &StreamDecorator{}
+	sd.p =
+		func(sp StreamProcessor) StreamProcessor {
+			var dc *streamHeader
+			sd.Open = func(e *mail.Envelope) error {
+				dc = newStreamHeader(sp)
+				hc := HeaderConfig{"sharklasers.com"}
+				dc.addHeader(e, hc)
+				return nil
+			}
+			// dc is created in Open, so delegate at write time; writing
+			// through dc is what prepends the generated headers
+			return StreamProcessWith(func(p []byte) (int, error) {
+				return dc.Write(p)
+			})
+		}
+	return sd
+}

+ 120 - 0
backends/stream.go

@@ -0,0 +1,120 @@
+package backends
+
+import (
+	"compress/zlib"
+	"io"
+)
+
+func init() {
+	// despite the "compressor" key, StreamTest builds a zlib
+	// *decompressing* stream (see newStreamDecompressor below)
+	streamers["compressor"] = func() *StreamDecorator {
+		return StreamTest()
+	}
+}
+
+type streamCompressor struct {
+	zw *zlib.Writer
+}
+
+func newStreamCompressor(w io.Writer) io.Writer {
+	sc := new(streamCompressor)
+	sc.zw, _ = zlib.NewWriterLevel(w, zlib.BestSpeed)
+	return sc
+}
+func (sc *streamCompressor) Close() error {
+	return sc.zw.Close()
+}
+func (sc *streamCompressor) Write(p []byte) (n int, err error) {
+	return sc.zw.Write(p)
+}
+
+func newStreamDecompressor(w io.Writer) io.Writer {
+	sc := new(streamDecompressor)
+	sc.w = w
+	sc.pr, sc.pw = io.Pipe()
+	sc.done = make(chan struct{})
+	go sc.consumer()
+	return sc
+}
+
+type streamDecompressor struct {
+	w  io.Writer
+	zr io.ReadCloser
+
+	pr *io.PipeReader
+	pw *io.PipeWriter
+	// done is closed once the consumer goroutine has finished
+	done chan struct{}
+}
+
+func (sc *streamDecompressor) Close() error {
+	// close the write end first so the consumer sees EOF and drains,
+	// and wait for it before tearing down the read end
+	errW := sc.pw.Close()
+	<-sc.done
+	errR := sc.pr.Close()
+	if sc.zr != nil {
+		if err := sc.zr.Close(); err != nil {
+			return err
+		}
+	}
+	if errW != nil {
+		return errW
+	}
+	return errR
+}
+
+func (sc *streamDecompressor) Write(p []byte) (n int, err error) {
+	return sc.pw.Write(p)
+}
+
+func (sc *streamDecompressor) consumer() {
+	defer close(sc.done)
+	var err error
+	if sc.zr == nil {
+		sc.zr, err = zlib.NewReader(sc.pr)
+		if err != nil {
+			_ = sc.pr.CloseWithError(err)
+			return
+		}
+	}
+	// copy until the zlib stream ends or the pipe is closed; looping
+	// here would busy-spin once the reader starts returning io.EOF
+	if _, err := io.Copy(sc.w, sc.zr); err != nil {
+		_ = sc.pr.CloseWithError(err)
+	}
+}
+
+func StreamTest() *StreamDecorator {
+	sd := &StreamDecorator{}
+	sd.p =
+		func(sp StreamProcessor) StreamProcessor {
+			// the decompressor and its pipe are created once per stack,
+			// so this test decorator is single-use after Close
+			dc := newStreamDecompressor(sp)
+			sd.Close = func() error {
+				if c, ok := dc.(io.Closer); ok {
+					return c.Close()
+				}
+				return nil
+			}
+			return StreamProcessWith(dc.Write)
+		}
+	return sd
+}

+ 27 - 0
backends/stream_backend.go

@@ -0,0 +1,27 @@
+package backends
+
+var (
+	streamers map[string]StreamProcessorConstructor
+)
+
+type processorCloser interface {
+	Close() error
+}
+
+type CloseWith func() error
+
+// satisfy processorCloser interface
+func (c CloseWith) Close() error {
+	// delegate
+	return c()
+}
+
+type StreamProcessorConstructor func() *StreamDecorator
+
+type streamService struct {
+	service
+}

+ 33 - 0
backends/stream_decorate.go

@@ -0,0 +1,33 @@
+package backends
+
+import "github.com/flashmob/go-guerrilla/mail"
+
+type streamOpenWith func(e *mail.Envelope) error
+
+type streamCloseWith func() error
+
+// StreamDecorator wraps a StreamProcessor with extra behaviour: p builds
+// the decorated processor, while Open and Close, when set, are called by
+// the gateway at the start and end of each message
+type StreamDecorator struct {
+	p     func(StreamProcessor) StreamProcessor
+	Close streamCloseWith
+	Open  streamOpenWith
+}
+
+// DecorateStream will decorate a StreamProcessor with a slice of passed decorators
+func DecorateStream(c StreamProcessor, ds ...*StreamDecorator) StreamProcessor {
+	decorated := c
+	for _, decorate := range ds {
+		decorated = decorate.p(decorated)
+	}
+	return decorated
+}
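+
+// Example (sketch): composing and driving a stack by hand. Decorators
+// are applied in slice order, so the last one ends up outermost,
+// closest to the incoming bytes:
+//
+//	ds := []*StreamDecorator{StreamCompress(), StreamHeader()}
+//	sp := DecorateStream(DefaultStreamProcessor{}, ds...)
+//	// per message: call each non-nil ds[i].Open(e), io.Copy into sp,
+//	// then call each non-nil ds[i].Close(), as the gateway's streamer does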

+ 45 - 0
backends/stream_test.go

@@ -0,0 +1,45 @@
+package backends
+
+import (
+	"bytes"
+	"github.com/flashmob/go-guerrilla/mail"
+	"io"
+	"testing"
+)
+
+func TestStream(t *testing.T) {
+
+	var e = mail.Envelope{
+		RcptTo:   []mail.Address{{User: "test", Host: "example.com"}},
+		Helo:     "a.cool.host.com",
+		RemoteIP: "6.6.4.4",
+	}
+	hc := HeaderConfig{"sharklasers.com"}
+
+	var buf bytes.Buffer
+	dc := newStreamDecompressor(&buf)
+	comp := newStreamCompressor(dc)
+
+	s := newStreamHeader(comp)
+	s.addHeader(&e, hc)
+
+	n, err := io.Copy(s, bytes.NewBufferString("testing123"))
+	if err != nil {
+		t.Error(err, n)
+	}
+
+	if wc, ok := comp.(io.WriteCloser); ok {
+		// flush the zlib stream
+		if err = wc.Close(); err != nil {
+			t.Error(err)
+		}
+	}
+
+	if wcec, ok := dc.(io.WriteCloser); ok {
+		// wait for the decompressor to drain, then tear it down
+		if err = wcec.Close(); err != nil {
+			t.Error(err)
+		}
+	}
+
+	t.Log(buf.String())
+
+}

+ 20 - 4
server.go

@@ -542,11 +542,24 @@ func (s *server) handleClient(client *client) {
 			// intentionally placed the limit 1MB above so that reading does not return with an error
 			// if the client goes a little over. Anything above will err
 			client.bufin.setLimit(int64(sc.MaxSize) + 1024000) // This a hard limit.
+			be := s.backend()
+			var (
+				n   int64
+				err error
+				res backends.Result
+			)
+			if be.StreamOn() {
+				// process the message as a stream
+				res, err = be.ProcessStream(client.smtpReader.DotReader(), client.Envelope)
 
-			n, err := client.Data.ReadFrom(client.smtpReader.DotReader())
-			if n > sc.MaxSize {
-				err = fmt.Errorf("maximum DATA size exceeded (%d)", sc.MaxSize)
+			} else {
+				// or buffer the entire message
+				n, err = client.Data.ReadFrom(client.smtpReader.DotReader())
+				if n > sc.MaxSize {
+					err = fmt.Errorf("maximum DATA size exceeded (%d)", sc.MaxSize)
+				}
 			}
+
 			if err != nil {
 				if err == LineLimitExceeded {
 					client.sendResponse(r.FailReadLimitExceededDataCmd, " ", LineLimitExceeded.Error())
@@ -563,7 +576,10 @@ func (s *server) handleClient(client *client) {
 				break
 			}
 
-			res := s.backend().Process(client.Envelope)
+			if !be.StreamOn() {
+				res = be.Process(client.Envelope)
+			}
+
 			if res.Code() < 300 {
 				client.messagesSent++
 			}