Browse Source

debugging in progress 2

flashmob 5 years ago
parent
commit
bd17c235c7
7 changed files with 30 additions and 11 deletions
  1. 1 0
      api_test.go
  2. 14 5
      backends/config.go
  3. 2 2
      backends/s_decompress.go
  4. 4 1
      chunk/chunk.go
  5. 2 2
      config.go
  6. 7 0
      config_test.go
  7. 0 1
      server.go

+ 1 - 0
api_test.go

@@ -1315,6 +1315,7 @@ func TestStreamChunkSaver(t *testing.T) {
 				"default": {
 					"save_process":        "HeadersParser|Debugger",
 					"stream_save_process": "mimeanalyzer|chunksaver",
+					"gw_save_timeout":     "5",
 				},
 			},
 		},

+ 14 - 5
backends/config.go

@@ -8,8 +8,21 @@ import (
 )
 
 type ConfigGroup map[string]interface{}
+
 type BackendConfig map[string]map[string]ConfigGroup
 
+/*
+
+TODO change to thus
+
+type BackendConfig struct {
+	Processors map[string]ConfigGroup `json:"processors,omitempty"`
+	StreamProcessors map[string]ConfigGroup `json:"stream_processors,omitempty"`
+	Gateways map[string]ConfigGroup `json:"gateways,omitempty"`
+}
+
+*/
+
 func (c *BackendConfig) SetValue(ns configNameSpace, name string, key string, value interface{}) {
 	nsKey := ns.String()
 	if *c == nil {
@@ -84,7 +97,7 @@ func NewStackConfig(config string) (ret *stackConfig) {
 	pos := 0
 	for i := range items {
 		pos = len(items) - 1 - i // reverse order, since decorators are stacked
-		ret.list[pos] = stackConfigExpression{alias: "", name: items[pos]}
+		ret.list[i] = stackConfigExpression{alias: "", name: items[pos]}
 	}
 	return ret
 }
@@ -134,7 +147,6 @@ func (c BackendConfig) Changes(oldConfig BackendConfig) (changed, added, removed
 				changed[key] = true
 			}
 		}
-
 		if o, ok := oldConfig[key]; ok {
 			delete(oldConfig, key)
 			if !reflect.DeepEqual(c[key], o) {
@@ -146,14 +158,11 @@ func (c BackendConfig) Changes(oldConfig BackendConfig) (changed, added, removed
 			added[key] = true
 		}
 	}
-
 	// whats been removed
 	for p := range oldConfig {
 		removed[p] = true
 	}
-
 	return
-
 }
 
 func changedConfigGroups(old map[string]ConfigGroup, new map[string]ConfigGroup) map[string]bool {

+ 2 - 2
backends/s_decompress.go

@@ -65,8 +65,6 @@ func StreamDecompress() *StreamDecorator {
 				// stop the consumer
 				errR := pr.Close()
 				errW := pw.Close()
-				// wait for the consumer to stop
-				wg.Wait()
 				if zr != nil {
 					if err := zr.Close(); err != nil {
 						return err
@@ -78,6 +76,8 @@ func StreamDecompress() *StreamDecorator {
 				if errW != nil {
 					return errW
 				}
+				// wait for the consumer to stop
+				wg.Wait()
 				pr = nil
 				pw = nil
 				zr = nil

+ 4 - 1
chunk/chunk.go

@@ -5,6 +5,7 @@ import (
 	"compress/zlib"
 	"encoding/base64"
 	"encoding/json"
+	"errors"
 	"io/ioutil"
 	"sync"
 )
@@ -113,7 +114,9 @@ func (info *PartsInfo) UnmarshalJSONZlib(b []byte) error {
 
 // MarshalJSONZlib marshals and compresses the bytes using zlib
 func (info *PartsInfo) MarshalJSONZlib() ([]byte, error) {
-
+	if len(info.Parts) == 0 {
+		return []byte{}, errors.New("message contained no parts, was mime analyzer")
+	}
 	buf, err := json.Marshal(info)
 	if err != nil {
 		return buf, err

+ 2 - 2
config.go

@@ -30,7 +30,7 @@ type AppConfig struct {
 	// "info", "debug", "error", "panic". Default "info"
 	LogLevel string `json:"log_level,omitempty"`
 	// BackendConfig configures the email envelope processing backend
-	BackendConfig backends.BackendConfig `json:"backend"`
+	BackendConfig backends.BackendConfig `json:"backend,omitempty"`
 }
 
 // ServerConfig specifies config options for a single server
@@ -61,7 +61,7 @@ type ServerConfig struct {
 	// original client's IP address & client's HELO
 	XClientOn bool `json:"xclient_on,omitempty"`
 	// Gateway specifies which backend to use
-	Gateway string `json:"backend"`
+	Gateway string `json:"backend,omitempty"`
 }
 
 type ServerTLSConfig struct {

+ 7 - 0
config_test.go

@@ -92,6 +92,7 @@ var configJsonA = `
 			}
         }
     ]
+  }
 }
 `
 
@@ -111,6 +112,12 @@ var configJsonB = `
 			"debugger": {
 				"log_received_mails" : true
 			}
+		},
+		"gateways" : {
+			"default" : {
+				"save_workers_size":  1,
+				"save_process":  "HeadersParser|Header|Hasher|Debugger"
+			}
 		}
 	},
     "servers" : [

+ 0 - 1
server.go

@@ -598,7 +598,6 @@ func (s *server) handleClient(client *client) {
 			if be.StreamOn() {
 				// process the message as a stream
 				res, err = be.ProcessStream(client.smtpReader.DotReader(), client.Envelope)
-
 			} else {
 				// or buffer the entire message (parse headers & mime structure as we go along)
 				n, err = client.Data.ReadFrom(client.smtpReader)