Browse Source

add streams based processing, implementing io.Writer

flashmob 6 năm trước cách đây
mục cha
commit
75a123068c
3 tập tin đã thay đổi với 15 bổ sung17 xóa
  1. 1 1
      api_test.go
  2. 14 10
      backends/gateway.go
  3. 0 6
      server.go

+ 1 - 1
api_test.go

@@ -745,7 +745,7 @@ func TestStreamProcessor(t *testing.T) {
 		t.Error("did not log: Debug stream")
 	}
 
-	if strings.Index(string(b), "Error") == -1 {
+	if strings.Index(string(b), "Error") != -1 {
 		t.Error("There was an error", string(b))
 	}
 

+ 14 - 10
backends/gateway.go

@@ -77,8 +77,8 @@ func (s streamer) Write(p []byte) (n int, err error) {
 	return s.sp.Write(p)
 }
 
-func (s *streamer) open(e *mail.Envelope) Errors {
-	var err Errors = nil
+func (s *streamer) open(e *mail.Envelope) error {
+	var err Errors
 	for i := range s.d {
 		if s.d[i].Open != nil {
 			if e := s.d[i].Open(e); e != nil {
@@ -86,11 +86,14 @@ func (s *streamer) open(e *mail.Envelope) Errors {
 			}
 		}
 	}
+	if len(err) == 0 {
+		return nil
+	}
 	return err
 }
 
-func (s *streamer) close() Errors {
-	var err Errors = nil
+func (s *streamer) close() error {
+	var err Errors
 	// close in reverse order
 	for i := len(s.d) - 1; i >= 0; i-- {
 		if s.d[i].Close != nil {
@@ -99,7 +102,9 @@ func (s *streamer) close() Errors {
 			}
 		}
 	}
-
+	if len(err) == 0 {
+		return nil
+	}
 	return err
 }
 
@@ -596,12 +601,11 @@ func (gw *BackendGateway) workDispatcher(
 			} else if msg.task == TaskSaveMailStream {
 				err := stream.open(msg.e)
 				if err == nil {
-					if N, copyErr := io.Copy(stream, msg.r); copyErr != nil {
-						err = append(err, copyErr)
-						msg.e.Values["size"] = N
+					if msg.e.Values["size"], err = io.Copy(stream, msg.r); err != nil {
+						Log().WithError(err).Error("stream writing failed")
 					}
-					if closeErr := stream.close(); closeErr != nil {
-						err = append(err, closeErr)
+					if err = stream.close(); err != nil {
+						Log().WithError(err).Error("stream closing failed")
 					}
 				}
 				state = dispatcherStateNotify

+ 0 - 6
server.go

@@ -10,7 +10,6 @@ import (
 	"io/ioutil"
 	"net"
 	"path/filepath"
-	"reflect"
 	"strings"
 	"sync"
 	"sync/atomic"
@@ -560,11 +559,6 @@ func (s *server) handleClient(client *client) {
 					err = fmt.Errorf("maximum DATA size exceeded (%d)", sc.MaxSize)
 				}
 			}
-			if err == nil {
-				fmt.Println(err)
-			} else {
-				fmt.Println(err.Error(), "type is:", reflect.TypeOf(err))
-			}
 
 			if err != nil {
 				if err == LineLimitExceeded {