Browse Source

debugging in progress

flashmob 5 years ago
parent
commit
091c2bffd6
13 changed files with 44 additions and 26 deletions
  1. 1 0
      api.go
  2. 2 2
      api_test.go
  3. 1 3
      backends/backend.go
  4. 4 2
      backends/config.go
  5. 1 3
      backends/gateway.go
  6. 0 2
      backends/s_debug.go
  7. 14 3
      backends/s_decompress.go
  8. 5 5
      cmd/guerrillad/serve_test.go
  9. 1 0
      config.go
  10. 1 1
      config_test.go
  11. 4 1
      event.go
  12. 9 3
      guerrilla.go
  13. 1 1
      tests/guerrilla_test.go

+ 1 - 0
api.go

@@ -55,6 +55,7 @@ func (d *Daemon) Start() (err error) {
 		if err != nil {
 			return err
 		}
+
 		for i := range d.subs {
 			_ = d.Subscribe(d.subs[i].topic, d.subs[i].fn)
 

+ 2 - 2
api_test.go

@@ -139,7 +139,7 @@ func TestSMTPLoadFile(t *testing.T) {
     "allowed_hosts": ["spam4.me","grr.la"],
 	"backend" : {
 		"processors" : {
-			"debugger" {
+			"debugger" : {
 				"log_received_mails" : true
 			}
 		},
@@ -177,7 +177,7 @@ func TestSMTPLoadFile(t *testing.T) {
     "allowed_hosts": ["spam4.me","grr.la"],
     "backend" : {
 		"processors" : {
-			"debugger" {
+			"debugger" : {
 				"log_received_mails" : true
 			}
 		},

+ 1 - 3
backends/backend.go

@@ -21,8 +21,6 @@ var (
 
 	// Streamers store the constructors for composing a new stream-based processor using a decorator pattern.
 	Streamers map[string]StreamProcessorConstructor
-
-	b Backend
 )
 
 func init() {
@@ -274,7 +272,7 @@ func (s *service) ExtractConfig(ns configNameSpace, group string, cfg BackendCon
 	if v, ok := cfg[ns.String()][group]; ok {
 		configData = v
 	} else {
-		return configData, nil
+		return configType, nil
 	}
 	// Use reflection so that we can provide a nice error message
 	v := reflect.ValueOf(configType).Elem() // so that we can set the values

+ 4 - 2
backends/config.go

@@ -21,7 +21,7 @@ func (c *BackendConfig) SetValue(ns configNameSpace, name string, key string, va
 	if (*c)[nsKey][name] == nil {
 		(*c)[nsKey][name] = make(ConfigGroup)
 	}
-	(*c)[nsKey][name] = map[string]interface{}{key: value}
+	(*c)[nsKey][name][key] = value
 }
 
 func (c *BackendConfig) GetValue(ns configNameSpace, name string, key string) interface{} {
@@ -66,9 +66,11 @@ type stackConfigExpression struct {
 	name  string
 }
 
+type notFoundError func(s string) error
+
 type stackConfig struct {
 	list     []stackConfigExpression
-	notFound func(s string) error
+	notFound notFoundError
 }
 
 func NewStackConfig(config string) (ret *stackConfig) {

+ 1 - 3
backends/gateway.go

@@ -14,8 +14,6 @@ import (
 	"runtime/debug"
 )
 
-var ErrProcessorNotFound error
-
 // A backend gateway is a proxy that implements the Backend interface.
 // It is used to start multiple goroutine workers for saving mail, and then distribute email saving to the workers
 // via a channel. Shutting down via Shutdown() will stop all workers.
@@ -161,7 +159,7 @@ func New(name string, backendConfig BackendConfig, l log.Logger) (Backend, error
 	}
 	// keep the a copy of the config
 	gateway.config = backendConfig
-	return b, nil
+	return gateway, nil
 }
 
 var workerMsgPool = sync.Pool{

+ 0 - 2
backends/s_debug.go

@@ -22,8 +22,6 @@ func StreamDebug() *StreamDecorator {
 			}
 			return StreamProcessWith(func(p []byte) (int, error) {
 				str := string(p)
-				//str = strings.Replace(str, "\n", "<LF>\n", -1)
-				//fmt.Println(str)
 				fmt.Print(str)
 				Log().WithField("p", string(p)).Info("Debug stream")
 				return sp.Write(p)

+ 14 - 3
backends/s_decompress.go

@@ -5,6 +5,7 @@ import (
 	"compress/zlib"
 	"github.com/flashmob/go-guerrilla/mail"
 	"io"
+	"sync"
 )
 
 func init() {
@@ -26,12 +27,13 @@ func StreamDecompress() *StreamDecorator {
 				pr *io.PipeReader
 				pw *io.PipeWriter
 			)
-
+			var wg sync.WaitGroup
 			// consumer runs as a gorouitne.
 			// It connects the zlib reader with the read-end of the pipe
 			// then copies the output down to the next stream processor
 			// consumer will exit of the pipe gets closed or on error
 			consumer := func() {
+				defer wg.Done()
 				var err error
 				for {
 					if zr == nil {
@@ -53,16 +55,22 @@ func StreamDecompress() *StreamDecorator {
 			// start our consumer goroutine
 			sd.Open = func(e *mail.Envelope) error {
 				pr, pw = io.Pipe()
+				wg.Add(1)
 				go consumer()
 				return nil
 			}
 
 			// close both ends of the pipes when finished
 			sd.Close = func() error {
+				// stop the consumer
 				errR := pr.Close()
 				errW := pw.Close()
-				if err := zr.Close(); err != nil {
-					return err
+				// wait for the consumer to stop
+				wg.Wait()
+				if zr != nil {
+					if err := zr.Close(); err != nil {
+						return err
+					}
 				}
 				if errR != nil {
 					return errR
@@ -70,6 +78,9 @@ func StreamDecompress() *StreamDecorator {
 				if errW != nil {
 					return errW
 				}
+				pr = nil
+				pw = nil
+				zr = nil
 				return nil
 			}
 

+ 5 - 5
cmd/guerrillad/serve_test.go

@@ -39,7 +39,7 @@ var configJsonA = `
     ],
 	"backend" : {
 		"processors" : {
-			"debugger" {
+			"debugger" : {
 				"log_received_mails" : true
 			}
 		},
@@ -100,7 +100,7 @@ var configJsonB = `
     ],
     "backend" : {
 		"processors" : {
-			"debugger" {
+			"debugger" : {
 				"log_received_mails" : false
 			}
 		},
@@ -146,7 +146,7 @@ var configJsonC = `
     ],
 	"backend" : {
 		"processors" : {
-			"debugger" {
+			"debugger" : {
 				"log_received_mails" : true
 			},
             "sql" : {
@@ -212,7 +212,7 @@ var configJsonD = `
     ],
 	"backend" : {
 		"processors" : {
-			"debugger" {
+			"debugger" : {
 				"log_received_mails" : false
 			}
 		},
@@ -273,7 +273,7 @@ var configJsonE = `
     ],
 	"backend" : {
 		"processors" : {
-			"debugger" {
+			"debugger" : {
 				"log_received_mails" : true
 			},
             "sql" : {

+ 1 - 0
config.go

@@ -304,6 +304,7 @@ func (c *AppConfig) setDefaults() error {
 		sc.MaxClients = defaultMaxClients
 		sc.Timeout = defaultTimeout
 		sc.MaxSize = defaultMaxSize
+		sc.Gateway = backends.DefaultGateway
 		c.Servers = append(c.Servers, sc)
 	} else {
 		// make sure each server has defaults correctly configured

+ 1 - 1
config_test.go

@@ -108,7 +108,7 @@ var configJsonB = `
     "allowed_hosts": ["spam4.me","grr.la","newhost.com"],
     "backend" : {
 		"processors" : {
-			"debugger" {
+			"debugger": {
 				"log_received_mails" : true
 			}
 		}

+ 4 - 1
event.go

@@ -54,7 +54,9 @@ var eventList = [...]string{
 	"config_change:log_file",
 	"config_change:reopen_log_file",
 	"config_change:log_level",
-	"config_change:backend_config", // todo change to 'backend;
+	"config_change:backend_config",
+	"config_change:backend_config_added",
+	"config_change:backend_removed",
 	"server_change:new_server",
 	"server_change:remove_server",
 	"server_change:update_config",
@@ -65,6 +67,7 @@ var eventList = [...]string{
 	"server_change:timeout",
 	"server_change:max_clients",
 	"server_change:tls_config",
+	"server_change:gateway",
 }
 
 func (e Event) String() string {

+ 9 - 3
guerrilla.go

@@ -87,11 +87,10 @@ func (ls *logStore) setMainlog(log log.Logger) {
 func (g *guerrilla) makeConfiguredBackends(l log.Logger) ([]backends.Backend, error) {
 	var list []backends.Backend
 	config := g.Config.BackendConfig[backends.ConfigGateways.String()]
-	count := len(config)
-	if count == 0 {
+	if len(config) == 0 {
 		return list, errors.New("no backends configured")
 	}
-	list = make([]backends.Backend, count)
+	list = make([]backends.Backend, 0)
 	for name := range config {
 		if b, err := backends.New(name, g.Config.BackendConfig, l); err != nil {
 			return nil, err
@@ -117,6 +116,9 @@ func New(ac *AppConfig, l log.Logger, b ...backends.Backend) (Guerrilla, error)
 			return g, err
 		}
 	}
+	if g.backends == nil {
+		g.backends = make(BackendContainer)
+	}
 	for i := range b {
 		g.storeBackend(b[i])
 	}
@@ -252,6 +254,9 @@ func (g *guerrilla) mapBackends(callback func(backend backends.Backend) error) (
 			e = append(e, err)
 		}
 	}
+	if len(e) == 0 {
+		return g.backends, nil
+	}
 	return g.backends, e
 }
 
@@ -625,6 +630,7 @@ func (g *guerrilla) Shutdown() {
 	if _, err := g.mapBackends(func(b backends.Backend) error {
 		return b.Shutdown()
 	}); err != nil {
+		fmt.Println(err)
 		g.mainlog().WithError(err).Warn("Backend failed to shutdown")
 	} else {
 		g.mainlog().Infof("Backend shutdown completed")

+ 1 - 1
tests/guerrilla_test.go

@@ -83,7 +83,7 @@ var configJson = `
 	
     "backend" : {
 		"processors" : {
-			"debugger" {
+			"debugger" : {
 				"log_received_mails" : true
 			}
 		}