flashmob 6 years ago
parent
commit
0b7ccf83ff

+ 6 - 6
api_test.go

@@ -719,11 +719,10 @@ func TestStreamProcessor(t *testing.T) {
 		AllowedHosts: []string{"grr.la"},
 		AllowedHosts: []string{"grr.la"},
 		BackendConfig: backends.BackendConfig{
 		BackendConfig: backends.BackendConfig{
 			"save_process":        "HeadersParser|Debugger",
 			"save_process":        "HeadersParser|Debugger",
-			"stream_save_process": "Header",
+			"stream_save_process": "Header|compress|Decompress|debug",
 		},
 		},
 	}
 	}
 	d := Daemon{Config: cfg}
 	d := Daemon{Config: cfg}
-	//d.AddProcessor("Custom", customBackend2)
 
 
 	if err := d.Start(); err != nil {
 	if err := d.Start(); err != nil {
 		t.Error(err)
 		t.Error(err)
@@ -740,13 +739,14 @@ func TestStreamProcessor(t *testing.T) {
 		t.Error("could not read logfile")
 		t.Error("could not read logfile")
 		return
 		return
 	}
 	}
+
 	// lets check for fingerprints
 	// lets check for fingerprints
-	if strings.Index(string(b), "451 4.3.0 Error") < 0 {
-		t.Error("did not log: 451 4.3.0 Error")
+	if strings.Index(string(b), "Debug stream") < 0 {
+		t.Error("did not log: Debug stream")
 	}
 	}
 
 
-	if strings.Index(string(b), "system shock") < 0 {
-		t.Error("did not log: system shock")
+	if strings.Index(string(b), "Error") == -1 {
+		t.Error("There was an error", string(b))
 	}
 	}
 
 
 }
 }

+ 2 - 2
backends/backend.go

