gateway.go

package backends

import (
	"errors"
	"fmt"
	"runtime/debug"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/flashmob/go-guerrilla/log"
	"github.com/flashmob/go-guerrilla/mail"
	"github.com/flashmob/go-guerrilla/response"
)

var ErrProcessorNotFound error

// A backend gateway is a proxy that implements the Backend interface.
// It is used to start multiple goroutine workers for saving mail, and then distribute
// email saving to the workers via a channel. Shutting down via Shutdown() will stop all workers.
// The rest of this program always talks to the backend via this gateway.
type BackendGateway struct {
	// channel for distributing envelopes to workers
	conveyor chan *workerMsg

	// waits for backend workers to start/stop
	wg           sync.WaitGroup
	workStoppers []chan bool
	processors   []Processor
	validators   []Processor

	// controls access to state
	sync.Mutex
	State    backendState
	config   BackendConfig
	gwConfig *GatewayConfig
}

type GatewayConfig struct {
	// WorkersSize controls how many concurrent workers to start. Defaults to 1
	WorkersSize int `json:"save_workers_size,omitempty"`
	// SaveProcess controls which processors to chain in a stack for saving email tasks
	SaveProcess string `json:"save_process,omitempty"`
	// ValidateProcess is like SaveProcess, but for recipient validation tasks
	ValidateProcess string `json:"validate_process,omitempty"`
	// TimeoutSave is the duration to wait before timing out when saving an email, e.g. "29s"
	TimeoutSave string `json:"gw_save_timeout,omitempty"`
	// TimeoutValidateRcpt is the duration to wait before timing out when validating a recipient, e.g. "1s"
	TimeoutValidateRcpt string `json:"gw_val_rcpt_timeout,omitempty"`
}
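
// An illustrative JSON fragment (a sketch only; the processor names are examples,
// not mandated by this file) showing how the fields above map onto config keys:
//
//	{
//		"save_workers_size": 2,
//		"save_process": "HeadersParser|Debugger",
//		"validate_process": "",
//		"gw_save_timeout": "29s",
//		"gw_val_rcpt_timeout": "1s"
//	}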

// workerMsg is what gets placed on the BackendGateway.conveyor channel
type workerMsg struct {
	// The email data
	e *mail.Envelope
	// notifyMe is used to notify the gateway of workers finishing their processing
	notifyMe chan *notifyMsg
	// select the task type
	task SelectTask
}

type backendState int

// possible values for state
const (
	BackendStateNew backendState = iota
	BackendStateRunning
	BackendStateShuttered
	BackendStateError
	BackendStateInitialized

	// default timeout for saving email, if 'gw_save_timeout' not present in config
	saveTimeout = time.Second * 30
	// default timeout for validating rcpt to, if 'gw_val_rcpt_timeout' not present in config
	validateRcptTimeout = time.Second * 5

	defaultProcessor = "Debugger"
)

func (s backendState) String() string {
	switch s {
	case BackendStateNew:
		return "NewState"
	case BackendStateRunning:
		return "RunningState"
	case BackendStateShuttered:
		return "ShutteredState"
	case BackendStateError:
		return "ErrorState"
	case BackendStateInitialized:
		return "InitializedState"
	}
	return strconv.Itoa(int(s))
}

// New makes a new default BackendGateway backend, initializes it using
// backendConfig and stores the logger
func New(backendConfig BackendConfig, l log.Logger) (Backend, error) {
	Svc.SetMainlog(l)
	gateway := &BackendGateway{}
	err := gateway.Initialize(backendConfig)
	if err != nil {
		return nil, fmt.Errorf("error while initializing the backend: %s", err)
	}
	// keep the config known to be good.
	gateway.config = backendConfig
	b = Backend(gateway)
	return b, nil
}
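
// A usage sketch (illustrative only; the config keys come from GatewayConfig above,
// while the processor names and surrounding variables are assumptions for the example):
//
//	cfg := backends.BackendConfig{
//		"save_workers_size": 2,
//		"save_process":      "HeadersParser|Debugger",
//	}
//	backend, err := backends.New(cfg, mainlog)
//	if err != nil {
//		// handle the initialization error
//	}
//	if err := backend.Start(); err != nil {
//		// handle the startup error
//	}
//	// envelopes can now be passed to backend.Process() and backend.ValidateRcpt()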

var workerMsgPool = sync.Pool{
	// if not available, then create a new one
	New: func() interface{} {
		return &workerMsg{}
	},
}

// reset resets a workerMsg that has been borrowed from the pool
func (w *workerMsg) reset(e *mail.Envelope, task SelectTask) {
	if w.notifyMe == nil {
		w.notifyMe = make(chan *notifyMsg)
	}
	w.e = e
	w.task = task
}

// Process distributes an envelope to one of the backend workers with a TaskSaveMail task
func (gw *BackendGateway) Process(e *mail.Envelope) Result {
	if gw.State != BackendStateRunning {
		return NewResult(response.Canned.FailBackendNotRunning, response.SP, gw.State)
	}
	// borrow a workerMsg from the pool
	workerMsg := workerMsgPool.Get().(*workerMsg)
	workerMsg.reset(e, TaskSaveMail)
	// place on the channel so that one of the save mail workers can pick it up
	gw.conveyor <- workerMsg
	// wait for the save to complete, or timeout
	select {
	case status := <-workerMsg.notifyMe:
		// email saving transaction completed
		if status.result == BackendResultOK && status.queuedID != "" {
			return NewResult(response.Canned.SuccessMessageQueued, response.SP, status.queuedID)
		}
		// A custom result, there was probably an error, if so, log it
		if status.result != nil {
			if status.err != nil {
				Log().Error(status.err)
			}
			return status.result
		}
		// if there was no result, but there's an error, then make a new result from the error
		if status.err != nil {
			// if the error string does not begin with a 3-digit SMTP code, wrap it in a canned response
			if _, err := strconv.Atoi(status.err.Error()[:3]); err != nil {
				return NewResult(response.Canned.FailBackendTransaction, response.SP, status.err)
			}
			return NewResult(status.err)
		}
		// both result & error are nil (should not happen)
		err := errors.New("no response from backend - processor did not return a result or an error")
		Log().Error(err)
		return NewResult(response.Canned.FailBackendTransaction, response.SP, err)
	case <-time.After(gw.saveTimeout()):
		Log().Error("Backend has timed out while saving email")
		e.Lock() // lock the envelope - it's still processing here, we don't want the server to recycle it
		go func() {
			// keep waiting for the backend to finish processing
			<-workerMsg.notifyMe
			e.Unlock()
			workerMsgPool.Put(workerMsg)
		}()
		return NewResult(response.Canned.FailBackendTimeout)
	}
}

// ValidateRcpt asks one of the workers to validate the recipient.
// Only the last recipient appended to e.RcptTo will be validated.
func (gw *BackendGateway) ValidateRcpt(e *mail.Envelope) RcptError {
	if gw.State != BackendStateRunning {
		return StorageNotAvailable
	}
	if _, ok := gw.validators[0].(NoopProcessor); ok {
		// no validator processors configured
		return nil
	}
	// place on the channel so that one of the workers can pick it up
	workerMsg := workerMsgPool.Get().(*workerMsg)
	workerMsg.reset(e, TaskValidateRcpt)
	gw.conveyor <- workerMsg
	// wait for the validation to complete, or timeout
	select {
	case status := <-workerMsg.notifyMe:
		workerMsgPool.Put(workerMsg)
		if status.err != nil {
			return status.err
		}
		return nil
	case <-time.After(gw.validateRcptTimeout()):
		e.Lock()
		go func() {
			<-workerMsg.notifyMe
			e.Unlock()
			workerMsgPool.Put(workerMsg)
			Log().Error("Backend has timed out while validating rcpt")
		}()
		return StorageTimeout
	}
}

// Shutdown shuts down the backend and leaves it in BackendStateShuttered state
func (gw *BackendGateway) Shutdown() error {
	gw.Lock()
	defer gw.Unlock()
	if gw.State != BackendStateShuttered {
		// send a signal to all workers
		gw.stopWorkers()
		// wait for workers to stop
		gw.wg.Wait()
		// call shutdown on all processor shutdowners
		if err := Svc.shutdown(); err != nil {
			return err
		}
		gw.State = BackendStateShuttered
	}
	return nil
}

// Reinitialize initializes the gateway with the existing config after it was shutdown
func (gw *BackendGateway) Reinitialize() error {
	if gw.State != BackendStateShuttered {
		return errors.New("backend must be in BackendStateShuttered state to Reinitialize")
	}
	// clear the Initializers and Shutdowners
	Svc.reset()
	err := gw.Initialize(gw.config)
	if err != nil {
		fmt.Println("reinitialize to ", gw.config, err)
		return fmt.Errorf("error while initializing the backend: %s", err)
	}
	return err
}

// newStack creates a new Processor by chaining multiple Processors in a call stack.
// Decorators are functions of Decorator type, source files prefixed with p_*
// Each decorator does a specific task during the processing stage.
// This function uses the config value save_process or validate_process to figure out which Decorator to use
func (gw *BackendGateway) newStack(stackConfig string) (Processor, error) {
	var decorators []Decorator
	cfg := strings.ToLower(strings.TrimSpace(stackConfig))
	if len(cfg) == 0 {
		//cfg = strings.ToLower(defaultProcessor)
		return NoopProcessor{}, nil
	}
	items := strings.Split(cfg, "|")
	for i := range items {
		name := items[len(items)-1-i] // reverse order, since decorators are stacked
		if makeFunc, ok := processors[name]; ok {
			decorators = append(decorators, makeFunc())
		} else {
			ErrProcessorNotFound = fmt.Errorf("processor [%s] not found", name)
			return nil, ErrProcessorNotFound
		}
	}
	// build the call-stack of decorators
	p := Decorate(DefaultProcessor{}, decorators...)
	return p, nil
}
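
// For illustration (the processor names are examples only): a save_process value of
// "HeadersParser|Debugger" is lower-cased, split on "|" and walked in reverse, so the
// resulting stack is roughly
//
//	Decorate(DefaultProcessor{}, Debugger, HeadersParser)
//
// where each name stands for the Decorator returned by its registered makeFunc, with
// the intention that an envelope passes through HeadersParser first, then Debugger,
// and finally DefaultProcessor.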

// loadConfig loads the config for the GatewayConfig
func (gw *BackendGateway) loadConfig(cfg BackendConfig) error {
	configType := BaseConfig(&GatewayConfig{})
	// Note: treat config values as immutable
	// if you need to change a config value, change it in the file then
	// send a SIGHUP
	bcfg, err := Svc.ExtractConfig(cfg, configType)
	if err != nil {
		return err
	}
	gw.gwConfig = bcfg.(*GatewayConfig)
	return nil
}

// Initialize builds the workers and initializes each one
func (gw *BackendGateway) Initialize(cfg BackendConfig) error {
	gw.Lock()
	defer gw.Unlock()
	if gw.State != BackendStateNew && gw.State != BackendStateShuttered {
		return errors.New("can only Initialize in BackendStateNew or BackendStateShuttered state")
	}
	err := gw.loadConfig(cfg)
	if err != nil {
		gw.State = BackendStateError
		return err
	}
	workersSize := gw.workersSize()
	if workersSize < 1 {
		gw.State = BackendStateError
		return errors.New("must have at least 1 worker")
	}
	gw.processors = make([]Processor, 0)
	gw.validators = make([]Processor, 0)
	for i := 0; i < workersSize; i++ {
		p, err := gw.newStack(gw.gwConfig.SaveProcess)
		if err != nil {
			gw.State = BackendStateError
			return err
		}
		gw.processors = append(gw.processors, p)
		v, err := gw.newStack(gw.gwConfig.ValidateProcess)
		if err != nil {
			gw.State = BackendStateError
			return err
		}
		gw.validators = append(gw.validators, v)
	}
	// initialize processors
	if err := Svc.initialize(cfg); err != nil {
		gw.State = BackendStateError
		return err
	}
	if gw.conveyor == nil {
		gw.conveyor = make(chan *workerMsg, workersSize)
	}
	// ready to start
	gw.State = BackendStateInitialized
	return nil
}

// Start starts the worker goroutines, assuming it has been initialized or shuttered before
func (gw *BackendGateway) Start() error {
	gw.Lock()
	defer gw.Unlock()
	if gw.State == BackendStateInitialized || gw.State == BackendStateShuttered {
		// we start our workers
		workersSize := gw.workersSize()
		// make our slice of channels for stopping
		gw.workStoppers = make([]chan bool, 0)
		// set the wait group
		gw.wg.Add(workersSize)
		for i := 0; i < workersSize; i++ {
			stop := make(chan bool)
			go func(workerId int, stop chan bool) {
				// blocks here until the worker exits
				for {
					state := gw.workDispatcher(
						gw.conveyor,
						gw.processors[workerId],
						gw.validators[workerId],
						workerId+1,
						stop)
					// keep running after panic
					if state != dispatcherStatePanic {
						break
					}
				}
				gw.wg.Done()
			}(i, stop)
			gw.workStoppers = append(gw.workStoppers, stop)
		}
		gw.State = BackendStateRunning
		return nil
	} else {
		return fmt.Errorf("cannot start backend because it's in %s state", gw.State)
	}
}

// workersSize gets the number of workers to use for saving email by reading the save_workers_size config value.
// Returns 1 if no config value was set
func (gw *BackendGateway) workersSize() int {
	if gw.gwConfig.WorkersSize <= 0 {
		return 1
	}
	return gw.gwConfig.WorkersSize
}

// saveTimeout returns the maximum amount of time to wait before timing out a save processing task
func (gw *BackendGateway) saveTimeout() time.Duration {
	if gw.gwConfig.TimeoutSave == "" {
		return saveTimeout
	}
	t, err := time.ParseDuration(gw.gwConfig.TimeoutSave)
	if err != nil {
		return saveTimeout
	}
	return t
}

// validateRcptTimeout returns the maximum amount of time to wait before timing out a recipient validation task
func (gw *BackendGateway) validateRcptTimeout() time.Duration {
	if gw.gwConfig.TimeoutValidateRcpt == "" {
		return validateRcptTimeout
	}
	t, err := time.ParseDuration(gw.gwConfig.TimeoutValidateRcpt)
	if err != nil {
		return validateRcptTimeout
	}
	return t
}

type dispatcherState int

const (
	dispatcherStateStopped dispatcherState = iota
	dispatcherStateIdle
	dispatcherStateWorking
	dispatcherStateNotify
	dispatcherStatePanic
)

func (gw *BackendGateway) workDispatcher(
	workIn chan *workerMsg,
	save Processor,
	validate Processor,
	workerId int,
	stop chan bool) (state dispatcherState) {
	var msg *workerMsg
	defer func() {
		// panic recovery mechanism: it may panic when processing
		// since processors may call arbitrary code, some may be 3rd party / unstable
		// we need to detect the panic, and notify the backend that it failed & unlock the envelope
		if r := recover(); r != nil {
			Log().Error("worker recovered from panic:", r, string(debug.Stack()))
			if state == dispatcherStateWorking {
				msg.notifyMe <- &notifyMsg{err: errors.New("storage failed")}
			}
			state = dispatcherStatePanic
			return
		}
		// state is dispatcherStateStopped if it reached here
		return
	}()
	state = dispatcherStateIdle
	Log().Infof("processing worker started (#%d)", workerId)
	for {
		select {
		case <-stop:
			state = dispatcherStateStopped
			Log().Infof("stop signal for worker (#%d)", workerId)
			return
		case msg = <-workIn:
			state = dispatcherStateWorking // recovers from panic if in this state
			if msg.task == TaskSaveMail {
				result, err := save.Process(msg.e, msg.task)
				state = dispatcherStateNotify
				msg.notifyMe <- &notifyMsg{err: err, result: result, queuedID: msg.e.QueuedId}
			} else {
				// recipient validation tasks go through the validate stack, not the save stack
				result, err := validate.Process(msg.e, msg.task)
				state = dispatcherStateNotify
				msg.notifyMe <- &notifyMsg{err: err, result: result}
			}
		}
		state = dispatcherStateIdle
	}
}

// stopWorkers sends a signal to all workers to stop
func (gw *BackendGateway) stopWorkers() {
	for i := range gw.workStoppers {
		gw.workStoppers[i] <- true
	}
}