Browse Source

- add a stream processor that can parse headers on-the-fly (s_headers_parser.go)
- use a 4KB buffer to process the stream (io.CopyBuffer instead of io.Copy

flashmob 6 years ago
parent
commit
1367b5d10e
6 changed files with 238 additions and 7 deletions
  1. 79 2
      api_test.go
  2. 3 1
      backends/gateway.go
  3. 1 1
      backends/p_header.go
  4. 4 1
      backends/s_debug.go
  5. 30 2
      backends/s_header.go
  6. 121 0
      backends/s_headers_parser.go

+ 79 - 2
api_test.go

@@ -496,6 +496,66 @@ func talkToServer(address string) (err error) {
 	return nil
 	return nil
 }
 }
 
 
+func talkToServer2(address string, body string) (err error) {
+
+	conn, err := net.Dial("tcp", address)
+	if err != nil {
+		return
+	}
+	in := bufio.NewReader(conn)
+	str, err := in.ReadString('\n')
+	if err != nil {
+		return err
+	}
+	_, err = fmt.Fprint(conn, "HELO maildiranasaurustester\r\n")
+	if err != nil {
+		return err
+	}
+	str, err = in.ReadString('\n')
+	if err != nil {
+		return err
+	}
+	_, err = fmt.Fprint(conn, "MAIL FROM:<[email protected]>r\r\n")
+	if err != nil {
+		return err
+	}
+	str, err = in.ReadString('\n')
+	if err != nil {
+		return err
+	}
+	if err != nil {
+		return err
+	}
+	_, err = fmt.Fprint(conn, "RCPT TO:<[email protected]>\r\n")
+	if err != nil {
+		return err
+	}
+	str, err = in.ReadString('\n')
+	if err != nil {
+		return err
+	}
+	_, err = fmt.Fprint(conn, "DATA\r\n")
+	if err != nil {
+		return err
+	}
+	str, err = in.ReadString('\n')
+	if err != nil {
+		return err
+	}
+
+	_, err = fmt.Fprint(conn, body)
+	if err != nil {
+		return err
+	}
+
+	str, err = in.ReadString('\n')
+	if err != nil {
+		return err
+	}
+	_ = str
+	return nil
+}
+
 // Test hot config reload
 // Test hot config reload
 // Here we forgot to add FunkyLogger so backend will fail to init
 // Here we forgot to add FunkyLogger so backend will fail to init
 
 
@@ -719,7 +779,7 @@ func TestStreamProcessor(t *testing.T) {
 		AllowedHosts: []string{"grr.la"},
 		AllowedHosts: []string{"grr.la"},
 		BackendConfig: backends.BackendConfig{
 		BackendConfig: backends.BackendConfig{
 			"save_process":        "HeadersParser|Debugger",
 			"save_process":        "HeadersParser|Debugger",
-			"stream_save_process": "Header|compress|Decompress|debug",
+			"stream_save_process": "Header|headersparser|compress|Decompress|debug",
 		},
 		},
 	}
 	}
 	d := Daemon{Config: cfg}
 	d := Daemon{Config: cfg}
