Browse Source

- and visible line feed to debug message <LF?
- Add a new 'process' stream decorator

flashmob 6 years ago
parent
commit
2faf37ad29
3 changed files with 42 additions and 7 deletions
  1. 1 1
      backends/s_debug.go
  2. 6 6
      backends/s_headers_parser.go
  3. 35 0
      backends/s_process.go

+ 1 - 1
backends/s_debug.go

@@ -23,7 +23,7 @@ func StreamDebug() *StreamDecorator {
 			}
 			}
 			return StreamProcessWith(func(p []byte) (int, error) {
 			return StreamProcessWith(func(p []byte) (int, error) {
 				str := string(p)
 				str := string(p)
-				str = strings.Replace(str, "\n", "<NL>\n", -1)
+				str = strings.Replace(str, "\n", "<LF>\n", -1)
 				fmt.Println(str)
 				fmt.Println(str)
 				Log().WithField("p", string(p)).Info("Debug stream")
 				Log().WithField("p", string(p)).Info("Debug stream")
 				return sp.Write(p)
 				return sp.Write(p)

+ 6 - 6
backends/s_headers_parser.go

@@ -3,10 +3,10 @@ package backends
 import (
 import (
 	"bufio"
 	"bufio"
 	"bytes"
 	"bytes"
-	"fmt"
-	"github.com/flashmob/go-guerrilla/mail"
 	"io"
 	"io"
 	"net/textproto"
 	"net/textproto"
+
+	"github.com/flashmob/go-guerrilla/mail"
 )
 )
 
 
 func init() {
 func init() {
@@ -17,6 +17,7 @@ func init() {
 
 
 const stateHeaderScanning = 0
 const stateHeaderScanning = 0
 const stateHeaderNotScanning = 1
 const stateHeaderNotScanning = 1
+const headerMaxBytes = 1024 * 4
 
 
 func StreamHeadersParser() *StreamDecorator {
 func StreamHeadersParser() *StreamDecorator {
 	sd := &StreamDecorator{}
 	sd := &StreamDecorator{}
@@ -24,7 +25,7 @@ func StreamHeadersParser() *StreamDecorator {
 
 
 		func(sp StreamProcessor) StreamProcessor {
 		func(sp StreamProcessor) StreamProcessor {
 
 
-			// contains the header
+			// buf buffers the header
 			var buf bytes.Buffer
 			var buf bytes.Buffer
 			var state byte
 			var state byte
 			var lastByte byte
 			var lastByte byte
@@ -100,7 +101,7 @@ func StreamHeadersParser() *StreamDecorator {
 					} else {
 					} else {
 						total += n
 						total += n
 						// give up if we didn't detect the header after x bytes
 						// give up if we didn't detect the header after x bytes
-						if total > 100 {
+						if total > headerMaxBytes {
 							state = stateHeaderNotScanning
 							state = stateHeaderNotScanning
 							n, err = io.Copy(sp, &buf)
 							n, err = io.Copy(sp, &buf)
 							return int(n), err
 							return int(n), err
@@ -111,8 +112,7 @@ func StreamHeadersParser() *StreamDecorator {
 					// just forward everything to the next writer without buffering
 					// just forward everything to the next writer without buffering
 					return sp.Write(p)
 					return sp.Write(p)
 				}
 				}
-				fmt.Println(string(p))
-				Log().WithField("p", string(p)).Info("Debug stream")
+
 				return sp.Write(p)
 				return sp.Write(p)
 			})
 			})
 		}
 		}

+ 35 - 0
backends/s_process.go

@@ -0,0 +1,35 @@
+package backends
+
+import (
+	"bytes"
+	"github.com/flashmob/go-guerrilla/mail"
+	"io"
+)
+
+func init() {
+	streamers["process"] = func() *StreamDecorator {
+		return StreamProcess()
+	}
+}
+
+// Buffers to envelope.Data so that processors can be called on it at the end
+func StreamProcess() *StreamDecorator {
+	sd := &StreamDecorator{}
+	sd.p =
+
+		func(sp StreamProcessor) StreamProcessor {
+			var envelope *mail.Envelope
+			sd.Open = func(e *mail.Envelope) error {
+				envelope = e
+				return nil
+			}
+
+			return StreamProcessWith(func(p []byte) (int, error) {
+				tr := io.TeeReader(bytes.NewReader(p), sp)
+				n, err := envelope.Data.ReadFrom(tr)
+				return int(n), err
+			})
+		}
+
+	return sd
+}