package guerrilla

import (
	"errors"
	"fmt"
	"os"
	"strings"
	"sync"
	"sync/atomic"

	"github.com/flashmob/go-guerrilla/backends"
	"github.com/flashmob/go-guerrilla/log"
)

// Lifecycle states of the daemon, kept in guerrilla.state.
const (
	// all configured servers have just been created
	daemonStateNew = iota
	// ... have been started and are running
	daemonStateStarted
	// ... have been stopped
	daemonStateStopped
)

// Errors is a list of errors that can itself be returned as a single error.
type Errors []error

// Error implements the error interface.
// A list holding exactly one error yields that error's message unchanged.
// Any other length yields every message prefixed with a newline, so an
// empty list produces the empty string.
func (e Errors) Error() string {
	if len(e) == 1 {
		return e[0].Error()
	}
	// multiple errors (or none): one message per line, each led by "\n"
	var sb strings.Builder
	for i := range e {
		sb.WriteString("\n")
		sb.WriteString(e[i].Error())
	}
	return sb.String()
}

// Guerrilla is the public interface of the daemon: starting and shutting
// down, event subscription and publishing, and swapping the logger.
type Guerrilla interface {
	Start() error
	Shutdown()
	Subscribe(topic Event, fn interface{}) error
	Publish(topic Event, args ...interface{})
	Unsubscribe(topic Event, handler interface{}) error
	SetLogger(log.Logger)
}

// guerrilla is the concrete implementation behind the Guerrilla interface.
type guerrilla struct {
	// Config is a local copy of the application config
	Config AppConfig
	// servers is keyed by each server's listen interface
	servers map[string]*server
	// guard controls access to g.servers
	guard sync.Mutex
	// state holds one of the daemonState* constants
	state int8
	EventHandler
	logStore

	// beGuard controls access to g.backends
	beGuard sync.Mutex
	// backends is keyed by backend name
	backends BackendContainer
}

// logStore keeps the main logger in an atomic.Value.
type logStore struct {
	atomic.Value
}

// BackendContainer maps a backend's name to the backend.
type BackendContainer map[string]backends.Backend

// daemonEvent is the callback signature for app-level config events.
type daemonEvent func(c *AppConfig)

// serverEvent is the callback signature for per-server config events.
type serverEvent func(sc *ServerConfig)

// backendEvent is the callback signature for backend (gateway) config events.
type backendEvent func(c *AppConfig, gateway string)

