// Source file: guerrilla.go

  1. package guerrilla
  2. import (
  3. "errors"
  4. "fmt"
  5. "github.com/flashmob/go-guerrilla/backends"
  6. "github.com/flashmob/go-guerrilla/log"
  7. "os"
  8. "sync"
  9. "sync/atomic"
  10. )
  11. const (
  12. // server has just been created
  13. GuerrillaStateNew = iota
  14. // Server has been started and is running
  15. GuerrillaStateStarted
  16. // Server has just been stopped
  17. GuerrillaStateStopped
  18. )
  19. type Errors []error
  20. // implement the Error interface
  21. func (e Errors) Error() string {
  22. if len(e) == 1 {
  23. return e[0].Error()
  24. }
  25. // multiple errors
  26. msg := ""
  27. for _, err := range e {
  28. msg += "\n" + err.Error()
  29. }
  30. return msg
  31. }
  32. type Guerrilla interface {
  33. Start() error
  34. Shutdown()
  35. Subscribe(topic Event, fn interface{}) error
  36. Publish(topic Event, args ...interface{})
  37. Unsubscribe(topic Event, handler interface{}) error
  38. SetLogger(log.Logger)
  39. }
  40. type guerrilla struct {
  41. Config AppConfig
  42. servers map[string]*server
  43. // guard controls access to g.servers
  44. guard sync.Mutex
  45. state int8
  46. EventHandler
  47. logStore
  48. backendStore
  49. }
  50. type logStore struct {
  51. atomic.Value
  52. }
  53. type backendStore struct {
  54. atomic.Value
  55. }
  56. // Get loads the log.logger in an atomic operation. Returns a stderr logger if not able to load
  57. func (ls *logStore) mainlog() log.Logger {
  58. if v, ok := ls.Load().(log.Logger); ok {
  59. return v
  60. }
  61. l, _ := log.GetLogger(log.OutputStderr.String(), log.InfoLevel.String())
  62. return l
  63. }
  64. // storeMainlog stores the log value in an atomic operation
  65. func (ls *logStore) setMainlog(log log.Logger) {
  66. ls.Store(log)
  67. }
  68. // Returns a new instance of Guerrilla with the given config, not yet running. Backend started.
  69. func New(ac *AppConfig, b backends.Backend, l log.Logger) (Guerrilla, error) {
  70. g := &guerrilla{
  71. Config: *ac, // take a local copy
  72. servers: make(map[string]*server, len(ac.Servers)),
  73. }
  74. g.backendStore.Store(b)
  75. g.setMainlog(l)
  76. if ac.LogLevel != "" {
  77. if h, ok := l.(*log.HookedLogger); ok {
  78. if h, err := log.GetLogger(h.GetLogDest(), ac.LogLevel); err == nil {
  79. g.setMainlog(h)
  80. }
  81. }
  82. }
  83. g.state = GuerrillaStateNew
  84. err := g.makeServers()
  85. // start backend for processing email
  86. err = g.backend().Start()
  87. if err != nil {
  88. return g, err
  89. }
  90. g.writePid()
  91. // subscribe for any events that may come in while running
  92. g.subscribeEvents()
  93. return g, err
  94. }
  95. // Instantiate servers
  96. func (g *guerrilla) makeServers() error {
  97. g.mainlog().Debug("making servers")
  98. var errs Errors
  99. for _, sc := range g.Config.Servers {
  100. if _, ok := g.servers[sc.ListenInterface]; ok {
  101. // server already instantiated
  102. continue
  103. }
  104. if err := sc.Validate(); err != nil {
  105. g.mainlog().WithError(errs).Errorf("Failed to create server [%s]", sc.ListenInterface)
  106. errs = append(errs, err)
  107. continue
  108. } else {
  109. server, err := newServer(&sc, g.backend(), g.mainlog())
  110. if err != nil {
  111. g.mainlog().WithError(err).Errorf("Failed to create server [%s]", sc.ListenInterface)
  112. errs = append(errs, err)
  113. }
  114. if server != nil {
  115. g.servers[sc.ListenInterface] = server
  116. server.setAllowedHosts(g.Config.AllowedHosts)
  117. }
  118. }
  119. }
  120. if len(g.servers) == 0 {
  121. errs = append(errs, errors.New("There are no servers that can start, please check your config"))
  122. }
  123. if len(errs) == 0 {
  124. return nil
  125. }
  126. return errs
  127. }
  128. // findServer finds a server by iface (interface), retuning the server or err
  129. func (g *guerrilla) findServer(iface string) (*server, error) {
  130. g.guard.Lock()
  131. defer g.guard.Unlock()
  132. if server, ok := g.servers[iface]; ok {
  133. return server, nil
  134. }
  135. return nil, errors.New("server not found in g.servers")
  136. }
  137. // removeServer removes a server from the list of servers
  138. func (g *guerrilla) removeServer(iface string) {
  139. g.guard.Lock()
  140. defer g.guard.Unlock()
  141. delete(g.servers, iface)
  142. }
  143. // setConfig sets the app config
  144. func (g *guerrilla) setConfig(c *AppConfig) {
  145. g.guard.Lock()
  146. defer g.guard.Unlock()
  147. g.Config = *c
  148. }
  149. // setServerConfig config updates the server's config, which will update for the next connected client
  150. func (g *guerrilla) setServerConfig(sc *ServerConfig) {
  151. g.guard.Lock()
  152. defer g.guard.Unlock()
  153. if _, ok := g.servers[sc.ListenInterface]; ok {
  154. g.servers[sc.ListenInterface].setConfig(sc)
  155. }
  156. }
  157. // mapServers calls a callback on each server in g.servers map
  158. // It locks the g.servers map before mapping
  159. func (g *guerrilla) mapServers(callback func(*server)) map[string]*server {
  160. defer g.guard.Unlock()
  161. g.guard.Lock()
  162. for _, server := range g.servers {
  163. callback(server)
  164. }
  165. return g.servers
  166. }
  167. // subscribeEvents subscribes event handlers for configuration change events
  168. func (g *guerrilla) subscribeEvents() {
  169. // main config changed
  170. g.Subscribe(EventConfigNewConfig, func(c *AppConfig) {
  171. g.setConfig(c)
  172. })
  173. // allowed_hosts changed, set for all servers
  174. g.Subscribe(EventConfigAllowedHosts, func(c *AppConfig) {
  175. g.mapServers(func(server *server) {
  176. server.setAllowedHosts(c.AllowedHosts)
  177. })
  178. g.mainlog().Infof("allowed_hosts config changed, a new list was set")
  179. })
  180. // the main log file changed
  181. g.Subscribe(EventConfigLogFile, func(c *AppConfig) {
  182. var err error
  183. var l log.Logger
  184. if l, err = log.GetLogger(c.LogFile, c.LogLevel); err == nil {
  185. g.setMainlog(l)
  186. g.mapServers(func(server *server) {
  187. // it will change server's logger when the next client gets accepted
  188. server.mainlogStore.Store(l)
  189. })
  190. g.mainlog().Infof("main log for new clients changed to [%s]", c.LogFile)
  191. } else {
  192. g.mainlog().WithError(err).Errorf("main logging change failed [%s]", c.LogFile)
  193. }
  194. })
  195. // re-open the main log file (file not changed)
  196. g.Subscribe(EventConfigLogReopen, func(c *AppConfig) {
  197. g.mainlog().Reopen()
  198. g.mainlog().Infof("re-opened main log file [%s]", c.LogFile)
  199. })
  200. // when log level changes, apply to mainlog and server logs
  201. g.Subscribe(EventConfigLogLevel, func(c *AppConfig) {
  202. l, err := log.GetLogger(g.mainlog().GetLogDest(), c.LogLevel)
  203. if err == nil {
  204. g.logStore.Store(l)
  205. g.mapServers(func(server *server) {
  206. server.logStore.Store(l)
  207. })
  208. g.mainlog().Infof("log level changed to [%s]", c.LogLevel)
  209. }
  210. })
  211. // write out our pid whenever the file name changes in the config
  212. g.Subscribe(EventConfigPidFile, func(ac *AppConfig) {
  213. g.writePid()
  214. })
  215. // server config was updated
  216. g.Subscribe(EventConfigServerConfig, func(sc *ServerConfig) {
  217. g.setServerConfig(sc)
  218. g.mainlog().Infof("server %s config change event, a new config has been saved", sc.ListenInterface)
  219. })
  220. // add a new server to the config & start
  221. g.Subscribe(EventConfigServerNew, func(sc *ServerConfig) {
  222. g.mainlog().Debugf("event fired [%s] %s", EventConfigServerNew, sc.ListenInterface)
  223. if _, err := g.findServer(sc.ListenInterface); err != nil {
  224. // not found, lets add it
  225. //
  226. if err := g.makeServers(); err != nil {
  227. g.mainlog().WithError(err).Errorf("cannot add server [%s]", sc.ListenInterface)
  228. return
  229. }
  230. g.mainlog().Infof("New server added [%s]", sc.ListenInterface)
  231. if g.state == GuerrillaStateStarted {
  232. err := g.Start()
  233. if err != nil {
  234. g.mainlog().WithError(err).Info("Event server_change:new_server returned errors when starting")
  235. }
  236. }
  237. } else {
  238. g.mainlog().Debugf("new event, but server already fund")
  239. }
  240. })
  241. // start a server that already exists in the config and has been enabled
  242. g.Subscribe(EventConfigServerStart, func(sc *ServerConfig) {
  243. if server, err := g.findServer(sc.ListenInterface); err == nil {
  244. if server.state == ServerStateStopped || server.state == ServerStateNew {
  245. g.mainlog().Infof("Starting server [%s]", server.listenInterface)
  246. err := g.Start()
  247. if err != nil {
  248. g.mainlog().WithError(err).Info("Event server_change:start_server returned errors when starting")
  249. }
  250. }
  251. }
  252. })
  253. // stop running a server
  254. g.Subscribe(EventConfigServerStop, func(sc *ServerConfig) {
  255. if server, err := g.findServer(sc.ListenInterface); err == nil {
  256. if server.state == ServerStateRunning {
  257. server.Shutdown()
  258. g.mainlog().Infof("Server [%s] stopped.", sc.ListenInterface)
  259. }
  260. }
  261. })
  262. // server was removed from config
  263. g.Subscribe(EventConfigServerRemove, func(sc *ServerConfig) {
  264. if server, err := g.findServer(sc.ListenInterface); err == nil {
  265. server.Shutdown()
  266. g.removeServer(sc.ListenInterface)
  267. g.mainlog().Infof("Server [%s] removed from config, stopped it.", sc.ListenInterface)
  268. }
  269. })
  270. // TLS changes
  271. g.Subscribe(EventConfigServerTLSConfig, func(sc *ServerConfig) {
  272. if server, err := g.findServer(sc.ListenInterface); err == nil {
  273. if err := server.configureSSL(); err == nil {
  274. g.mainlog().Infof("Server [%s] new TLS configuration loaded", sc.ListenInterface)
  275. } else {
  276. g.mainlog().WithError(err).Errorf("Server [%s] failed to load the new TLS configuration", sc.ListenInterface)
  277. }
  278. }
  279. })
  280. // when server's timeout change.
  281. g.Subscribe(EventConfigServerTimeout, func(sc *ServerConfig) {
  282. g.mapServers(func(server *server) {
  283. server.setTimeout(sc.Timeout)
  284. })
  285. })
  286. // when server's max clients change.
  287. g.Subscribe(EventConfigServerMaxClients, func(sc *ServerConfig) {
  288. g.mapServers(func(server *server) {
  289. // TODO resize the pool somehow
  290. })
  291. })
  292. // when a server's log file changes
  293. g.Subscribe(EventConfigServerLogFile, func(sc *ServerConfig) {
  294. if server, err := g.findServer(sc.ListenInterface); err == nil {
  295. var err error
  296. var l log.Logger
  297. level := g.mainlog().GetLevel()
  298. if l, err = log.GetLogger(sc.LogFile, level); err == nil {
  299. g.setMainlog(l)
  300. backends.Svc.SetMainlog(l)
  301. // it will change to the new logger on the next accepted client
  302. server.logStore.Store(l)
  303. g.mainlog().Infof("Server [%s] changed, new clients will log to: [%s]",
  304. sc.ListenInterface,
  305. sc.LogFile,
  306. )
  307. } else {
  308. g.mainlog().WithError(err).Errorf(
  309. "Server [%s] log change failed to: [%s]",
  310. sc.ListenInterface,
  311. sc.LogFile,
  312. )
  313. }
  314. }
  315. })
  316. // when the daemon caught a sighup, event for individual server
  317. g.Subscribe(EventConfigServerLogReopen, func(sc *ServerConfig) {
  318. if server, err := g.findServer(sc.ListenInterface); err == nil {
  319. server.log().Reopen()
  320. g.mainlog().Infof("Server [%s] re-opened log file [%s]", sc.ListenInterface, sc.LogFile)
  321. }
  322. })
  323. // when the backend changes
  324. g.Subscribe(EventConfigBackendConfig, func(appConfig *AppConfig) {
  325. logger, _ := log.GetLogger(appConfig.LogFile, appConfig.LogLevel)
  326. // shutdown the backend first.
  327. var err error
  328. if err = g.backend().Shutdown(); err != nil {
  329. logger.WithError(err).Warn("Backend failed to shutdown")
  330. return
  331. }
  332. // init a new backend, Revert to old backend config if it fails
  333. if newBackend, newErr := backends.New(appConfig.BackendConfig, logger); newErr != nil {
  334. logger.WithError(newErr).Error("Error while loading the backend")
  335. err = g.backend().Reinitialize()
  336. if err != nil {
  337. logger.WithError(err).Fatal("failed to revert to old backend config")
  338. return
  339. }
  340. err = g.backend().Start()
  341. if err != nil {
  342. logger.WithError(err).Fatal("failed to start backend with old config")
  343. return
  344. }
  345. logger.Info("reverted to old backend config")
  346. } else {
  347. // swap to the bew backend (assuming old backend was shutdown so it can be safely swapped)
  348. if err := newBackend.Start(); err != nil {
  349. logger.WithError(err).Error("backend could not start")
  350. }
  351. logger.Info("new backend started")
  352. g.storeBackend(newBackend)
  353. }
  354. })
  355. }
  356. func (g *guerrilla) storeBackend(b backends.Backend) {
  357. g.backendStore.Store(b)
  358. g.mapServers(func(server *server) {
  359. server.setBackend(b)
  360. })
  361. }
  362. func (g *guerrilla) backend() backends.Backend {
  363. if b, ok := g.backendStore.Load().(backends.Backend); ok {
  364. return b
  365. }
  366. return nil
  367. }
  368. // Entry point for the application. Starts all servers.
  369. func (g *guerrilla) Start() error {
  370. var startErrors Errors
  371. g.guard.Lock()
  372. defer func() {
  373. g.state = GuerrillaStateStarted
  374. g.guard.Unlock()
  375. }()
  376. if len(g.servers) == 0 {
  377. return append(startErrors, errors.New("No servers to start, please check the config"))
  378. }
  379. if g.state == GuerrillaStateStopped {
  380. // when a backend is shutdown, we need to re-initialize before it can be started again
  381. g.backend().Reinitialize()
  382. g.backend().Start()
  383. }
  384. // channel for reading errors
  385. errs := make(chan error, len(g.servers))
  386. var startWG sync.WaitGroup
  387. // start servers, send any errors back to errs channel
  388. for ListenInterface := range g.servers {
  389. if !g.servers[ListenInterface].isEnabled() {
  390. // not enabled
  391. continue
  392. }
  393. if g.servers[ListenInterface].state != ServerStateNew &&
  394. g.servers[ListenInterface].state != ServerStateStopped {
  395. continue
  396. }
  397. startWG.Add(1)
  398. go func(s *server) {
  399. g.mainlog().Infof("Starting: %s", s.listenInterface)
  400. if err := s.Start(&startWG); err != nil {
  401. errs <- err
  402. }
  403. }(g.servers[ListenInterface])
  404. }
  405. // wait for all servers to start (or fail)
  406. startWG.Wait()
  407. // close, then read any errors
  408. close(errs)
  409. for err := range errs {
  410. if err != nil {
  411. startErrors = append(startErrors, err)
  412. }
  413. }
  414. if len(startErrors) > 0 {
  415. return startErrors
  416. }
  417. return nil
  418. }
  419. func (g *guerrilla) Shutdown() {
  420. // shut down the servers first
  421. g.mapServers(func(s *server) {
  422. if s.state == ServerStateRunning {
  423. s.Shutdown()
  424. g.mainlog().Infof("shutdown completed for [%s]", s.listenInterface)
  425. }
  426. })
  427. g.guard.Lock()
  428. defer func() {
  429. g.state = GuerrillaStateStopped
  430. defer g.guard.Unlock()
  431. }()
  432. if err := g.backend().Shutdown(); err != nil {
  433. g.mainlog().WithError(err).Warn("Backend failed to shutdown")
  434. } else {
  435. g.mainlog().Infof("Backend shutdown completed")
  436. }
  437. }
  438. // SetLogger sets the logger for the app and propagates it to sub-packages (eg.
  439. func (g *guerrilla) SetLogger(l log.Logger) {
  440. g.setMainlog(l)
  441. backends.Svc.SetMainlog(l)
  442. }
  443. // writePid writes the pid (process id) to the file specified in the config.
  444. // Won't write anything if no file specified
  445. func (g *guerrilla) writePid() error {
  446. if len(g.Config.PidFile) > 0 {
  447. if f, err := os.Create(g.Config.PidFile); err == nil {
  448. defer f.Close()
  449. pid := os.Getpid()
  450. if _, err := f.WriteString(fmt.Sprintf("%d", pid)); err == nil {
  451. f.Sync()
  452. g.mainlog().Infof("pid_file (%s) written with pid:%v", g.Config.PidFile, pid)
  453. } else {
  454. g.mainlog().WithError(err).Errorf("Error while writing pidFile (%s)", g.Config.PidFile)
  455. return err
  456. }
  457. } else {
  458. g.mainlog().WithError(err).Errorf("Error while creating pidFile (%s)", g.Config.PidFile)
  459. return err
  460. }
  461. }
  462. return nil
  463. }