|
@@ -3,7 +3,9 @@ package guerrilla
|
|
|
import (
|
|
|
"errors"
|
|
|
"fmt"
|
|
|
+ "github.com/sirupsen/logrus"
|
|
|
"os"
|
|
|
+ "strings"
|
|
|
"sync"
|
|
|
"sync/atomic"
|
|
|
|
|
@@ -52,19 +54,20 @@ type guerrilla struct {
|
|
|
state int8
|
|
|
EventHandler
|
|
|
logStore
|
|
|
- backendStore
|
|
|
+
|
|
|
+ beGuard sync.Mutex
|
|
|
+ backends BackendContainer
|
|
|
}
|
|
|
|
|
|
type logStore struct {
|
|
|
atomic.Value
|
|
|
}
|
|
|
|
|
|
-type backendStore struct {
|
|
|
- atomic.Value
|
|
|
-}
|
|
|
+type BackendContainer map[string]backends.Backend
|
|
|
|
|
|
type daemonEvent func(c *AppConfig)
|
|
|
type serverEvent func(sc *ServerConfig)
|
|
|
+type backendEvent func(c *AppConfig, gateway string)
|
|
|
|
|
|
// Get loads the log.logger in an atomic operation. Returns a stderr logger if not able to load
|
|
|
func (ls *logStore) mainlog() log.Logger {
|
|
@@ -80,13 +83,43 @@ func (ls *logStore) setMainlog(log log.Logger) {
|
|
|
ls.Store(log)
|
|
|
}
|
|
|
|
|
|
+// makeConfiguredBackends makes backends from the config
|
|
|
+func (g *guerrilla) makeConfiguredBackends(l log.Logger) ([]backends.Backend, error) {
|
|
|
+ var list []backends.Backend
|
|
|
+ config := g.Config.BackendConfig[backends.ConfigGateways.String()]
|
|
|
+ count := len(config)
|
|
|
+ if count == 0 {
|
|
|
+ return list, errors.New("no backends configured")
|
|
|
+ }
|
|
|
+ list = make([]backends.Backend, count)
|
|
|
+ for name := range config {
|
|
|
+ if b, err := backends.New(name, g.Config.BackendConfig, l); err != nil {
|
|
|
+ return nil, err
|
|
|
+ } else {
|
|
|
+ list = append(list, b)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return list, nil
|
|
|
+}
|
|
|
+
|
|
|
+// New creates a new Guerrilla instance configured with backends and a logger
|
|
|
// Returns a new instance of Guerrilla with the given config, not yet running. Backend started.
|
|
|
-func New(ac *AppConfig, b backends.Backend, l log.Logger) (Guerrilla, error) {
|
|
|
+// b can be nil. If nil. then it will use the config to make the backends
|
|
|
+func New(ac *AppConfig, l log.Logger, b ...backends.Backend) (Guerrilla, error) {
|
|
|
g := &guerrilla{
|
|
|
Config: *ac, // take a local copy
|
|
|
servers: make(map[string]*server, len(ac.Servers)),
|
|
|
}
|
|
|
- g.backendStore.Store(b)
|
|
|
+ if 0 == len(b) {
|
|
|
+ var err error
|
|
|
+ b, err = g.makeConfiguredBackends(l)
|
|
|
+ if err != nil {
|
|
|
+ return g, err
|
|
|
+ }
|
|
|
+ }
|
|
|
+ for i := range b {
|
|
|
+ g.storeBackend(b[i])
|
|
|
+ }
|
|
|
g.setMainlog(l)
|
|
|
|
|
|
if ac.LogLevel != "" {
|
|
@@ -106,8 +139,11 @@ func New(ac *AppConfig, b backends.Backend, l log.Logger) (Guerrilla, error) {
|
|
|
return g, err
|
|
|
}
|
|
|
|
|
|
- // start backend for processing email
|
|
|
- err = g.backend().Start()
|
|
|
+ // start backends for processing email
|
|
|
+ _, err = g.mapBackends(func(b backends.Backend) error {
|
|
|
+ return b.Start()
|
|
|
+ })
|
|
|
+
|
|
|
if err != nil {
|
|
|
return g, err
|
|
|
}
|
|
@@ -133,7 +169,7 @@ func (g *guerrilla) makeServers() error {
|
|
|
continue
|
|
|
} else {
|
|
|
sc := sc // pin!
|
|
|
- server, err := newServer(&sc, g.backend(), g.mainlog())
|
|
|
+ server, err := newServer(&sc, g.backend(sc.Gateway), g.mainlog())
|
|
|
if err != nil {
|
|
|
g.mainlog().WithError(err).Errorf("Failed to create server [%s]", sc.ListenInterface)
|
|
|
errs = append(errs, err)
|
|
@@ -197,6 +233,28 @@ func (g *guerrilla) mapServers(callback func(*server)) map[string]*server {
|
|
|
return g.servers
|
|
|
}
|
|
|
|
|
|
+type mapBackendErrors []error
|
|
|
+
|
|
|
+func (e mapBackendErrors) Error() string {
|
|
|
+ data := make([]string, len(e))
|
|
|
+ for i, s := range e {
|
|
|
+ data[i] = fmt.Sprint(s)
|
|
|
+ }
|
|
|
+ return strings.Join(data, ",")
|
|
|
+}
|
|
|
+
|
|
|
+func (g *guerrilla) mapBackends(callback func(backend backends.Backend) error) (BackendContainer, error) {
|
|
|
+ defer g.beGuard.Unlock()
|
|
|
+ g.beGuard.Lock()
|
|
|
+ var e mapBackendErrors
|
|
|
+ for name := range g.backends {
|
|
|
+ if err := callback(g.backends[name]); err != nil {
|
|
|
+ e = append(e, err)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return g.backends, e
|
|
|
+}
|
|
|
+
|
|
|
// subscribeEvents subscribes event handlers for configuration change events
|
|
|
func (g *guerrilla) subscribeEvents() {
|
|
|
|
|
@@ -268,7 +326,6 @@ func (g *guerrilla) subscribeEvents() {
|
|
|
g.mainlog().Debugf("event fired [%s] %s", EventConfigServerNew, sc.ListenInterface)
|
|
|
if _, err := g.findServer(sc.ListenInterface); err != nil {
|
|
|
// not found, lets add it
|
|
|
- //
|
|
|
if err := g.makeServers(); err != nil {
|
|
|
g.mainlog().WithError(err).Errorf("cannot add server [%s]", sc.ListenInterface)
|
|
|
return
|
|
@@ -373,38 +430,82 @@ func (g *guerrilla) subscribeEvents() {
|
|
|
g.mainlog().Infof("Server [%s] re-opened log file [%s]", sc.ListenInterface, sc.LogFile)
|
|
|
}
|
|
|
})
|
|
|
- // when the backend changes
|
|
|
- events[EventConfigBackendConfig] = daemonEvent(func(appConfig *AppConfig) {
|
|
|
+
|
|
|
+ // when the server's gateway setting changed
|
|
|
+ events[EventConfigServerGatewayConfig] = serverEvent(func(sc *ServerConfig) {
|
|
|
+ b := g.backend(sc.Gateway)
|
|
|
+ if b == nil {
|
|
|
+ g.mainlog().WithField("gateway", sc.Gateway).Error("could not change to gateway, not configured")
|
|
|
+ return
|
|
|
+ }
|
|
|
+ g.storeBackend(b)
|
|
|
+ })
|
|
|
+
|
|
|
+ events[EventConfigBackendConfigChanged] = backendEvent(func(appConfig *AppConfig, name string) {
|
|
|
logger, _ := log.GetLogger(appConfig.LogFile, appConfig.LogLevel)
|
|
|
// shutdown the backend first.
|
|
|
var err error
|
|
|
- if err = g.backend().Shutdown(); err != nil {
|
|
|
- logger.WithError(err).Warn("Backend failed to shutdown")
|
|
|
+ fields := logrus.Fields{"error": err, "gateway": name}
|
|
|
+ if err = g.backend(name).Shutdown(); err != nil {
|
|
|
+ logger.WithFields(fields).Warn("gateway failed to shutdown")
|
|
|
return
|
|
|
}
|
|
|
// init a new backend, Revert to old backend config if it fails
|
|
|
- if newBackend, newErr := backends.New(appConfig.BackendConfig, logger); newErr != nil {
|
|
|
- logger.WithError(newErr).Error("Error while loading the backend")
|
|
|
- err = g.backend().Reinitialize()
|
|
|
+ if newBackend, newErr := backends.New(name, appConfig.BackendConfig, logger); newErr != nil {
|
|
|
+ logger.WithFields(fields).Error("error while loading the gateway")
|
|
|
+ err = g.backend(name).Reinitialize()
|
|
|
if err != nil {
|
|
|
- logger.WithError(err).Fatal("failed to revert to old backend config")
|
|
|
+ logger.WithFields(fields).Fatal("failed to revert to old gateway config")
|
|
|
return
|
|
|
}
|
|
|
- err = g.backend().Start()
|
|
|
+ err = g.backend(name).Start()
|
|
|
if err != nil {
|
|
|
- logger.WithError(err).Fatal("failed to start backend with old config")
|
|
|
+ logger.WithFields(fields).Fatal("failed to start gateway with old config")
|
|
|
return
|
|
|
}
|
|
|
- logger.Info("reverted to old backend config")
|
|
|
+ logger.WithField("gateway", name).Info("reverted to old gateway config")
|
|
|
} else {
|
|
|
- // swap to the bew backend (assuming old backend was shutdown so it can be safely swapped)
|
|
|
+ // swap to the bew backend (assuming old gateway was shutdown so it can be safely swapped)
|
|
|
if err := newBackend.Start(); err != nil {
|
|
|
- logger.WithError(err).Error("backend could not start")
|
|
|
+ logger.WithFields(fields).Error("gateway could not start")
|
|
|
}
|
|
|
- logger.Info("new backend started")
|
|
|
+ logger.WithField("gateway", name).Info("new gateway started")
|
|
|
g.storeBackend(newBackend)
|
|
|
}
|
|
|
})
|
|
|
+
|
|
|
+ // a new gateway was added
|
|
|
+ events[EventConfigBackendConfigAdded] = backendEvent(func(appConfig *AppConfig, name string) {
|
|
|
+ logger, _ := log.GetLogger(appConfig.LogFile, appConfig.LogLevel)
|
|
|
+ // shutdown any old backend first.
|
|
|
+ var err error
|
|
|
+ fields := logrus.Fields{"error": err, "gateway": name}
|
|
|
+ if newBackend, newErr := backends.New(name, appConfig.BackendConfig, logger); newErr != nil {
|
|
|
+ logger.WithFields(fields).Error("Error while loading the gateway")
|
|
|
+ } else {
|
|
|
+ // swap to the bew gateway (assuming old gateway was shutdown so it can be safely swapped)
|
|
|
+ if err := newBackend.Start(); err != nil {
|
|
|
+ logger.WithFields(fields).Error("gateway could not start")
|
|
|
+ }
|
|
|
+ logger.WithField("gateway", name).Info("new gateway started")
|
|
|
+ g.storeBackend(newBackend)
|
|
|
+ }
|
|
|
+
|
|
|
+ })
|
|
|
+
|
|
|
+ // remove a gateway (shut it down)
|
|
|
+ events[EventConfigBackendConfigRemoved] = backendEvent(func(appConfig *AppConfig, name string) {
|
|
|
+ logger, _ := log.GetLogger(appConfig.LogFile, appConfig.LogLevel)
|
|
|
+ // shutdown the backend first.
|
|
|
+ var err error
|
|
|
+ if err = g.backend(name).Shutdown(); err != nil {
|
|
|
+ logger.WithFields(logrus.Fields{"error": err, "gateway": name}).Warn("gateway failed to shutdown")
|
|
|
+ return
|
|
|
+ }
|
|
|
+ g.removeBackend(g.backend(name))
|
|
|
+ logger.WithField("gateway", name).Info("gateway removed")
|
|
|
+ })
|
|
|
+
|
|
|
var err error
|
|
|
for topic, fn := range events {
|
|
|
switch f := fn.(type) {
|
|
@@ -418,18 +519,28 @@ func (g *guerrilla) subscribeEvents() {
|
|
|
break
|
|
|
}
|
|
|
}
|
|
|
+}
|
|
|
+
|
|
|
+func (g *guerrilla) removeBackend(b backends.Backend) {
|
|
|
+ g.beGuard.Lock()
|
|
|
+ defer g.beGuard.Unlock()
|
|
|
+ delete(g.backends, b.Name())
|
|
|
|
|
|
}
|
|
|
|
|
|
func (g *guerrilla) storeBackend(b backends.Backend) {
|
|
|
- g.backendStore.Store(b)
|
|
|
+ g.beGuard.Lock()
|
|
|
+ defer g.beGuard.Unlock()
|
|
|
+ g.backends[b.Name()] = b
|
|
|
g.mapServers(func(server *server) {
|
|
|
server.setBackend(b)
|
|
|
})
|
|
|
}
|
|
|
|
|
|
-func (g *guerrilla) backend() backends.Backend {
|
|
|
- if b, ok := g.backendStore.Load().(backends.Backend); ok {
|
|
|
+func (g *guerrilla) backend(name string) backends.Backend {
|
|
|
+ g.beGuard.Lock()
|
|
|
+ defer g.beGuard.Unlock()
|
|
|
+ if b, ok := g.backends[name]; ok {
|
|
|
return b
|
|
|
}
|
|
|
return nil
|
|
@@ -448,10 +559,12 @@ func (g *guerrilla) Start() error {
|
|
|
}
|
|
|
if g.state == daemonStateStopped {
|
|
|
// when a backend is shutdown, we need to re-initialize before it can be started again
|
|
|
- if err := g.backend().Reinitialize(); err != nil {
|
|
|
- startErrors = append(startErrors, err)
|
|
|
- }
|
|
|
- if err := g.backend().Start(); err != nil {
|
|
|
+ if _, err := g.mapBackends(func(b backends.Backend) error {
|
|
|
+ if err := b.Reinitialize(); err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ return b.Start()
|
|
|
+ }); err != nil {
|
|
|
startErrors = append(startErrors, err)
|
|
|
}
|
|
|
}
|
|
@@ -508,7 +621,10 @@ func (g *guerrilla) Shutdown() {
|
|
|
g.state = daemonStateStopped
|
|
|
defer g.guard.Unlock()
|
|
|
}()
|
|
|
- if err := g.backend().Shutdown(); err != nil {
|
|
|
+
|
|
|
+ if _, err := g.mapBackends(func(b backends.Backend) error {
|
|
|
+ return b.Shutdown()
|
|
|
+ }); err != nil {
|
|
|
g.mainlog().WithError(err).Warn("Backend failed to shutdown")
|
|
|
} else {
|
|
|
g.mainlog().Infof("Backend shutdown completed")
|