// mainlog loads the log.Logger in an atomic operation.
Returns a stderr logger if not able to load func (ls *logStore) mainlog() log.Logger { if v, ok := ls.Load().(log.Logger); ok { return v } l, _ := log.GetLogger(log.OutputStderr.String(), log.InfoLevel.String()) return l } // setMainlog stores the log value in an atomic operation func (ls *logStore) setMainlog(log log.Logger) { ls.Store(log) } // makeConfiguredBackends makes backends from the config func (g *guerrilla) makeConfiguredBackends(l log.Logger) ([]backends.Backend, error) { var list []backends.Backend config := g.Config.BackendConfig[backends.ConfigGateways] if len(config) == 0 { return list, errors.New("no backends configured") } list = make([]backends.Backend, 0) for name := range config { if b, err := backends.New(name, g.Config.BackendConfig, l); err != nil { return nil, err } else { list = append(list, b) } } return list, nil } // New creates a new Guerrilla instance configured with backends and a logger // Returns a new instance of Guerrilla with the given config, not yet running. Backend started. // b can be nil. If nil. 
then it will use the config to make the backends func New(ac *AppConfig, l log.Logger, b ...backends.Backend) (Guerrilla, error) { g := &guerrilla{ Config: *ac, // take a local copy servers: make(map[string]*server, len(ac.Servers)), } if 0 == len(b) { var err error b, err = g.makeConfiguredBackends(l) if err != nil { return g, err } } if g.backends == nil { g.backends = make(BackendContainer) } for i := range b { if b[i] == nil { return g, errors.New("cannot use a nil backend") } g.storeBackend(b[i]) } g.setMainlog(l) if ac.LogLevel != "" { if h, ok := l.(*log.HookedLogger); ok { if h, err := log.GetLogger(h.GetLogDest(), ac.LogLevel); err == nil { g.setMainlog(h) } } } // Write the process id (pid) to a file // we should still be able to continue even if we can't write the pid, error will be logged by writePid() _ = g.writePid() g.state = daemonStateNew err := g.makeServers() if err != nil { return g, err } // start backends for processing email _, err = g.mapBackends(func(b backends.Backend) error { return b.Start() }) if err != nil { return g, err } // subscribe for any events that may come in while running g.subscribeEvents() return g, err } // Instantiate servers func (g *guerrilla) makeServers() error { g.mainlog().Debug("making servers") var errs Errors for serverID, sc := range g.Config.Servers { if _, ok := g.servers[sc.ListenInterface]; ok { // server already instantiated continue } if err := sc.Validate(); err != nil { g.mainlog().Fields("error", errs, "iface", sc.ListenInterface). Error("failed to create server") errs = append(errs, err) continue } else { sc := sc // pin! server, err := newServer(&sc, g.backend(sc.Gateway), g.mainlog(), serverID) if err != nil { g.mainlog().Fields("error", err, "iface", sc.ListenInterface). 
Error("failed to create server") errs = append(errs, err) } if server != nil { g.servers[sc.ListenInterface] = server server.setAllowedHosts(g.Config.AllowedHosts) } } } if len(g.servers) == 0 { errs = append(errs, errors.New("there are no servers that can start, please check your config")) } if len(errs) == 0 { return nil } return errs } // findServer finds a server by iface (interface), retuning the server or err func (g *guerrilla) findServer(iface string) (*server, error) { g.guard.Lock() defer g.guard.Unlock() if server, ok := g.servers[iface]; ok { return server, nil } return nil, errors.New("server not found in g.servers") } // removeServer removes a server from the list of servers func (g *guerrilla) removeServer(iface string) { g.guard.Lock() defer g.guard.Unlock() delete(g.servers, iface) } // setConfig sets the app config func (g *guerrilla) setConfig(c *AppConfig) { g.guard.Lock() defer g.guard.Unlock() g.Config = *c } // setServerConfig config updates the server's config, which will update for the next connected client func (g *guerrilla) setServerConfig(sc *ServerConfig) { g.guard.Lock() defer g.guard.Unlock() if _, ok := g.servers[sc.ListenInterface]; ok { g.servers[sc.ListenInterface].setConfig(sc) } } // mapServers calls a callback on each server in g.servers map // It locks the g.servers map before mapping func (g *guerrilla) mapServers(callback func(*server)) map[string]*server { defer g.guard.Unlock() g.guard.Lock() for _, server := range g.servers { callback(server) } return g.servers } type mapBackendErrors []error func (e mapBackendErrors) Error() string { data := make([]string, len(e)) for i, s := range e { data[i] = fmt.Sprint(s) } return strings.Join(data, ",") } func (g *guerrilla) mapBackends(callback func(backend backends.Backend) error) (BackendContainer, error) { defer g.beGuard.Unlock() g.beGuard.Lock() var e mapBackendErrors for name := range g.backends { if err := callback(g.backends[name]); err != nil { e = append(e, err) } } if 
len(e) == 0 { return g.backends, nil } return g.backends, e } // subscribeEvents subscribes event handlers for configuration change events func (g *guerrilla) subscribeEvents() { events := map[Event]interface{}{} // main config changed events[EventConfigNewConfig] = daemonEvent(func(c *AppConfig) { g.setConfig(c) }) // allowed_hosts changed, set for all servers events[EventConfigAllowedHosts] = daemonEvent(func(c *AppConfig) { g.mapServers(func(server *server) { server.setAllowedHosts(c.AllowedHosts) g.mainlog().Fields("serverID", server.serverID, "event", EventConfigAllowedHosts). Info("allowed_hosts config changed, a new list was set") }) }) // the main log file changed events[EventConfigLogFile] = daemonEvent(func(c *AppConfig) { var err error var l log.Logger if l, err = log.GetLogger(c.LogFile, c.LogLevel); err == nil { g.setMainlog(l) g.mapServers(func(server *server) { // it will change server's logger when the next client gets accepted server.mainlogStore.Store(l) }) g.mainlog().Fields("file", c.LogFile). Info("main log for new clients changed") } else { g.mainlog().Fields("error", err, "file", c.LogFile). Error("main logging change failed") } }) // re-open the main log file (file not changed) events[EventConfigLogReopen] = daemonEvent(func(c *AppConfig) { err := g.mainlog().Reopen() if err != nil { g.mainlog().Fields("error", err, "file", c.LogFile). 
Error("main log file failed to re-open") return } g.mainlog().Fields("file", c.LogFile).Info("re-opened main log file") }) // when log level changes, apply to mainlog and server logs events[EventConfigLogLevel] = daemonEvent(func(c *AppConfig) { l, err := log.GetLogger(g.mainlog().GetLogDest(), c.LogLevel) if err == nil { g.logStore.Store(l) g.mapServers(func(server *server) { server.logStore.Store(l) }) g.mainlog().Fields("level", c.LogLevel).Info("log level changed") } }) // write out our pid whenever the file name changes in the config events[EventConfigPidFile] = daemonEvent(func(ac *AppConfig) { _ = g.writePid() }) // server config was updated events[EventConfigServerConfig] = serverEvent(func(sc *ServerConfig) { g.setServerConfig(sc) g.mainlog().Fields("iface", sc.ListenInterface). Info("server config change event, a new config has been saved") }) // add a new server to the config & start events[EventConfigServerNew] = serverEvent(func(sc *ServerConfig) { values := []interface{}{"iface", sc.ListenInterface, "event", EventConfigServerNew} g.mainlog().Fields(values...). Debug("event fired") if _, err := g.findServer(sc.ListenInterface); err != nil { // not found, lets add it if err := g.makeServers(); err != nil { g.mainlog().Fields(append(values, "error", err)...). Error("cannot add server") return } g.mainlog().Fields(values...).Info("new server added") if g.state == daemonStateStarted { err := g.Start() if err != nil { g.mainlog().Fields(append(values, "error", err)...). Error("new server errors when starting") } } } else { g.mainlog().Fields(values...). 
Debug("new event, but server already fund") } }) // start a server that already exists in the config and has been enabled events[EventConfigServerStart] = serverEvent(func(sc *ServerConfig) { if server, err := g.findServer(sc.ListenInterface); err == nil { fields := []interface{}{ "iface", server.listenInterface, "serverID", server.serverID, "event", EventConfigServerStart} if server.state == ServerStateStopped || server.state == ServerStateNew { g.mainlog().Fields(fields...). Info("starting server") err := g.Start() if err != nil { g.mainlog().Fields(append(fields, "error", err)...). Info("event server_change:start_server returned errors when starting") } } } }) // stop running a server events[EventConfigServerStop] = serverEvent(func(sc *ServerConfig) { if server, err := g.findServer(sc.ListenInterface); err == nil { if server.state == ServerStateRunning { server.Shutdown() g.mainlog().Fields( "event", EventConfigServerStop, "server", sc.ListenInterface, "serverID", server.serverID). Info("server stopped.") } } }) // server was removed from config events[EventConfigServerRemove] = serverEvent(func(sc *ServerConfig) { if server, err := g.findServer(sc.ListenInterface); err == nil { server.Shutdown() g.removeServer(sc.ListenInterface) g.mainlog().Fields( "event", EventConfigServerRemove, "server", sc.ListenInterface, "serverID", server.serverID). Info("server removed from config, stopped it") } }) // TLS changes events[EventConfigServerTLSConfig] = serverEvent(func(sc *ServerConfig) { if server, err := g.findServer(sc.ListenInterface); err == nil { fields := []interface{}{ "iface", server.listenInterface, "serverID", server.serverID, "event", EventConfigServerTLSConfig} if err := server.configureTLS(); err == nil { g.mainlog().Fields(fields...).Info("server new TLS configuration loaded") } else { g.mainlog().Fields(append(fields, "error", err)...). Error("Server failed to load the new TLS configuration") } } }) // when server's timeout change. 
events[EventConfigServerTimeout] = serverEvent(func(sc *ServerConfig) { g.mapServers(func(server *server) { fields := []interface{}{ "iface", server.listenInterface, "serverID", server.serverID, "event", EventConfigServerTimeout, "timeout", sc.Timeout, } server.setTimeout(sc.Timeout) g.mainlog().Fields(fields...).Info("server timeout set") }) }) // when server's max clients change. events[EventConfigServerMaxClients] = serverEvent(func(sc *ServerConfig) { g.mapServers(func(server *server) { // TODO resize the pool somehow }) }) // when a server's log file changes events[EventConfigServerLogFile] = serverEvent(func(sc *ServerConfig) { if server, err := g.findServer(sc.ListenInterface); err == nil { var err error var l log.Logger level := g.mainlog().GetLevel() fields := []interface{}{ "iface", server.listenInterface, "serverID", server.serverID, "event", EventConfigServerLogFile, "file", sc.LogFile, } if l, err = log.GetLogger(sc.LogFile, level); err == nil { g.setMainlog(l) backends.Svc.SetMainlog(l) // it will change to the new logger on the next accepted client server.logStore.Store(l) g.mainlog().Fields(fields...).Info("server log changed", sc.ListenInterface, sc.LogFile, ) } else { g.mainlog().Fields(append(fields, "error", err)...).Error( "server log change failed") } } }) // when the daemon caught a sighup, event for individual server events[EventConfigServerLogReopen] = serverEvent(func(sc *ServerConfig) { if server, err := g.findServer(sc.ListenInterface); err == nil { fields := []interface{}{"file", sc.LogFile, "iface", sc.ListenInterface, "serverID", server.serverID, "file", sc.LogFile, "event", EventConfigServerLogReopen} if err = server.log().Reopen(); err != nil { g.mainlog().Fields( append(fields, "error", err)...). 
Error("server log file failed to re-open") return } g.mainlog().Fields(fields).Info("server re-opened log file") } }) // when the server's gateway setting changed events[EventConfigServerGatewayConfig] = serverEvent(func(sc *ServerConfig) { b := g.backend(sc.Gateway) if b == nil { g.mainlog().Fields("gateway", sc.Gateway, "event", EventConfigServerGatewayConfig). Error("could not change to gateway, not configured") return } g.storeBackend(b) }) revertIfError := func(err error, name string, logger log.Logger, g *guerrilla) { if err != nil { logger.Fields("error", err, "gateway", name, "event", EventConfigServerGatewayConfig). Error("cannot change gateway config, reverting to old config") err = g.backend(name).Reinitialize() if err != nil { logger.Fields("error", err, "gateway", name, "event", EventConfigServerGatewayConfig). Error("failed to revert to old gateway config") return } err = g.backend(name).Start() if err != nil { logger.Fields("error", err, "gateway", name, "event", EventConfigServerGatewayConfig). Error("failed to start gateway with old config") return } logger.Fields("gateway", name, "event", EventConfigServerGatewayConfig). Info("reverted to old gateway config") } } events[EventConfigBackendConfigChanged] = backendEvent(func(appConfig *AppConfig, name string) { logger, _ := log.GetLogger(appConfig.LogFile, appConfig.LogLevel) var err error // shutdown the backend first. if err = g.backend(name).Shutdown(); err != nil { logger.Fields("error", err, "gateway", name, "event", EventConfigBackendConfigChanged). Error("gateway failed to shutdown") return // we can't do anything then } if newBackend, newErr := backends.New(name, appConfig.BackendConfig, logger); newErr != nil { err = newErr revertIfError(newErr, name, logger, g) // revert to old backend return } else { if err = newBackend.Start(); err != nil { logger.Fields("error", err, "gateway", name, "event", EventConfigBackendConfigChanged). 
Error("gateway could not start") revertIfError(err, name, logger, g) // revert to old backend return } else { logger.Fields("gateway", name, "event", EventConfigBackendConfigChanged). Info("gateway with new config started") g.storeBackend(newBackend) } } }) // a new gateway was added events[EventConfigBackendConfigAdded] = backendEvent(func(appConfig *AppConfig, name string) { logger, _ := log.GetLogger(appConfig.LogFile, appConfig.LogLevel) // shutdown any old backend first. if newBackend, newErr := backends.New(name, appConfig.BackendConfig, logger); newErr != nil { logger.Fields("error", newErr, "gateway", name, "event", EventConfigBackendConfigAdded). Error("cannot add new gateway") } else { // swap to the bew gateway (assuming old gateway was shutdown so it can be safely swapped) if err := newBackend.Start(); err != nil { logger.Fields("error", err, "gateway", name, "event", EventConfigBackendConfigAdded). Error("cannot start new gateway") } logger.Fields("gateway", name).Info("new gateway started") g.storeBackend(newBackend) } }) // remove a gateway (shut it down) events[EventConfigBackendConfigRemoved] = backendEvent(func(appConfig *AppConfig, name string) { logger, _ := log.GetLogger(appConfig.LogFile, appConfig.LogLevel) // shutdown the backend first. var err error // revert defer revertIfError(err, name, logger, g) if err = g.backend(name).Shutdown(); err != nil { logger.Fields("error", err, "gateway", name, "event", EventConfigBackendConfigRemoved). Error("gateway failed to shutdown") return } g.removeBackend(g.backend(name)) logger.Fields("gateway", name, "event", EventConfigBackendConfigRemoved).Info("gateway removed") }) // subscribe all of the above events var err error for topic, fn := range events { err = g.Subscribe(topic, fn) if err != nil { g.mainlog().Fields("error", err, "event", topic). 
Error("failed to subscribe on topic") break } } } func (g *guerrilla) removeBackend(b backends.Backend) { g.beGuard.Lock() defer g.beGuard.Unlock() delete(g.backends, b.Name()) } func (g *guerrilla) storeBackend(b backends.Backend) { g.beGuard.Lock() defer g.beGuard.Unlock() g.backends[b.Name()] = b g.mapServers(func(server *server) { sc := server.configStore.Load().(ServerConfig) if b.Name() == sc.Gateway { server.setBackend(b) } }) } func (g *guerrilla) backend(name string) backends.Backend { g.beGuard.Lock() defer g.beGuard.Unlock() if b, ok := g.backends[name]; ok { return b } // if not found, return a random one for b := range g.backends { return g.backends[b] } return nil } // Entry point for the application. Starts all servers. func (g *guerrilla) Start() error { var startErrors Errors g.guard.Lock() defer func() { g.state = daemonStateStarted g.guard.Unlock() }() if len(g.servers) == 0 { return append(startErrors, errors.New("no servers to start, please check the config")) } if g.state == daemonStateStopped { // when a backend is shutdown, we need to re-initialize before it can be started again if _, err := g.mapBackends(func(b backends.Backend) error { if err := b.Reinitialize(); err != nil { return err } return b.Start() }); err != nil { startErrors = append(startErrors, err) } } // channel for reading errors errs := make(chan error, len(g.servers)) var startWG sync.WaitGroup // start servers, send any errors back to errs channel for ListenInterface := range g.servers { if !g.servers[ListenInterface].isEnabled() { // not enabled continue } if g.servers[ListenInterface].state != ServerStateNew && g.servers[ListenInterface].state != ServerStateStopped { continue } startWG.Add(1) go func(s *server) { g.mainlog().Fields("iface", s.listenInterface, "serverID", s.serverID). 
Info("starting server") if err := s.Start(&startWG); err != nil { errs <- err } }(g.servers[ListenInterface]) } // wait for all servers to start (or fail) startWG.Wait() // close, then read any errors close(errs) for err := range errs { if err != nil { startErrors = append(startErrors, err) } } if len(startErrors) > 0 { return startErrors } return nil } func (g *guerrilla) Shutdown() { // shut down the servers first g.mapServers(func(s *server) { if s.state == ServerStateRunning { s.Shutdown() g.mainlog().Fields("iface", s.listenInterface, "serverID", s.serverID).Info("shutdown completed") } }) g.guard.Lock() defer func() { g.state = daemonStateStopped defer g.guard.Unlock() }() if _, err := g.mapBackends(func(b backends.Backend) error { return b.Shutdown() }); err != nil { fmt.Println(err) g.mainlog().Fields("error", err).Error("backend failed to shutdown") } else { g.mainlog().Info("backend shutdown completed") } } // SetLogger sets the logger for the app and propagates it to sub-packages (eg. func (g *guerrilla) SetLogger(l log.Logger) { g.setMainlog(l) backends.Svc.SetMainlog(l) } // writePid writes the pid (process id) to the file specified in the config. 
// Won't write anything if no file specified func (g *guerrilla) writePid() (err error) { var f *os.File defer func() { if f != nil { if closeErr := f.Close(); closeErr != nil { err = closeErr } } if err != nil { g.mainlog().Fields("error", err, "file", g.Config.PidFile).Error("error while writing pidFile") } }() if len(g.Config.PidFile) > 0 { if f, err = os.Create(g.Config.PidFile); err != nil { return err } pid := os.Getpid() if _, err := f.WriteString(fmt.Sprintf("%d", pid)); err != nil { return err } if err = f.Sync(); err != nil { return err } g.mainlog().Fields("file", g.Config.PidFile, "pid", pid).Info("pid_file written") } return nil } // CheckFileLimit checks the number of files we can open (works on OS'es that support the ulimit command) func CheckFileLimit(c *AppConfig) (bool, int, uint64) { fileLimit, err := getFileLimit() maxClients := 0 if err != nil { // since we can't get the limit, return true to indicate the check passed return true, maxClients, fileLimit } if c.Servers == nil { // no servers have been configured, assuming default maxClients = defaultMaxClients } else { for _, s := range c.Servers { maxClients += s.MaxClients } } if uint64(maxClients) > fileLimit { return false, maxClients, fileLimit } return true, maxClients, fileLimit }