|
@@ -2,8 +2,10 @@ package guerrilla
|
|
|
|
|
|
import (
|
|
import (
|
|
"errors"
|
|
"errors"
|
|
|
|
+ "fmt"
|
|
"github.com/flashmob/go-guerrilla/backends"
|
|
"github.com/flashmob/go-guerrilla/backends"
|
|
"github.com/flashmob/go-guerrilla/log"
|
|
"github.com/flashmob/go-guerrilla/log"
|
|
|
|
+ "os"
|
|
"sync"
|
|
"sync"
|
|
"sync/atomic"
|
|
"sync/atomic"
|
|
)
|
|
)
|
|
@@ -44,18 +46,22 @@ type Guerrilla interface {
|
|
type guerrilla struct {
|
|
type guerrilla struct {
|
|
Config AppConfig
|
|
Config AppConfig
|
|
servers map[string]*server
|
|
servers map[string]*server
|
|
- backend backends.Backend
|
|
|
|
// guard controls access to g.servers
|
|
// guard controls access to g.servers
|
|
guard sync.Mutex
|
|
guard sync.Mutex
|
|
state int8
|
|
state int8
|
|
EventHandler
|
|
EventHandler
|
|
logStore
|
|
logStore
|
|
|
|
+ backendStore
|
|
}
|
|
}
|
|
|
|
|
|
type logStore struct {
|
|
type logStore struct {
|
|
atomic.Value
|
|
atomic.Value
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+type backendStore struct {
|
|
|
|
+ atomic.Value
|
|
|
|
+}
|
|
|
|
+
|
|
// Get loads the log.logger in an atomic operation. Returns a stderr logger if not able to load
|
|
// Get loads the log.logger in an atomic operation. Returns a stderr logger if not able to load
|
|
func (ls *logStore) mainlog() log.Logger {
|
|
func (ls *logStore) mainlog() log.Logger {
|
|
if v, ok := ls.Load().(log.Logger); ok {
|
|
if v, ok := ls.Load().(log.Logger); ok {
|
|
@@ -75,8 +81,8 @@ func New(ac *AppConfig, b backends.Backend, l log.Logger) (Guerrilla, error) {
|
|
g := &guerrilla{
|
|
g := &guerrilla{
|
|
Config: *ac, // take a local copy
|
|
Config: *ac, // take a local copy
|
|
servers: make(map[string]*server, len(ac.Servers)),
|
|
servers: make(map[string]*server, len(ac.Servers)),
|
|
- backend: b,
|
|
|
|
}
|
|
}
|
|
|
|
+ g.backendStore.Store(b)
|
|
g.setMainlog(l)
|
|
g.setMainlog(l)
|
|
|
|
|
|
if ac.LogLevel != "" {
|
|
if ac.LogLevel != "" {
|
|
@@ -87,19 +93,24 @@ func New(ac *AppConfig, b backends.Backend, l log.Logger) (Guerrilla, error) {
|
|
err := g.makeServers()
|
|
err := g.makeServers()
|
|
|
|
|
|
// start backend for processing email
|
|
// start backend for processing email
|
|
- err = g.backend.Start()
|
|
|
|
|
|
+ err = g.backend().Start()
|
|
|
|
+
|
|
if err != nil {
|
|
if err != nil {
|
|
return g, err
|
|
return g, err
|
|
}
|
|
}
|
|
|
|
+ g.writePid()
|
|
|
|
|
|
// subscribe for any events that may come in while running
|
|
// subscribe for any events that may come in while running
|
|
g.subscribeEvents()
|
|
g.subscribeEvents()
|
|
|
|
+
|
|
return g, err
|
|
return g, err
|
|
}
|
|
}
|
|
|
|
|
|
// Instantiate servers
|
|
// Instantiate servers
|
|
func (g *guerrilla) makeServers() error {
|
|
func (g *guerrilla) makeServers() error {
|
|
g.mainlog().Debug("making servers")
|
|
g.mainlog().Debug("making servers")
|
|
|
|
+ g.guard.Lock()
|
|
|
|
+ defer g.guard.Unlock()
|
|
var errs Errors
|
|
var errs Errors
|
|
for _, sc := range g.Config.Servers {
|
|
for _, sc := range g.Config.Servers {
|
|
if _, ok := g.servers[sc.ListenInterface]; ok {
|
|
if _, ok := g.servers[sc.ListenInterface]; ok {
|
|
@@ -111,7 +122,7 @@ func (g *guerrilla) makeServers() error {
|
|
errs = append(errs, err)
|
|
errs = append(errs, err)
|
|
continue
|
|
continue
|
|
} else {
|
|
} else {
|
|
- server, err := newServer(&sc, g.backend, g.mainlog())
|
|
|
|
|
|
+ server, err := newServer(&sc, g.backend(), g.mainlog())
|
|
if err != nil {
|
|
if err != nil {
|
|
g.mainlog().WithError(err).Errorf("Failed to create server [%s]", sc.ListenInterface)
|
|
g.mainlog().WithError(err).Errorf("Failed to create server [%s]", sc.ListenInterface)
|
|
errs = append(errs, err)
|
|
errs = append(errs, err)
|
|
@@ -131,7 +142,7 @@ func (g *guerrilla) makeServers() error {
|
|
return errs
|
|
return errs
|
|
}
|
|
}
|
|
|
|
|
|
-// find a server by interface, retuning the server or err
|
|
|
|
|
|
+// findServer finds a server by iface (interface), retuning the server or err
|
|
func (g *guerrilla) findServer(iface string) (*server, error) {
|
|
func (g *guerrilla) findServer(iface string) (*server, error) {
|
|
g.guard.Lock()
|
|
g.guard.Lock()
|
|
defer g.guard.Unlock()
|
|
defer g.guard.Unlock()
|
|
@@ -141,6 +152,7 @@ func (g *guerrilla) findServer(iface string) (*server, error) {
|
|
return nil, errors.New("server not found in g.servers")
|
|
return nil, errors.New("server not found in g.servers")
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+// removeServer removes a server from the list of servers
|
|
func (g *guerrilla) removeServer(iface string) {
|
|
func (g *guerrilla) removeServer(iface string) {
|
|
g.guard.Lock()
|
|
g.guard.Lock()
|
|
defer g.guard.Unlock()
|
|
defer g.guard.Unlock()
|
|
@@ -222,6 +234,11 @@ func (g *guerrilla) subscribeEvents() {
|
|
g.mainlog().Infof("log level changed to [%s]", c.LogLevel)
|
|
g.mainlog().Infof("log level changed to [%s]", c.LogLevel)
|
|
})
|
|
})
|
|
|
|
|
|
|
|
+ // write out our pid whenever the file name changes in the config
|
|
|
|
+ g.Subscribe(EventConfigPidFile, func(ac *AppConfig) {
|
|
|
|
+ g.writePid()
|
|
|
|
+ })
|
|
|
|
+
|
|
// server config was updated
|
|
// server config was updated
|
|
g.Subscribe(EventConfigServerConfig, func(sc *ServerConfig) {
|
|
g.Subscribe(EventConfigServerConfig, func(sc *ServerConfig) {
|
|
g.setServerConfig(sc)
|
|
g.setServerConfig(sc)
|
|
@@ -229,8 +246,10 @@ func (g *guerrilla) subscribeEvents() {
|
|
|
|
|
|
// add a new server to the config & start
|
|
// add a new server to the config & start
|
|
g.Subscribe(EventConfigServerNew, func(sc *ServerConfig) {
|
|
g.Subscribe(EventConfigServerNew, func(sc *ServerConfig) {
|
|
|
|
+ g.mainlog().Debugf("event fired [%s] %s", EventConfigServerNew, sc.ListenInterface)
|
|
if _, err := g.findServer(sc.ListenInterface); err != nil {
|
|
if _, err := g.findServer(sc.ListenInterface); err != nil {
|
|
// not found, lets add it
|
|
// not found, lets add it
|
|
|
|
+ //
|
|
if err := g.makeServers(); err != nil {
|
|
if err := g.makeServers(); err != nil {
|
|
g.mainlog().WithError(err).Errorf("cannot add server [%s]", sc.ListenInterface)
|
|
g.mainlog().WithError(err).Errorf("cannot add server [%s]", sc.ListenInterface)
|
|
return
|
|
return
|
|
@@ -242,6 +261,8 @@ func (g *guerrilla) subscribeEvents() {
|
|
g.mainlog().WithError(err).Info("Event server_change:new_server returned errors when starting")
|
|
g.mainlog().WithError(err).Info("Event server_change:new_server returned errors when starting")
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+ } else {
|
|
|
|
+ g.mainlog().Debugf("new event, but server already fund")
|
|
}
|
|
}
|
|
})
|
|
})
|
|
// start a server that already exists in the config and has been enabled
|
|
// start a server that already exists in the config and has been enabled
|
|
@@ -306,7 +327,6 @@ func (g *guerrilla) subscribeEvents() {
|
|
backends.Svc.SetMainlog(l)
|
|
backends.Svc.SetMainlog(l)
|
|
// it will change to the new logger on the next accepted client
|
|
// it will change to the new logger on the next accepted client
|
|
server.logStore.Store(l)
|
|
server.logStore.Store(l)
|
|
-
|
|
|
|
g.mainlog().Infof("Server [%s] changed, new clients will log to: [%s]",
|
|
g.mainlog().Infof("Server [%s] changed, new clients will log to: [%s]",
|
|
sc.ListenInterface,
|
|
sc.ListenInterface,
|
|
sc.LogFile,
|
|
sc.LogFile,
|
|
@@ -332,19 +352,19 @@ func (g *guerrilla) subscribeEvents() {
|
|
logger, _ := log.GetLogger(appConfig.LogFile)
|
|
logger, _ := log.GetLogger(appConfig.LogFile)
|
|
// shutdown the backend first.
|
|
// shutdown the backend first.
|
|
var err error
|
|
var err error
|
|
- if err = g.backend.Shutdown(); err != nil {
|
|
|
|
|
|
+ if err = g.backend().Shutdown(); err != nil {
|
|
logger.WithError(err).Warn("Backend failed to shutdown")
|
|
logger.WithError(err).Warn("Backend failed to shutdown")
|
|
return
|
|
return
|
|
}
|
|
}
|
|
- // init a new backend, Revert to old backend config if it failes
|
|
|
|
|
|
+ // init a new backend, Revert to old backend config if it fails
|
|
if newBackend, newErr := backends.New(appConfig.BackendConfig, logger); newErr != nil {
|
|
if newBackend, newErr := backends.New(appConfig.BackendConfig, logger); newErr != nil {
|
|
logger.WithError(newErr).Error("Error while loading the backend")
|
|
logger.WithError(newErr).Error("Error while loading the backend")
|
|
- err = g.backend.Reinitialize()
|
|
|
|
|
|
+ err = g.backend().Reinitialize()
|
|
if err != nil {
|
|
if err != nil {
|
|
logger.WithError(err).Fatal("failed to revert to old backend config")
|
|
logger.WithError(err).Fatal("failed to revert to old backend config")
|
|
return
|
|
return
|
|
}
|
|
}
|
|
- err = g.backend.Start()
|
|
|
|
|
|
+ err = g.backend().Start()
|
|
if err != nil {
|
|
if err != nil {
|
|
logger.WithError(err).Fatal("failed to start backend with old config")
|
|
logger.WithError(err).Fatal("failed to start backend with old config")
|
|
return
|
|
return
|
|
@@ -352,14 +372,29 @@ func (g *guerrilla) subscribeEvents() {
|
|
logger.Info("reverted to old backend config")
|
|
logger.Info("reverted to old backend config")
|
|
} else {
|
|
} else {
|
|
// swap to the bew backend (assuming old backend was shutdown so it can be safely swapped)
|
|
// swap to the bew backend (assuming old backend was shutdown so it can be safely swapped)
|
|
- newBackend.Start()
|
|
|
|
- g.backend = newBackend
|
|
|
|
- logger.Info("new backend started")
|
|
|
|
|
|
+ if err := newBackend.Start(); err != nil {
|
|
|
|
+ logger.WithError(err).Error("backend could not start")
|
|
|
|
+ }
|
|
|
|
+ g.storeBackend(newBackend)
|
|
}
|
|
}
|
|
})
|
|
})
|
|
|
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+func (g *guerrilla) storeBackend(b backends.Backend) {
|
|
|
|
+ g.backendStore.Store(b)
|
|
|
|
+ g.mapServers(func(server *server) {
|
|
|
|
+ server.setBackend(b)
|
|
|
|
+ })
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func (g *guerrilla) backend() backends.Backend {
|
|
|
|
+ if b, ok := g.backendStore.Load().(backends.Backend); ok {
|
|
|
|
+ return b
|
|
|
|
+ }
|
|
|
|
+ return nil
|
|
|
|
+}
|
|
|
|
+
|
|
// Entry point for the application. Starts all servers.
|
|
// Entry point for the application. Starts all servers.
|
|
func (g *guerrilla) Start() error {
|
|
func (g *guerrilla) Start() error {
|
|
var startErrors Errors
|
|
var startErrors Errors
|
|
@@ -373,8 +408,8 @@ func (g *guerrilla) Start() error {
|
|
}
|
|
}
|
|
if g.state == GuerrillaStateStopped {
|
|
if g.state == GuerrillaStateStopped {
|
|
// when a backend is shutdown, we need to re-initialize before it can be started again
|
|
// when a backend is shutdown, we need to re-initialize before it can be started again
|
|
- g.backend.Reinitialize()
|
|
|
|
- g.backend.Start()
|
|
|
|
|
|
+ g.backend().Reinitialize()
|
|
|
|
+ g.backend().Start()
|
|
}
|
|
}
|
|
// channel for reading errors
|
|
// channel for reading errors
|
|
errs := make(chan error, len(g.servers))
|
|
errs := make(chan error, len(g.servers))
|
|
@@ -382,7 +417,6 @@ func (g *guerrilla) Start() error {
|
|
|
|
|
|
// start servers, send any errors back to errs channel
|
|
// start servers, send any errors back to errs channel
|
|
for ListenInterface := range g.servers {
|
|
for ListenInterface := range g.servers {
|
|
- g.mainlog().Infof("Starting: %s", ListenInterface)
|
|
|
|
if !g.servers[ListenInterface].isEnabled() {
|
|
if !g.servers[ListenInterface].isEnabled() {
|
|
// not enabled
|
|
// not enabled
|
|
continue
|
|
continue
|
|
@@ -393,6 +427,7 @@ func (g *guerrilla) Start() error {
|
|
}
|
|
}
|
|
startWG.Add(1)
|
|
startWG.Add(1)
|
|
go func(s *server) {
|
|
go func(s *server) {
|
|
|
|
+ g.mainlog().Infof("Starting: %s", s.listenInterface)
|
|
if err := s.Start(&startWG); err != nil {
|
|
if err := s.Start(&startWG); err != nil {
|
|
errs <- err
|
|
errs <- err
|
|
}
|
|
}
|
|
@@ -420,13 +455,14 @@ func (g *guerrilla) Shutdown() {
|
|
g.state = GuerrillaStateStopped
|
|
g.state = GuerrillaStateStopped
|
|
defer g.guard.Unlock()
|
|
defer g.guard.Unlock()
|
|
}()
|
|
}()
|
|
- for ListenInterface, s := range g.servers {
|
|
|
|
|
|
+ g.mapServers(func(s *server) {
|
|
if s.state == ServerStateRunning {
|
|
if s.state == ServerStateRunning {
|
|
s.Shutdown()
|
|
s.Shutdown()
|
|
- g.mainlog().Infof("shutdown completed for [%s]", ListenInterface)
|
|
|
|
|
|
+ g.mainlog().Infof("shutdown completed for [%s]", s.listenInterface)
|
|
}
|
|
}
|
|
- }
|
|
|
|
- if err := g.backend.Shutdown(); err != nil {
|
|
|
|
|
|
+ })
|
|
|
|
+
|
|
|
|
+ if err := g.backend().Shutdown(); err != nil {
|
|
g.mainlog().WithError(err).Warn("Backend failed to shutdown")
|
|
g.mainlog().WithError(err).Warn("Backend failed to shutdown")
|
|
} else {
|
|
} else {
|
|
g.mainlog().Infof("Backend shutdown completed")
|
|
g.mainlog().Infof("Backend shutdown completed")
|
|
@@ -439,3 +475,25 @@ func (g *guerrilla) SetLogger(l log.Logger) {
|
|
g.setMainlog(l)
|
|
g.setMainlog(l)
|
|
backends.Svc.SetMainlog(l)
|
|
backends.Svc.SetMainlog(l)
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+// writePid writes the pid (process id) to the file specified in the config.
|
|
|
|
+// Won't write anything if no file specified
|
|
|
|
+func (g *guerrilla) writePid() error {
|
|
|
|
+ if len(g.Config.PidFile) > 0 {
|
|
|
|
+ if f, err := os.Create(g.Config.PidFile); err == nil {
|
|
|
|
+ defer f.Close()
|
|
|
|
+ pid := os.Getpid()
|
|
|
|
+ if _, err := f.WriteString(fmt.Sprintf("%d", pid)); err == nil {
|
|
|
|
+ f.Sync()
|
|
|
|
+ g.mainlog().Infof("pid_file (%s) written with pid:%v", g.Config.PidFile, pid)
|
|
|
|
+ } else {
|
|
|
|
+ g.mainlog().WithError(err).Errorf("Error while writing pidFile (%s)", g.Config.PidFile)
|
|
|
|
+ return err
|
|
|
|
+ }
|
|
|
|
+ } else {
|
|
|
|
+ g.mainlog().WithError(err).Errorf("Error while creating pidFile (%s)", g.Config.PidFile)
|
|
|
|
+ return err
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ return nil
|
|
|
|
+}
|