@@ -727,8 +787,25 @@ func TestStreamProcessor(t *testing.T) {
 	if err := d.Start(); err != nil {
 	if err := d.Start(); err != nil {
 		t.Error(err)
 		t.Error(err)
 	}
 	}
+	body := "Subject: Test subject\r\n" +
+		//"\r\n" +
+		"A an email body.\r\n" +
+		"Header|headersparser|compress|Decompress|debug Header|headersparser|compress|Decompress|debug.\r\n" +
+		"Header|headersparser|compress|Decompress|debug Header|headersparser|compress|Decompress|debug.\r\n" +
+		"Header|headersparser|compress|Decompress|debug Header|headersparser|compress|Decompress|debug.\r\n" +
+		"Header|headersparser|compress|Decompress|debug Header|headersparser|compress|Decompress|debug.\r\n" +
+		"Header|headersparser|compress|Decompress|debug Header|headersparser|compress|Decompress|debug.\r\n" +
+		"Header|headersparser|compress|Decompress|debug Header|headersparser|compress|Decompress|debug.\r\n" +
+		"Header|headersparser|compress|Decompress|debug Header|headersparser|compress|Decompress|debug.\r\n" +
+		"Header|headersparser|compress|Decompress|debug Header|headersparser|compress|Decompress|debug.\r\n" +
+		"Header|headersparser|compress|Decompress|debug Header|headersparser|compress|Decompress|debug.\r\n" +
+		"Header|headersparser|compress|Decompress|debug Header|headersparser|compress|Decompress|debug.\r\n" +
+		"Header|headersparser|compress|Decompress|debug Header|headersparser|compress|Decompress|debug.\r\n" +
+		"Header|headersparser|compress|Decompress|debug Header|headersparser|compress|Decompress|debug.\r\n" +
+
+		".\r\n"
 	// lets have a talk with the server
 	// lets have a talk with the server
-	if err := talkToServer("127.0.0.1:2525"); err != nil {
+	if err := talkToServer2("127.0.0.1:2525", body); err != nil {
 		t.Error(err)
 		t.Error(err)
 	}
 	}
 
 

+ 3 - 1
backends/gateway.go

@@ -601,7 +601,9 @@ func (gw *BackendGateway) workDispatcher(
 			} else if msg.task == TaskSaveMailStream {
 			} else if msg.task == TaskSaveMailStream {
 				err := stream.open(msg.e)
 				err := stream.open(msg.e)
 				if err == nil {
 				if err == nil {
-					if msg.e.Values["size"], err = io.Copy(stream, msg.r); err != nil {
+					var buf []byte
+					buf = make([]byte, 1024*4)
+					if msg.e.Values["size"], err = io.CopyBuffer(stream, msg.r, buf); err != nil {
 						Log().WithError(err).Error("stream writing failed")
 						Log().WithError(err).Error("stream writing failed")
 					}
 					}
 					if err = stream.close(); err != nil {
 					if err = stream.close(); err != nil {

+ 1 - 1
backends/p_header.go

@@ -15,7 +15,7 @@ type HeaderConfig struct {
 // ----------------------------------------------------------------------------------
 // ----------------------------------------------------------------------------------
 // Description   : Adds delivery information headers to e.DeliveryHeader
 // Description   : Adds delivery information headers to e.DeliveryHeader
 // ----------------------------------------------------------------------------------
 // ----------------------------------------------------------------------------------
-// Config Options: none
+// Config Options: primary_mail_host - string of the primary mail hostname
 // --------------:-------------------------------------------------------------------
 // --------------:-------------------------------------------------------------------
 // Input         : e.Helo
 // Input         : e.Helo
 //               : e.RemoteAddress
 //               : e.RemoteAddress

+ 4 - 1
backends/s_debug.go

@@ -3,6 +3,7 @@ package backends
 import (
 import (
 	"fmt"
 	"fmt"
 	"github.com/flashmob/go-guerrilla/mail"
 	"github.com/flashmob/go-guerrilla/mail"
+	"strings"
 )
 )
 
 
 func init() {
 func init() {
@@ -21,7 +22,9 @@ func StreamDebug() *StreamDecorator {
 				return nil
 				return nil
 			}
 			}
 			return StreamProcessWith(func(p []byte) (int, error) {
 			return StreamProcessWith(func(p []byte) (int, error) {
-				fmt.Println(string(p))
+				str := string(p)
+				str = strings.Replace(str, "\n", "<NL>\n", -1)
+				fmt.Println(str)
 				Log().WithField("p", string(p)).Info("Debug stream")
 				Log().WithField("p", string(p)).Info("Debug stream")
 				return sp.Write(p)
 				return sp.Write(p)
 			})
 			})

+ 30 - 2
backends/s_header.go

@@ -7,6 +7,21 @@ import (
 	"time"
 	"time"
 )
 )
 
 
+// ----------------------------------------------------------------------------------
+// Processor Name: header
+// ----------------------------------------------------------------------------------
+// Description   : Adds delivery information headers to e.DeliveryHeader
+// ----------------------------------------------------------------------------------
+// Config Options: primary_mail_host - string of the primary mail hostname
+// --------------:-------------------------------------------------------------------
+// Input         : e.Helo
+//               : e.RemoteAddress
+//               : e.RcptTo
+//               : e.Hashes
+// ----------------------------------------------------------------------------------
+// Output        : Sets e.DeliveryHeader with additional delivery info
+// ----------------------------------------------------------------------------------
+
 func init() {
 func init() {
 	streamers["header"] = func() *StreamDecorator {
 	streamers["header"] = func() *StreamDecorator {
 		return StreamHeader()
 		return StreamHeader()
@@ -25,7 +40,7 @@ func newStreamHeader(w io.Writer) *streamHeader {
 	return sc
 	return sc
 }
 }
 
 
-func (sh *streamHeader) addHeader(e *mail.Envelope, config HeaderConfig) {
+func (sh *streamHeader) addHeader(e *mail.Envelope, config *HeaderConfig) {
 	to := strings.TrimSpace(e.RcptTo[0].User) + "@" + config.PrimaryHost
 	to := strings.TrimSpace(e.RcptTo[0].User) + "@" + config.PrimaryHost
 	hash := "unknown"
 	hash := "unknown"
 	if len(e.Hashes) > 0 {
 	if len(e.Hashes) > 0 {
@@ -43,6 +58,19 @@ func (sh *streamHeader) addHeader(e *mail.Envelope, config HeaderConfig) {
 }
 }
 
 
 func StreamHeader() *StreamDecorator {
 func StreamHeader() *StreamDecorator {
+
+	var hc *HeaderConfig
+
+	Svc.AddInitializer(InitializeWith(func(backendConfig BackendConfig) error {
+		configType := BaseConfig(&HeaderConfig{})
+		bcfg, err := Svc.ExtractConfig(backendConfig, configType)
+		if err != nil {
+			return err
+		}
+		hc = bcfg.(*HeaderConfig)
+		return nil
+	}))
+
 	sd := &StreamDecorator{}
 	sd := &StreamDecorator{}
 	sd.p =
 	sd.p =
 
 
@@ -51,10 +79,10 @@ func StreamHeader() *StreamDecorator {
 
 
 			sd.Open = func(e *mail.Envelope) error {
 			sd.Open = func(e *mail.Envelope) error {
 				sh = newStreamHeader(sp)
 				sh = newStreamHeader(sp)
-				hc := HeaderConfig{"sharklasers.com"}
 				sh.addHeader(e, hc)
 				sh.addHeader(e, hc)
 				return nil
 				return nil
 			}
 			}
+
 			return StreamProcessWith(func(p []byte) (int, error) {
 			return StreamProcessWith(func(p []byte) (int, error) {
 				if sh.i < len(sh.addHead) {
 				if sh.i < len(sh.addHead) {
 					for {
 					for {

+ 121 - 0
backends/s_headers_parser.go

@@ -0,0 +1,121 @@
+package backends
+
+import (
+	"bufio"
+	"bytes"
+	"fmt"
+	"github.com/flashmob/go-guerrilla/mail"
+	"io"
+	"net/textproto"
+)
+
+func init() {
+	streamers["headersparser"] = func() *StreamDecorator {
+		return StreamHeadersParser()
+	}
+}
+
+const stateHeaderScanning = 0
+const stateHeaderNotScanning = 1
+
+func StreamHeadersParser() *StreamDecorator {
+	sd := &StreamDecorator{}
+	sd.p =
+
+		func(sp StreamProcessor) StreamProcessor {
+
+			// contains the header
+			var buf bytes.Buffer
+			var state byte
+			var lastByte byte
+			var total int64
+			var envelope *mail.Envelope
+
+			parse := func() error {
+				var err error
+				// use a TeeReader to split the write to both sp and headerReader
+				r := bufio.NewReader(io.TeeReader(&buf, sp))
+				headerReader := textproto.NewReader(r)
+				envelope.Header, err = headerReader.ReadMIMEHeader()
+
+				if err != nil {
+					if subject, ok := envelope.Header["Subject"]; ok {
+						envelope.Subject = mail.MimeHeaderDecode(subject[0])
+					}
+				}
+
+				return err
+			}
+			sd.Open = func(e *mail.Envelope) error {
+				buf.Reset()
+				state = 0
+				lastByte = 0
+				total = 0
+				envelope = e
+				return nil
+			}
+
+			sd.Close = func() error {
+				// If header wasn't detected
+				// pump out whatever is in the buffer to the underlying writer
+				if state == stateHeaderScanning {
+					_, err := io.Copy(sp, &buf)
+					return err
+				}
+				return nil
+			}
+			return StreamProcessWith(func(p []byte) (int, error) {
+
+				switch state {
+				case stateHeaderScanning:
+					// detect end of header \n\n
+					headerEnd := bytes.Index(p, []byte{'\n', '\n'})
+					if headerEnd == -1 && (lastByte == '\n' && p[0] == '\n') {
+						headerEnd = 0
+					}
+					var remainder []byte // remainder are the non-header bytes after the \n\n
+					if headerEnd > -1 {
+
+						if len(p) > headerEnd {
+							remainder = p[headerEnd:]
+						}
+						p = p[:headerEnd]
+					}
+
+					// read in the header to a temp buffer
+					n, err := io.Copy(&buf, bytes.NewReader(p))
+					lastByte = p[n-1] // remember the last byte read
+					if headerEnd > -1 {
+						// header found, parse it
+						parseErr := parse()
+						if parseErr != nil {
+							Log().WithError(parseErr).Error("cannot parse headers")
+						}
+						// flush the remainder to the underlying writer
+						if remainder != nil {
+							n1, _ := sp.Write(remainder)
+							n = n + int64(n1)
+						}
+						state = stateHeaderNotScanning
+					} else {
+						total += n
+						// give up if we didn't detect the header after x bytes
+						if total > 100 {
+							state = stateHeaderNotScanning
+							n, err = io.Copy(sp, &buf)
+							return int(n), err
+						}
+					}
+					return int(n), err
+				case stateHeaderNotScanning:
+					// just forward everything to the next writer without buffering
+					return sp.Write(p)
+				}
+				fmt.Println(string(p))
+				Log().WithField("p", string(p)).Info("Debug stream")
+				return sp.Write(p)
+			})
+		}
+
+	return sd
+}