gateway.go

package backends

import (
	"errors"
	"fmt"
	"io"
	"runtime/debug"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/flashmob/go-guerrilla/log"
	"github.com/flashmob/go-guerrilla/mail"
	"github.com/flashmob/go-guerrilla/response"
)

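// ErrProcessorNotFound is set (and returned) when a processor named in the
// config cannot be found among the registered processors.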
var ErrProcessorNotFound error

// A backend gateway is a proxy that implements the Backend interface.
// It is used to start multiple goroutine workers for saving mail, and to distribute
// envelopes to the workers via a channel. Shutting down via Shutdown() will stop all workers.
// The rest of this program always talks to the backend via this gateway.
type BackendGateway struct {
	// channel for distributing envelopes to workers
	conveyor chan *workerMsg

	// waits for backend workers to start/stop
	wg           sync.WaitGroup
	workStoppers []chan bool
	processors   []Processor
	validators   []Processor
	streamers    []streamer

	// controls access to state
	sync.Mutex
	State    backendState
	config   BackendConfig
	gwConfig *GatewayConfig
}

type GatewayConfig struct {
	// WorkersSize controls how many concurrent workers to start. Defaults to 1
	WorkersSize int `json:"save_workers_size,omitempty"`
	// SaveProcess controls which processors to chain in a stack for saving email tasks
	SaveProcess string `json:"save_process,omitempty"`
	// ValidateProcess is like SaveProcess, but for recipient validation tasks
	ValidateProcess string `json:"validate_process,omitempty"`
	// TimeoutSave is the duration before timeout when saving an email, eg "29s"
	TimeoutSave string `json:"gw_save_timeout,omitempty"`
	// TimeoutValidateRcpt is the duration before timeout when validating a recipient, eg "1s"
	TimeoutValidateRcpt string `json:"gw_val_rcpt_timeout,omitempty"`
	// StreamSaveProcess is like SaveProcess, but uses stream processors that read the email data from an io.Reader
	StreamSaveProcess string `json:"stream_save_process,omitempty"`
}

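// An example gateway section of a backend config, expressed in JSON.
// This is a hypothetical sketch: the processor names must match whatever
// is actually registered in the processors map.
//
//	{
//	    "save_workers_size": 2,
//	    "save_process": "HeadersParser|Debugger",
//	    "validate_process": "Debugger",
//	    "gw_save_timeout": "29s",
//	    "gw_val_rcpt_timeout": "1s"
//	}
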
// workerMsg is what gets placed on the BackendGateway.conveyor channel
type workerMsg struct {
	// The email data
	e *mail.Envelope
	// notifyMe is used to notify the gateway of workers finishing their processing
	notifyMe chan *notifyMsg
	// select the task type
	task SelectTask
	// io.Reader for the streamed processor
	r io.Reader
}

type streamer struct {
	// sp is a chain of StreamProcessors
	sp StreamProcessor
	// d holds the decorators so that we can call Open and Close on them
	d []*StreamDecorator
}

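// Write implements io.Writer by passing p down the stream processor chain.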
func (s streamer) Write(p []byte) (n int, err error) {
	return s.sp.Write(p)
}

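// open calls Open on each decorator in the chain, passing the envelope and
// collecting any errors.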
func (s *streamer) open(e *mail.Envelope) error {
	var err Errors
	for i := range s.d {
		if s.d[i].Open != nil {
			if openErr := s.d[i].Open(e); openErr != nil {
				err = append(err, openErr)
			}
		}
	}
	if len(err) == 0 {
		return nil
	}
	return err
}

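// close calls Close on each decorator in reverse order, collecting any errors.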
func (s *streamer) close() error {
	var err Errors
	// close in reverse order
	for i := len(s.d) - 1; i >= 0; i-- {
		if s.d[i].Close != nil {
			if closeErr := s.d[i].Close(); closeErr != nil {
				err = append(err, closeErr)
			}
		}
	}
	if len(err) == 0 {
		return nil
	}
	return err
}

type backendState int

// possible values for state
const (
	BackendStateNew backendState = iota
	BackendStateRunning
	BackendStateShuttered
	BackendStateError
	BackendStateInitialized

	// default timeout for saving email, if 'gw_save_timeout' is not present in the config
	saveTimeout = time.Second * 30
	// default timeout for validating rcpt to, if 'gw_val_rcpt_timeout' is not present in the config
	validateRcptTimeout = time.Second * 5

	defaultProcessor = "Debugger"
)

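// String implements fmt.Stringer, mainly for logging the backend state.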
func (s backendState) String() string {
	switch s {
	case BackendStateNew:
		return "NewState"
	case BackendStateRunning:
		return "RunningState"
	case BackendStateShuttered:
		return "ShutteredState"
	case BackendStateError:
		return "ErrorState"
	case BackendStateInitialized:
		return "InitializedState"
	}
	return strconv.Itoa(int(s))
}

// New makes a new default BackendGateway backend, initializes it using
// backendConfig and stores the logger
func New(backendConfig BackendConfig, l log.Logger) (Backend, error) {
	Svc.SetMainlog(l)
	gateway := &BackendGateway{}
	err := gateway.Initialize(backendConfig)
	if err != nil {
		return nil, fmt.Errorf("error while initializing the backend: %s", err)
	}
	// keep the config known to be good.
	gateway.config = backendConfig
	b = Backend(gateway)
	return b, nil
}

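// A minimal usage sketch (hypothetical; the logger destination/level strings
// and the set of registered processors depend on the rest of the package):
//
//	cfg := backends.BackendConfig{"save_process": "Debugger"}
//	logger, _ := log.GetLogger("stderr", "info")
//	backend, err := backends.New(cfg, logger)
//	if err != nil {
//		// configuration error, e.g. a processor in save_process was not found
//	}
//	err = backend.Start()
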
var workerMsgPool = sync.Pool{
	// if not available, then create a new one
	New: func() interface{} {
		return &workerMsg{}
	},
}

// reset resets a workerMsg that has been borrowed from the pool
func (w *workerMsg) reset(e *mail.Envelope, task SelectTask) {
	if w.notifyMe == nil {
		w.notifyMe = make(chan *notifyMsg)
	}
	w.e = e
	w.task = task
}

// Process distributes an envelope to one of the backend workers with a TaskSaveMail task
func (gw *BackendGateway) Process(e *mail.Envelope) Result {
	if gw.State != BackendStateRunning {
		return NewResult(response.Canned.FailBackendNotRunning, response.SP, gw.State)
	}
	// borrow a workerMsg from the pool
	workerMsg := workerMsgPool.Get().(*workerMsg)
	workerMsg.reset(e, TaskSaveMail)
	// place on the channel so that one of the save mail workers can pick it up
	gw.conveyor <- workerMsg
	// wait for the save to complete
	// or timeout
	select {
	case status := <-workerMsg.notifyMe:
		// email saving transaction completed; the worker is done with the
		// msg, so it's now safe to return it to the pool
		defer workerMsgPool.Put(workerMsg)
		if status.result == BackendResultOK && status.queuedID != "" {
			return NewResult(response.Canned.SuccessMessageQueued, response.SP, status.queuedID)
		}
		// a custom result, there was probably an error, if so, log it
		if status.result != nil {
			if status.err != nil {
				Log().Error(status.err)
			}
			return status.result
		}
		// if there was no result, but there's an error, then make a new result from the error
		if status.err != nil {
			// only use the error verbatim if it begins with a 3-digit SMTP code
			errStr := status.err.Error()
			if len(errStr) < 3 {
				return NewResult(response.Canned.FailBackendTransaction, response.SP, status.err)
			}
			if _, err := strconv.Atoi(errStr[:3]); err != nil {
				return NewResult(response.Canned.FailBackendTransaction, response.SP, status.err)
			}
			return NewResult(status.err)
		}
		// both result & error are nil (should not happen)
		err := errors.New("no response from backend - processor did not return a result or an error")
		Log().Error(err)
		return NewResult(response.Canned.FailBackendTransaction, response.SP, err)
	case <-time.After(gw.saveTimeout()):
		Log().Error("Backend has timed out while saving email")
		e.Lock() // lock the envelope - it's still processing here, we don't want the server to recycle it
		go func() {
			// keep waiting for the backend to finish processing
			<-workerMsg.notifyMe
			e.Unlock()
			// only recycle the msg once the worker is done with it
			workerMsgPool.Put(workerMsg)
		}()
		return NewResult(response.Canned.FailBackendTimeout)
	}
}

// ValidateRcpt asks one of the workers to validate the recipient
// Only the last recipient appended to e.RcptTo will be validated.
func (gw *BackendGateway) ValidateRcpt(e *mail.Envelope) RcptError {
	if gw.State != BackendStateRunning {
		return StorageNotAvailable
	}
	if _, ok := gw.validators[0].(NoopProcessor); ok {
		// no validator processors configured
		return nil
	}
	// place on the channel so that one of the save mail workers can pick it up
	workerMsg := workerMsgPool.Get().(*workerMsg)
	workerMsg.reset(e, TaskValidateRcpt)
	gw.conveyor <- workerMsg
	// wait for the validation to complete
	// or timeout
	select {
	case status := <-workerMsg.notifyMe:
		// the worker is done with the msg, so return it to the pool
		workerMsgPool.Put(workerMsg)
		if status.err != nil {
			return status.err
		}
		return nil
	case <-time.After(gw.validateRcptTimeout()):
		Log().Error("Backend has timed out while validating rcpt")
		e.Lock()
		go func() {
			<-workerMsg.notifyMe
			e.Unlock()
			// only recycle the msg once the worker is done with it
			workerMsgPool.Put(workerMsg)
		}()
		return StorageTimeout
	}
}

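// StreamOn reports whether a stream_save_process stack has been configured.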
func (gw *BackendGateway) StreamOn() bool {
	return len(gw.gwConfig.StreamSaveProcess) != 0
}

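// ProcessStream distributes an envelope to one of the backend workers with a
// TaskSaveMailStream task. The email data is read from r and written through
// the worker's stream processor chain.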
func (gw *BackendGateway) ProcessStream(r io.Reader, e *mail.Envelope) (Result, error) {
	res := response.Canned
	if gw.State != BackendStateRunning {
		return NewResult(res.FailBackendNotRunning, response.SP, gw.State), errors.New(res.FailBackendNotRunning.String())
	}
	// borrow a workerMsg from the pool
	workerMsg := workerMsgPool.Get().(*workerMsg)
	workerMsg.reset(e, TaskSaveMailStream)
	workerMsg.r = r
	// place on the channel so that one of the save mail workers can pick it up
	gw.conveyor <- workerMsg
	// wait for the save to complete
	// or timeout
	select {
	case status := <-workerMsg.notifyMe:
		// email saving transaction completed; the worker is done with the
		// msg, so it's now safe to return it to the pool
		defer workerMsgPool.Put(workerMsg)
		if status.result == BackendResultOK && status.queuedID != "" {
			return NewResult(res.SuccessMessageQueued, response.SP, status.queuedID), status.err
		}
		// a custom result, there was probably an error, if so, log it
		if status.result != nil {
			if status.err != nil {
				Log().Error(status.err)
			}
			return status.result, status.err
		}
		// if there was no result, but there's an error, then make a new result from the error
		if status.err != nil {
			// only use the error verbatim if it begins with a 3-digit SMTP code
			errStr := status.err.Error()
			if len(errStr) < 3 {
				return NewResult(res.FailBackendTransaction, response.SP, status.err), status.err
			}
			if _, err := strconv.Atoi(errStr[:3]); err != nil {
				return NewResult(res.FailBackendTransaction, response.SP, status.err), status.err
			}
			return NewResult(status.err), status.err
		}
		// both result & error are nil (should not happen)
		err := errors.New("no response from backend - processor did not return a result or an error")
		Log().Error(err)
		return NewResult(res.FailBackendTransaction, response.SP, err), err
	case <-time.After(gw.saveTimeout()):
		Log().Error("Backend has timed out while saving email")
		e.Lock() // lock the envelope - it's still processing here, we don't want the server to recycle it
		go func() {
			// keep waiting for the backend to finish processing
			<-workerMsg.notifyMe
			e.Unlock()
			workerMsgPool.Put(workerMsg)
		}()
		return NewResult(res.FailBackendTimeout), errors.New("gateway timeout")
	}
}

// Shutdown shuts down the backend and leaves it in BackendStateShuttered state
func (gw *BackendGateway) Shutdown() error {
	gw.Lock()
	defer gw.Unlock()
	if gw.State != BackendStateShuttered {
		// send a signal to all workers
		gw.stopWorkers()
		// wait for workers to stop
		gw.wg.Wait()
		// call shutdown on all processor shutdowners
		if err := Svc.shutdown(); err != nil {
			return err
		}
		gw.State = BackendStateShuttered
	}
	return nil
}

// Reinitialize initializes the gateway with the existing config after it was shutdown
func (gw *BackendGateway) Reinitialize() error {
	if gw.State != BackendStateShuttered {
		return errors.New("backend must be in BackendStateShuttered state to Reinitialize")
	}
	// clear the Initializers and Shutdowners
	Svc.reset()
	err := gw.Initialize(gw.config)
	if err != nil {
		fmt.Println("reinitialize to ", gw.config, err)
		return fmt.Errorf("error while initializing the backend: %s", err)
	}
	return err
}

// newStack creates a new Processor by chaining multiple Processors in a call stack
// Decorators are functions of Decorator type, source files prefixed with p_*
// Each decorator does a specific task during the processing stage.
// This function uses the config value save_process or validate_process to figure out which Decorators to use
func (gw *BackendGateway) newStack(stackConfig string) (Processor, error) {
	var decorators []Decorator
	cfg := strings.ToLower(strings.TrimSpace(stackConfig))
	if len(cfg) == 0 {
		return NoopProcessor{}, nil
	}
	items := strings.Split(cfg, "|")
	for i := range items {
		name := items[len(items)-1-i] // reverse order, since decorators are stacked
		if makeFunc, ok := processors[name]; ok {
			decorators = append(decorators, makeFunc())
		} else {
			ErrProcessorNotFound = fmt.Errorf("processor [%s] not found", name)
			return nil, ErrProcessorNotFound
		}
	}
	// build the call-stack of decorators
	p := Decorate(DefaultProcessor{}, decorators...)
	return p, nil
}

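// As an illustration, a save_process value of "HeadersParser|Debugger" (a
// hypothetical pair of registered names; matching is effectively
// case-insensitive because the config string is lower-cased above) builds a
// stack where HeadersParser runs before Debugger for each envelope.

// newStreamStack is the streaming counterpart of newStack: it builds a chain
// of stream processors from the stream_save_process config value, keeping the
// decorators so that Open and Close can be called on them per envelope.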
func (gw *BackendGateway) newStreamStack(stackConfig string) (streamer, error) {
	var decorators []*StreamDecorator
	cfg := strings.ToLower(strings.TrimSpace(stackConfig))
	if len(cfg) == 0 {
		return streamer{NoopStreamProcessor{}, decorators}, nil
	}
	items := strings.Split(cfg, "|")
	for i := range items {
		name := items[len(items)-1-i] // reverse order, since decorators are stacked
		if makeFunc, ok := streamers[name]; ok {
			decorators = append(decorators, makeFunc())
		} else {
			ErrProcessorNotFound = fmt.Errorf("stream processor [%s] not found", name)
			return streamer{nil, decorators}, ErrProcessorNotFound
		}
	}
	// build the call-stack of decorators
	sp, decorators := DecorateStream(&DefaultStreamProcessor{}, decorators)
	return streamer{sp, decorators}, nil
}

// loadConfig loads the config for the GatewayConfig
func (gw *BackendGateway) loadConfig(cfg BackendConfig) error {
	configType := BaseConfig(&GatewayConfig{})
	// Note: treat config values as immutable
	// if you need to change a config value, change it in the file, then
	// send a SIGHUP
	bcfg, err := Svc.ExtractConfig(cfg, configType)
	if err != nil {
		return err
	}
	gw.gwConfig = bcfg.(*GatewayConfig)
	return nil
}

// Initialize builds the workers and initializes each one
func (gw *BackendGateway) Initialize(cfg BackendConfig) error {
	gw.Lock()
	defer gw.Unlock()
	if gw.State != BackendStateNew && gw.State != BackendStateShuttered {
		return errors.New("can only Initialize in BackendStateNew or BackendStateShuttered state")
	}
	err := gw.loadConfig(cfg)
	if err != nil {
		gw.State = BackendStateError
		return err
	}
	workersSize := gw.workersSize()
	if workersSize < 1 {
		gw.State = BackendStateError
		return errors.New("must have at least 1 worker")
	}
	gw.processors = make([]Processor, 0)
	gw.validators = make([]Processor, 0)
	gw.streamers = make([]streamer, 0)
	for i := 0; i < workersSize; i++ {
		p, err := gw.newStack(gw.gwConfig.SaveProcess)
		if err != nil {
			gw.State = BackendStateError
			return err
		}
		gw.processors = append(gw.processors, p)
		v, err := gw.newStack(gw.gwConfig.ValidateProcess)
		if err != nil {
			gw.State = BackendStateError
			return err
		}
		gw.validators = append(gw.validators, v)
		s, err := gw.newStreamStack(gw.gwConfig.StreamSaveProcess)
		if err != nil {
			gw.State = BackendStateError
			return err
		}
		gw.streamers = append(gw.streamers, s)
	}
	// initialize processors
	if err := Svc.initialize(cfg); err != nil {
		gw.State = BackendStateError
		return err
	}
	if gw.conveyor == nil {
		gw.conveyor = make(chan *workerMsg, workersSize)
	}
	// ready to start
	gw.State = BackendStateInitialized
	return nil
}

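// The intended lifecycle, as a sketch: New (which calls Initialize) -> Start
// -> Shutdown, with Reinitialize only allowed from the shuttered state. Note
// that each worker gets its own processor stacks, built in Initialize above,
// so the stacks themselves are not shared between worker goroutines.
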
// Start starts the worker goroutines, assuming it has been initialized or shuttered before
func (gw *BackendGateway) Start() error {
	gw.Lock()
	defer gw.Unlock()
	if gw.State != BackendStateInitialized && gw.State != BackendStateShuttered {
		return fmt.Errorf("cannot start backend because it's in %s state", gw.State)
	}
	// we start our workers
	workersSize := gw.workersSize()
	// make our slice of channels for stopping
	gw.workStoppers = make([]chan bool, 0)
	// set the wait group
	gw.wg.Add(workersSize)
	for i := 0; i < workersSize; i++ {
		stop := make(chan bool)
		go func(workerId int, stop chan bool) {
			// blocks here until the worker exits
			for {
				state := gw.workDispatcher(
					gw.conveyor,
					gw.processors[workerId],
					gw.validators[workerId],
					gw.streamers[workerId],
					workerId+1,
					stop)
				// keep running after a panic
				if state != dispatcherStatePanic {
					break
				}
			}
			gw.wg.Done()
		}(i, stop)
		gw.workStoppers = append(gw.workStoppers, stop)
	}
	gw.State = BackendStateRunning
	return nil
}

// workersSize gets the number of workers to use for saving email by reading the save_workers_size config value
// Returns 1 if no config value was set
func (gw *BackendGateway) workersSize() int {
	if gw.gwConfig.WorkersSize <= 0 {
		return 1
	}
	return gw.gwConfig.WorkersSize
}

// saveTimeout returns the maximum duration to wait before timing out a save processing task
func (gw *BackendGateway) saveTimeout() time.Duration {
	if gw.gwConfig.TimeoutSave == "" {
		return saveTimeout
	}
	t, err := time.ParseDuration(gw.gwConfig.TimeoutSave)
	if err != nil {
		return saveTimeout
	}
	return t
}

// validateRcptTimeout returns the maximum duration to wait before timing out a recipient validation task
func (gw *BackendGateway) validateRcptTimeout() time.Duration {
	if gw.gwConfig.TimeoutValidateRcpt == "" {
		return validateRcptTimeout
	}
	t, err := time.ParseDuration(gw.gwConfig.TimeoutValidateRcpt)
	if err != nil {
		return validateRcptTimeout
	}
	return t
}

type dispatcherState int

const (
	dispatcherStateStopped dispatcherState = iota
	dispatcherStateIdle
	dispatcherStateWorking
	dispatcherStateNotify
	dispatcherStatePanic
)

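// workDispatcher is the main loop of a worker goroutine: it waits on workIn
// (the conveyor channel) and runs the processor stack matching each message's
// task, replying on the message's notifyMe channel. It returns
// dispatcherStateStopped after a stop signal, or dispatcherStatePanic when a
// processor panicked, in which case the caller (Start) restarts it.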
func (gw *BackendGateway) workDispatcher(
	workIn chan *workerMsg,
	save Processor,
	validate Processor,
	stream streamer,
	workerId int,
	stop chan bool) (state dispatcherState) {
	var msg *workerMsg
	defer func() {
		// panic recovery mechanism: it may panic when processing
		// since processors may call arbitrary code, some may be 3rd party / unstable
		// we need to detect the panic, and notify the backend that it failed & unlock the envelope
		if r := recover(); r != nil {
			Log().Error("worker recovered from panic:", r, string(debug.Stack()))
			if state == dispatcherStateWorking {
				msg.notifyMe <- &notifyMsg{err: errors.New("storage failed")}
			}
			state = dispatcherStatePanic
		}
		// otherwise state is dispatcherStateStopped if it reached here
	}()
	state = dispatcherStateIdle
	Log().Infof("processing worker started (#%d)", workerId)
	for {
		select {
		case <-stop:
			state = dispatcherStateStopped
			Log().Infof("stop signal for worker (#%d)", workerId)
			return
		case msg = <-workIn:
			state = dispatcherStateWorking // a panic in this state is reported back on notifyMe
			if msg.task == TaskSaveMail {
				result, err := save.Process(msg.e, msg.task)
				state = dispatcherStateNotify
				msg.notifyMe <- &notifyMsg{err: err, result: result, queuedID: msg.e.QueuedId}
			} else if msg.task == TaskSaveMailStream {
				err := stream.open(msg.e)
				if err == nil {
					// TODO make the buffer size configurable
					buf := make([]byte, 1024*4)
					if msg.e.Values["size"], err = io.CopyBuffer(stream, msg.r, buf); err != nil {
						Log().WithError(err).Error("stream writing failed")
					}
					// don't let a successful close mask an earlier copy error
					if closeErr := stream.close(); closeErr != nil {
						Log().WithError(closeErr).Error("stream closing failed")
						if err == nil {
							err = closeErr
						}
					}
				}
				state = dispatcherStateNotify
				var result Result
				if err != nil {
					result = NewResult(response.Canned.FailBackendTransaction, err)
				} else {
					result = NewResult(response.Canned.SuccessMessageQueued, response.SP, msg.e.QueuedId)
				}
				msg.notifyMe <- &notifyMsg{err: err, result: result, queuedID: msg.e.QueuedId}
			} else {
				result, err := validate.Process(msg.e, msg.task)
				state = dispatcherStateNotify
				msg.notifyMe <- &notifyMsg{err: err, result: result}
			}
		}
		state = dispatcherStateIdle
	}
}

// stopWorkers sends a signal to all workers to stop
func (gw *BackendGateway) stopWorkers() {
	for i := range gw.workStoppers {
		gw.workStoppers[i] <- true
	}
}