@@ -201,7 +201,7 @@ func (s *service) reset() {
 func (s *service) initialize(backend BackendConfig) Errors {
 func (s *service) initialize(backend BackendConfig) Errors {
 	s.Lock()
 	s.Lock()
 	defer s.Unlock()
 	defer s.Unlock()
-	var errors Errors
+	var errors Errors = nil
 	failed := make([]processorInitializer, 0)
 	failed := make([]processorInitializer, 0)
 	for i := range s.initializers {
 	for i := range s.initializers {
 		if err := s.initializers[i].Initialize(backend); err != nil {
 		if err := s.initializers[i].Initialize(backend); err != nil {
@@ -249,7 +249,7 @@ func (s *service) AddProcessor(name string, p ProcessorConstructor) {
 func (s *service) AddStreamProcessor(name string, p StreamProcessorConstructor) {
 func (s *service) AddStreamProcessor(name string, p StreamProcessorConstructor) {
 	// wrap in a constructor since we want to defer calling it
 	// wrap in a constructor since we want to defer calling it
 	var c StreamProcessorConstructor
 	var c StreamProcessorConstructor
-	c = func() StreamDecorator {
+	c = func() *StreamDecorator {
 		return p()
 		return p()
 	}
 	}
 	// add to our processors list
 	// add to our processors list

+ 15 - 19
backends/gateway.go

@@ -70,7 +70,7 @@ type streamer struct {
 	// StreamProcessor is a chain of StreamProcessor
 	// StreamProcessor is a chain of StreamProcessor
 	sp StreamProcessor
 	sp StreamProcessor
 	// so that we can call Open and Close
 	// so that we can call Open and Close
-	d []StreamDecorator
+	d []*StreamDecorator
 }
 }
 
 
 func (s streamer) Write(p []byte) (n int, err error) {
 func (s streamer) Write(p []byte) (n int, err error) {
@@ -78,7 +78,7 @@ func (s streamer) Write(p []byte) (n int, err error) {
 }
 }
 
 
 func (s *streamer) open(e *mail.Envelope) Errors {
 func (s *streamer) open(e *mail.Envelope) Errors {
-	var err Errors
+	var err Errors = nil
 	for i := range s.d {
 	for i := range s.d {
 		if s.d[i].Open != nil {
 		if s.d[i].Open != nil {
 			if e := s.d[i].Open(e); e != nil {
 			if e := s.d[i].Open(e); e != nil {
@@ -90,14 +90,16 @@ func (s *streamer) open(e *mail.Envelope) Errors {
 }
 }
 
 
 func (s *streamer) close() Errors {
 func (s *streamer) close() Errors {
-	var err Errors
-	for i := range s.d {
+	var err Errors = nil
+	// close in reverse order
+	for i := len(s.d) - 1; i >= 0; i-- {
 		if s.d[i].Close != nil {
 		if s.d[i].Close != nil {
 			if e := s.d[i].Close(); e != nil {
 			if e := s.d[i].Close(); e != nil {
 				err = append(err, e)
 				err = append(err, e)
 			}
 			}
 		}
 		}
 	}
 	}
+
 	return err
 	return err
 }
 }
 
 
@@ -374,7 +376,7 @@ func (gw *BackendGateway) newStack(stackConfig string) (Processor, error) {
 }
 }
 
 
 func (gw *BackendGateway) newStreamStack(stackConfig string) (streamer, error) {
 func (gw *BackendGateway) newStreamStack(stackConfig string) (streamer, error) {
-	var decorators []StreamDecorator
+	var decorators []*StreamDecorator
 	cfg := strings.ToLower(strings.TrimSpace(stackConfig))
 	cfg := strings.ToLower(strings.TrimSpace(stackConfig))
 	if len(cfg) == 0 {
 	if len(cfg) == 0 {
 
 
@@ -384,8 +386,7 @@ func (gw *BackendGateway) newStreamStack(stackConfig string) (streamer, error) {
 	for i := range items {
 	for i := range items {
 		name := items[len(items)-1-i] // reverse order, since decorators are stacked
 		name := items[len(items)-1-i] // reverse order, since decorators are stacked
 		if makeFunc, ok := streamers[name]; ok {
 		if makeFunc, ok := streamers[name]; ok {
-			emmy := makeFunc()
-			decorators = append(decorators, emmy)
+			decorators = append(decorators, makeFunc())
 		} else {
 		} else {
 			ErrProcessorNotFound = errors.New(fmt.Sprintf("stream processor [%s] not found", name))
 			ErrProcessorNotFound = errors.New(fmt.Sprintf("stream processor [%s] not found", name))
 			return streamer{nil, decorators}, ErrProcessorNotFound
 			return streamer{nil, decorators}, ErrProcessorNotFound
@@ -393,7 +394,7 @@ func (gw *BackendGateway) newStreamStack(stackConfig string) (streamer, error) {
 	}
 	}
 
 
 	// build the call-stack of decorators
 	// build the call-stack of decorators
-	sp := DecorateStream(DefaultStreamProcessor{}, decorators...)
+	sp, decorators := DecorateStream(&DefaultStreamProcessor{}, decorators)
 	return streamer{sp, decorators}, nil
 	return streamer{sp, decorators}, nil
 }
 }
 
 
@@ -593,27 +594,22 @@ func (gw *BackendGateway) workDispatcher(
 				state = dispatcherStateNotify
 				state = dispatcherStateNotify
 				msg.notifyMe <- &notifyMsg{err: err, result: result, queuedID: msg.e.QueuedId}
 				msg.notifyMe <- &notifyMsg{err: err, result: result, queuedID: msg.e.QueuedId}
 			} else if msg.task == TaskSaveMailStream {
 			} else if msg.task == TaskSaveMailStream {
-
 				err := stream.open(msg.e)
 				err := stream.open(msg.e)
 				if err == nil {
 				if err == nil {
-					N, copyErr := io.Copy(stream, msg.r)
-					if copyErr != nil {
+					if N, copyErr := io.Copy(stream, msg.r); copyErr != nil {
 						err = append(err, copyErr)
 						err = append(err, copyErr)
+						msg.e.Values["size"] = N
 					}
 					}
-					msg.e.Values["size"] = N
-
-					closeErr := stream.close()
-					if closeErr != nil {
-						err = append(err, copyErr)
+					if closeErr := stream.close(); closeErr != nil {
+						err = append(err, closeErr)
 					}
 					}
 				}
 				}
-
 				state = dispatcherStateNotify
 				state = dispatcherStateNotify
 				var result Result
 				var result Result
 				if err != nil {
 				if err != nil {
-					result = NewResult(response.Canned.SuccessMessageQueued, response.SP, msg.e.QueuedId)
-				} else {
 					result = NewResult(response.Canned.FailBackendTransaction, err)
 					result = NewResult(response.Canned.FailBackendTransaction, err)
+				} else {
+					result = NewResult(response.Canned.SuccessMessageQueued, response.SP, msg.e.QueuedId)
 				}
 				}
 				msg.notifyMe <- &notifyMsg{err: err, result: result, queuedID: msg.e.QueuedId}
 				msg.notifyMe <- &notifyMsg{err: err, result: result, queuedID: msg.e.QueuedId}
 			} else {
 			} else {

+ 7 - 13
backends/s_compress.go

@@ -7,16 +7,17 @@ import (
 )
 )
 
 
 func init() {
 func init() {
-	streamers["compress"] = func() StreamDecorator {
+	streamers["compress"] = func() *StreamDecorator {
 		return StreamCompress()
 		return StreamCompress()
 	}
 	}
 }
 }
 
 
-func StreamCompress() StreamDecorator {
-	sd := StreamDecorator{}
+func StreamCompress() *StreamDecorator {
+	sd := &StreamDecorator{}
 	sd.p =
 	sd.p =
 		func(sp StreamProcessor) StreamProcessor {
 		func(sp StreamProcessor) StreamProcessor {
 			var zw io.WriteCloser
 			var zw io.WriteCloser
+			_ = zw
 			sd.Open = func(e *mail.Envelope) error {
 			sd.Open = func(e *mail.Envelope) error {
 				var err error
 				var err error
 				zw, err = zlib.NewWriterLevel(sp, zlib.BestSpeed)
 				zw, err = zlib.NewWriterLevel(sp, zlib.BestSpeed)
@@ -27,16 +28,9 @@ func StreamCompress() StreamDecorator {
 				return zw.Close()
 				return zw.Close()
 			}
 			}
 
 
-			return StreamProcessWith(zw.Write)
-			/*
-				return StreamProcessWith(func(p []byte) (n int, err error) {
-					var buf bytes.Buffer
-					if n, err := io.Copy(w, bytes.NewReader(p)); err != nil {
-						return int(n), err
-					}
-					return sp.Write(buf.Bytes())
-				})
-			*/
+			return StreamProcessWith(func(p []byte) (int, error) {
+				return zw.Write(p)
+			})
 
 
 		}
 		}
 	return sd
 	return sd

+ 31 - 0
backends/s_debug.go

@@ -0,0 +1,31 @@
+package backends
+
+import (
+	"fmt"
+	"github.com/flashmob/go-guerrilla/mail"
+)
+
+func init() {
+	streamers["debug"] = func() *StreamDecorator {
+		return StreamDebug()
+	}
+}
+
+func StreamDebug() *StreamDecorator {
+	sd := &StreamDecorator{}
+	sd.p =
+
+		func(sp StreamProcessor) StreamProcessor {
+
+			sd.Open = func(e *mail.Envelope) error {
+				return nil
+			}
+			return StreamProcessWith(func(p []byte) (int, error) {
+				fmt.Println(string(p))
+				Log().WithField("p", string(p)).Info("Debug stream")
+				return sp.Write(p)
+			})
+		}
+
+	return sd
+}

+ 87 - 0
backends/s_decompress.go

@@ -0,0 +1,87 @@
+package backends
+
+import (
+	"bytes"
+	"compress/zlib"
+	"github.com/flashmob/go-guerrilla/mail"
+	"io"
+)
+
+func init() {
+	streamers["decompress"] = func() *StreamDecorator {
+		return StreamDecompress()
+	}
+}
+
+// StreamDecompress is a PoC demonstrating how we can connect an io.Reader to our Writer
+// We use an io.Pipe to connect the two, writing to one end of the pipe, while
+// consuming the output on the other end of the pipe.
+
+func StreamDecompress() *StreamDecorator {
+	sd := &StreamDecorator{}
+	sd.p =
+		func(sp StreamProcessor) StreamProcessor {
+			var (
+				zr io.ReadCloser
+				pr *io.PipeReader
+				pw *io.PipeWriter
+			)
+
+			// consumer runs as a gorouitne.
+			// It connects the zlib reader with the read-end of the pipe
+			// then copies the output down to the next stream processor
+			// consumer will exit of the pipe gets closed or on error
+			consumer := func() {
+				var err error
+				for {
+					if zr == nil {
+						zr, err = zlib.NewReader(pr)
+						if err != nil {
+							_ = pr.CloseWithError(err)
+							return
+						}
+					}
+
+					_, err := io.Copy(sp, zr)
+					if err != nil {
+						_ = pr.CloseWithError(err)
+						return
+					}
+				}
+			}
+
+			// start our consumer goroutine
+			sd.Open = func(e *mail.Envelope) error {
+				pr, pw = io.Pipe()
+				go consumer()
+				return nil
+			}
+
+			// close both ends of the pipes when finished
+			sd.Close = func() error {
+				errR := pr.Close()
+				errW := pw.Close()
+				if err := zr.Close(); err != nil {
+					return err
+				}
+				if errR != nil {
+					return errR
+				}
+				if errW != nil {
+					return errW
+				}
+				return nil
+			}
+
+			return StreamProcessWith(func(p []byte) (n int, err error) {
+				// take the output and copy on the pipe, for the consumer to pick up
+				N, err := io.Copy(pw, bytes.NewReader(p))
+				if N > 0 {
+					n = int(N)
+				}
+				return
+			})
+
+		}
+	return sd
+}

+ 18 - 40
backends/s_header.go

@@ -8,8 +8,8 @@ import (
 )
 )
 
 
 func init() {
 func init() {
-	streamers["header"] = func() StreamDecorator {
-		return *StreamHeader()
+	streamers["header"] = func() *StreamDecorator {
+		return StreamHeader()
 	}
 	}
 }
 }
 
 
@@ -42,57 +42,35 @@ func (sh *streamHeader) addHeader(e *mail.Envelope, config HeaderConfig) {
 	sh.addHead = []byte(addHead)
 	sh.addHead = []byte(addHead)
 }
 }
 
 
-func (sh *streamHeader) Write(p []byte) (n int, err error) {
-	if sh.i < len(sh.addHead) {
-		for {
-			if N, err := sh.w.Write(sh.addHead[sh.i:]); err != nil {
-				return N, err
-			} else {
-				sh.i += N
-				if sh.i >= len(sh.addHead) {
-					break
-				}
-			}
-		}
-	}
-	return sh.w.Write(p)
-}
-
 func StreamHeader() *StreamDecorator {
 func StreamHeader() *StreamDecorator {
 	sd := &StreamDecorator{}
 	sd := &StreamDecorator{}
 	sd.p =
 	sd.p =
 
 
 		func(sp StreamProcessor) StreamProcessor {
 		func(sp StreamProcessor) StreamProcessor {
-			var dc *streamHeader
-			x := 1 + 5
-			_ = x
+			var sh *streamHeader
+
 			sd.Open = func(e *mail.Envelope) error {
 			sd.Open = func(e *mail.Envelope) error {
-				dc = newStreamHeader(sp)
+				sh = newStreamHeader(sp)
 				hc := HeaderConfig{"sharklasers.com"}
 				hc := HeaderConfig{"sharklasers.com"}
-				dc.addHeader(e, hc)
+				sh.addHeader(e, hc)
 				return nil
 				return nil
 			}
 			}
 			return StreamProcessWith(func(p []byte) (int, error) {
 			return StreamProcessWith(func(p []byte) (int, error) {
-
+				if sh.i < len(sh.addHead) {
+					for {
+						if N, err := sh.w.Write(sh.addHead[sh.i:]); err != nil {
+							return N, err
+						} else {
+							sh.i += N
+							if sh.i >= len(sh.addHead) {
+								break
+							}
+						}
+					}
+				}
 				return sp.Write(p)
 				return sp.Write(p)
 			})
 			})
 		}
 		}
 
 
-		/*
-			func(sp StreamProcessor) StreamProcessor {
-				var dc *streamHeader
-
-				sd.Open = func(e *mail.Envelope) error {
-					dc = newStreamHeader(sp)
-					hc := HeaderConfig{"sharklasers.com"}
-					dc.addHeader(e, hc)
-					return nil
-				}
-
-				return StreamProcessWith(dc.Write)
-
-
-			}
-		*/
 	return sd
 	return sd
 }
 }

+ 0 - 120
backends/stream.go

@@ -1,120 +0,0 @@
-package backends
-
-import (
-	"bytes"
-	"compress/zlib"
-	"io"
-)
-
-func init() {
-	streamers["compressor"] = func() StreamDecorator {
-		return StreamTest()
-	}
-}
-
-type streamCompressor struct {
-	zw *zlib.Writer
-}
-
-func newStreamCompressor(w io.Writer) io.Writer {
-	sc := new(streamCompressor)
-	sc.zw, _ = zlib.NewWriterLevel(w, zlib.BestSpeed)
-	return sc
-}
-func (sc *streamCompressor) Close() error {
-	return sc.zw.Close()
-}
-func (sc *streamCompressor) Write(p []byte) (n int, err error) {
-	N, err := sc.zw.Write(p)
-	return N, err
-}
-
-func newStreamDecompresser(w io.Writer) io.Writer {
-	sc := new(streamDecompressor)
-	sc.w = w
-	sc.pr, sc.pw = io.Pipe()
-	go sc.consumer()
-	return sc
-}
-
-type streamDecompressor struct {
-	w  io.Writer
-	zr io.ReadCloser
-
-	pr  *io.PipeReader
-	pw  *io.PipeWriter
-	zr2 io.ReadCloser
-}
-
-func (sc *streamDecompressor) Close() error {
-
-	errR := sc.pr.Close()
-	errW := sc.pw.Close()
-	if err := sc.zr.Close(); err != nil {
-		return err
-	}
-	if errR != nil {
-		return errR
-	}
-	if errW != nil {
-		return errW
-	}
-
-	return nil
-}
-
-func (sc *streamDecompressor) Write(p []byte) (n int, err error) {
-
-	N, err := io.Copy(sc.pw, bytes.NewReader(p))
-	if N > 0 {
-		n = int(N)
-	}
-	return
-}
-
-func (sc *streamDecompressor) consumer() {
-	var err error
-	for {
-		if sc.zr == nil {
-			sc.zr, err = zlib.NewReader(sc.pr)
-			if err != nil {
-				_ = sc.pr.CloseWithError(err)
-				return
-			}
-		}
-
-		_, err := io.Copy(sc.w, sc.zr)
-		if err != nil {
-			_ = sc.pr.CloseWithError(err)
-			return
-		}
-	}
-}
-
-func StreamTest() StreamDecorator {
-	sd := StreamDecorator{}
-	sd.p =
-		func(sp StreamProcessor) StreamProcessor {
-
-			dc := newStreamDecompresser(sp)
-			sd.Close = func() error {
-				if c, ok := dc.(io.Closer); ok {
-					return c.Close()
-				}
-				return nil
-			}
-
-			return StreamProcessWith(dc.Write)
-			/*
-				return StreamProcessWith(func(p []byte) (n int, err error) {
-					var buf bytes.Buffer
-					if n, err := io.Copy(w, bytes.NewReader(p)); err != nil {
-						return int(n), err
-					}
-					return sp.Write(buf.Bytes())
-				})
-			*/
-
-		}
-	return sd
-}

+ 1 - 1
backends/stream_backend.go

@@ -20,7 +20,7 @@ func (c CloseWith) Close() error {
 	return c()
 	return c()
 }
 }
 
 
-type StreamProcessorConstructor func() StreamDecorator
+type StreamProcessorConstructor func() *StreamDecorator
 
 
 type streamService struct {
 type streamService struct {
 	service
 	service

+ 7 - 6
backends/stream_decorate.go

@@ -1,6 +1,8 @@
 package backends
 package backends
 
 
-import "github.com/flashmob/go-guerrilla/mail"
+import (
+	"github.com/flashmob/go-guerrilla/mail"
+)
 
 
 type streamOpenWith func(e *mail.Envelope) error
 type streamOpenWith func(e *mail.Envelope) error
 
 
@@ -15,12 +17,11 @@ type StreamDecorator struct {
 }
 }
 
 
 // DecorateStream will decorate a StreamProcessor with a slice of passed decorators
 // DecorateStream will decorate a StreamProcessor with a slice of passed decorators
-func DecorateStream(c StreamProcessor, ds ...StreamDecorator) StreamProcessor {
-	decorated := c
-	for _, decorate := range ds {
-		decorated = decorate.p(decorated)
+func DecorateStream(c StreamProcessor, ds []*StreamDecorator) (StreamProcessor, []*StreamDecorator) {
+	for i := range ds {
+		c = ds[i].p(c)
 	}
 	}
-	return decorated
+	return c, ds
 }
 }
 
 
 func (sd *StreamDecorator) OpenX(e *mail.Envelope) error {
 func (sd *StreamDecorator) OpenX(e *mail.Envelope) error {

+ 0 - 45
backends/stream_test.go

@@ -1,45 +0,0 @@
-package backends
-
-import (
-	"bytes"
-	"fmt"
-	"github.com/flashmob/go-guerrilla/mail"
-	"io"
-	"testing"
-)
-
-func TestStream(t *testing.T) {
-
-	var e = mail.Envelope{
-		RcptTo:   []mail.Address{{User: "test", Host: "example.com"}},
-		Helo:     "a.cool.host.com",
-		RemoteIP: "6.6.4.4",
-	}
-	hc := HeaderConfig{"sharklasers.com"}
-
-	var buf bytes.Buffer
-	dc := newStreamDecompresser(&buf)
-	comp := newStreamCompressor(dc)
-
-	s := newStreamHeader(comp)
-	s.addHeader(&e, hc)
-
-	n, err := io.Copy(s, bytes.NewBufferString("testing123"))
-	if err != nil {
-		t.Error(err, n)
-	}
-
-	if wc, ok := comp.(io.WriteCloser); ok {
-		err = wc.Close()
-		fmt.Println("err1", err)
-	}
-
-	if wcec, ok := dc.(io.WriteCloser); ok {
-		err = wcec.Close()
-		fmt.Println("err2", err)
-	}
-
-	fmt.Println((buf.String()))
-
-	//time.Sleep(time.Second * 10)
-}

+ 6 - 0
server.go

@@ -10,6 +10,7 @@ import (
 	"io/ioutil"
 	"io/ioutil"
 	"net"
 	"net"
 	"path/filepath"
 	"path/filepath"
+	"reflect"
 	"strings"
 	"strings"
 	"sync"
 	"sync"
 	"sync/atomic"
 	"sync/atomic"
@@ -559,6 +560,11 @@ func (s *server) handleClient(client *client) {
 					err = fmt.Errorf("maximum DATA size exceeded (%d)", sc.MaxSize)
 					err = fmt.Errorf("maximum DATA size exceeded (%d)", sc.MaxSize)
 				}
 				}
 			}
 			}
+			if err == nil {
+				fmt.Println(err)
+			} else {
+				fmt.Println(err.Error(), "type is:", reflect.TypeOf(err))
+			}
 
 
 			if err != nil {
 			if err != nil {
 				if err == LineLimitExceeded {
 				if err == LineLimitExceeded {