gateway.go

package backends

import (
	"errors"
	"fmt"
	"io"
	"runtime/debug"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/flashmob/go-guerrilla/log"
	"github.com/flashmob/go-guerrilla/mail"
	"github.com/flashmob/go-guerrilla/response"
)

var ErrProcessorNotFound error

// A backend gateway is a proxy that implements the Backend interface.
// It is used to start multiple goroutine workers for saving mail, and then distribute email saving to the workers
// via a channel. Shutting down via Shutdown() will stop all workers.
// The rest of this program always talks to the backend via this gateway.
type BackendGateway struct {
	// channel for distributing envelopes to workers
	conveyor chan *workerMsg
	// waits for backend workers to start/stop
	wg           sync.WaitGroup
	workStoppers []chan bool
	processors   []Processor
	validators   []Processor
	streamers    []streamer
	// controls access to state
	sync.Mutex
	State    backendState
	config   BackendConfig
	gwConfig *GatewayConfig
	buf      []byte // stream output buffer
}

type GatewayConfig struct {
	// WorkersSize controls how many concurrent workers to start. Defaults to 1
	WorkersSize int `json:"save_workers_size,omitempty"`
	// SaveProcess controls which processors to chain in a stack for saving email tasks
	SaveProcess string `json:"save_process,omitempty"`
	// ValidateProcess is like SaveProcess, but for recipient validation tasks
	ValidateProcess string `json:"validate_process,omitempty"`
	// TimeoutSave is the duration to allow before timing out when saving an email, eg "29s"
	TimeoutSave string `json:"gw_save_timeout,omitempty"`
	// TimeoutValidateRcpt is the duration to allow before timing out when validating a recipient, eg "1s"
	TimeoutValidateRcpt string `json:"gw_val_rcpt_timeout,omitempty"`
	// StreamSaveProcess is the same as SaveProcess, but reads the email data from an io.Reader
	StreamSaveProcess string `json:"stream_save_process,omitempty"`
	// StreamBufferSize controls the size of the output buffer, in bytes. Default is 4096
	StreamBufferSize int `json:"stream_buffer_size,omitempty"`
}
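
// exampleGatewayConfig is a hypothetical sketch, unused by the package, of a
// BackendConfig that would populate every GatewayConfig field above. The keys
// mirror the json tags; the processor names are placeholders for whatever
// processors happen to be registered.
var exampleGatewayConfig = BackendConfig{
	"save_workers_size":   2,
	"save_process":        "HeadersParser|Debugger",
	"validate_process":    "Debugger",
	"gw_save_timeout":     "29s",
	"gw_val_rcpt_timeout": "3s",
	"stream_buffer_size":  4096,
}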

// workerMsg is what gets placed on the BackendGateway.conveyor channel
type workerMsg struct {
	// The email data
	e *mail.Envelope
	// notifyMe is used to notify the gateway of workers finishing their processing
	notifyMe chan *notifyMsg
	// select the task type
	task SelectTask
	// io.Reader for streamed processors
	r io.Reader
}

type streamer struct {
	// sp is a chain of StreamProcessors
	sp StreamProcessor
	// d holds the decorators so that we can call Open and Close on each of them
	d []*StreamDecorator
}

func (s streamer) Write(p []byte) (n int, err error) {
	return s.sp.Write(p)
}

func (s *streamer) open(e *mail.Envelope) error {
	var err Errors
	for i := range s.d {
		if s.d[i].Open != nil {
			if e := s.d[i].Open(e); e != nil {
				err = append(err, e)
			}
		}
	}
	if len(err) == 0 {
		return nil
	}
	return err
}

func (s *streamer) close() error {
	var err Errors
	// close in reverse order
	for i := len(s.d) - 1; i >= 0; i-- {
		if s.d[i].Close != nil {
			if e := s.d[i].Close(); e != nil {
				err = append(err, e)
			}
		}
	}
	if len(err) == 0 {
		return nil
	}
	return err
}

type backendState int

// possible values for state
const (
	BackendStateNew backendState = iota
	BackendStateRunning
	BackendStateShuttered
	BackendStateError
	BackendStateInitialized

	// default timeout for saving email, if 'gw_save_timeout' not present in config
	saveTimeout = time.Second * 30
	// default timeout for validating rcpt to, if 'gw_val_rcpt_timeout' not present in config
	validateRcptTimeout = time.Second * 5

	defaultProcessor = "Debugger"

	// streamBufferSize sets the size of the buffer for the streaming processors,
	// can be configured using `stream_buffer_size`
	streamBufferSize = 4096
)

func (s backendState) String() string {
	switch s {
	case BackendStateNew:
		return "NewState"
	case BackendStateRunning:
		return "RunningState"
	case BackendStateShuttered:
		return "ShutteredState"
	case BackendStateError:
		return "ErrorState"
	case BackendStateInitialized:
		return "InitializedState"
	}
	return strconv.Itoa(int(s))
}

// New makes a new default BackendGateway backend, initializes it using
// backendConfig, and stores the logger
func New(backendConfig BackendConfig, l log.Logger) (Backend, error) {
	Svc.SetMainlog(l)
	gateway := &BackendGateway{}
	err := gateway.Initialize(backendConfig)
	if err != nil {
		return nil, fmt.Errorf("error while initializing the backend: %s", err)
	}
	// keep the config known to be good.
	gateway.config = backendConfig
	b := Backend(gateway)
	return b, nil
}
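
// exampleGatewayUsage is a hypothetical, minimal wiring sketch, not called by
// the package: construct a backend with New, start its workers, then shut it
// down. It assumes log.GetLogger(destination, level) from this project's log
// package and reuses the exampleGatewayConfig sketch above.
func exampleGatewayUsage() error {
	l, err := log.GetLogger("stderr", "info")
	if err != nil {
		return err
	}
	backend, err := New(exampleGatewayConfig, l)
	if err != nil {
		return err
	}
	if err := backend.Start(); err != nil {
		return err
	}
	return backend.Shutdown()
}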

var workerMsgPool = sync.Pool{
	// if not available, then create a new one
	New: func() interface{} {
		return &workerMsg{}
	},
}

// reset resets a workerMsg that has been borrowed from the pool
func (w *workerMsg) reset(e *mail.Envelope, task SelectTask) {
	if w.notifyMe == nil {
		w.notifyMe = make(chan *notifyMsg)
	}
	w.e = e
	w.task = task
}

// Process distributes an envelope to one of the backend workers with a TaskSaveMail task
func (gw *BackendGateway) Process(e *mail.Envelope) Result {
	if gw.State != BackendStateRunning {
		return NewResult(response.Canned.FailBackendNotRunning, response.SP, gw.State)
	}
	// borrow a workerMsg from the pool
	workerMsg := workerMsgPool.Get().(*workerMsg)
	defer workerMsgPool.Put(workerMsg)
	workerMsg.reset(e, TaskSaveMail)
	// place on the channel so that one of the save mail workers can pick it up
	gw.conveyor <- workerMsg
	// wait for the save to complete
	// or timeout
	select {
	case status := <-workerMsg.notifyMe:
		// email saving transaction completed
		if status.result == BackendResultOK && status.queuedID != "" {
			return NewResult(response.Canned.SuccessMessageQueued, response.SP, status.queuedID)
		}
		// a custom result; there was probably an error, if so, log it
		if status.result != nil {
			if status.err != nil {
				Log().Error(status.err)
			}
			return status.result
		}
		// if there was no result, but there's an error, then make a new result from the error
		if status.err != nil {
			if _, err := strconv.Atoi(status.err.Error()[:3]); err != nil {
				return NewResult(response.Canned.FailBackendTransaction, response.SP, status.err)
			}
			return NewResult(status.err)
		}
		// both result & error are nil (should not happen)
		err := errors.New("no response from backend - processor did not return a result or an error")
		Log().Error(err)
		return NewResult(response.Canned.FailBackendTransaction, response.SP, err)
	case <-time.After(gw.saveTimeout()):
		Log().Error("Backend has timed out while saving email")
		e.Lock() // lock the envelope - it's still processing here, we don't want the server to recycle it
		go func() {
			// keep waiting for the backend to finish processing
			<-workerMsg.notifyMe
			e.Unlock()
		}()
		return NewResult(response.Canned.FailBackendTimeout)
	}
}

// ValidateRcpt asks one of the workers to validate the recipient.
// Only the last recipient appended to e.RcptTo will be validated.
func (gw *BackendGateway) ValidateRcpt(e *mail.Envelope) RcptError {
	if gw.State != BackendStateRunning {
		return StorageNotAvailable
	}
	if _, ok := gw.validators[0].(NoopProcessor); ok {
		// no validator processors configured
		return nil
	}
	// place on the channel so that one of the workers can pick it up
	workerMsg := workerMsgPool.Get().(*workerMsg)
	defer workerMsgPool.Put(workerMsg)
	workerMsg.reset(e, TaskValidateRcpt)
	gw.conveyor <- workerMsg
	// wait for the validation to complete
	// or timeout
	select {
	case status := <-workerMsg.notifyMe:
		if status.err != nil {
			return status.err
		}
		return nil
	case <-time.After(gw.validateRcptTimeout()):
		e.Lock()
		go func() {
			<-workerMsg.notifyMe
			e.Unlock()
			Log().Error("Backend has timed out while validating rcpt")
		}()
		return StorageTimeout
	}
}
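
// exampleValidateUsage is a hypothetical sketch, not part of the API, of how a
// server loop might use ValidateRcpt: append the new address first, since only
// the most recently appended recipient is checked, and drop it again on failure.
func exampleValidateUsage(gw *BackendGateway, e *mail.Envelope, addr mail.Address) error {
	e.RcptTo = append(e.RcptTo, addr)
	if err := gw.ValidateRcpt(e); err != nil {
		// reject this recipient and remove it from the envelope
		e.RcptTo = e.RcptTo[:len(e.RcptTo)-1]
		return err
	}
	return nil
}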

func (gw *BackendGateway) StreamOn() bool {
	return len(gw.gwConfig.StreamSaveProcess) != 0
}

func (gw *BackendGateway) ProcessStream(r io.Reader, e *mail.Envelope) (Result, error) {
	res := response.Canned
	if gw.State != BackendStateRunning {
		return NewResult(res.FailBackendNotRunning, response.SP, gw.State), errors.New(res.FailBackendNotRunning.String())
	}
	// borrow a workerMsg from the pool
	workerMsg := workerMsgPool.Get().(*workerMsg)
	workerMsg.reset(e, TaskSaveMailStream)
	workerMsg.r = r
	// place on the channel so that one of the save mail workers can pick it up
	gw.conveyor <- workerMsg
	// wait for the save to complete
	// or timeout
	select {
	case status := <-workerMsg.notifyMe:
		// email saving transaction completed
		if status.result == BackendResultOK && status.queuedID != "" {
			return NewResult(res.SuccessMessageQueued, response.SP, status.queuedID), status.err
		}
		// a custom result; there was probably an error, if so, log it
		if status.result != nil {
			if status.err != nil {
				Log().Error(status.err)
			}
			return status.result, status.err
		}
		// if there was no result, but there's an error, then make a new result from the error
		if status.err != nil {
			if _, err := strconv.Atoi(status.err.Error()[:3]); err != nil {
				return NewResult(res.FailBackendTransaction, response.SP, status.err), status.err
			}
			return NewResult(status.err), status.err
		}
		// both result & error are nil (should not happen)
		err := errors.New("no response from backend - processor did not return a result or an error")
		Log().Error(err)
		return NewResult(res.FailBackendTransaction, response.SP, err), err
	case <-time.After(gw.saveTimeout()):
		Log().Error("Backend has timed out while saving email")
		e.Lock() // lock the envelope - it's still processing here, we don't want the server to recycle it
		go func() {
			// keep waiting for the backend to finish processing
			<-workerMsg.notifyMe
			e.Unlock()
			workerMsgPool.Put(workerMsg)
		}()
		return NewResult(res.FailBackendTimeout), errors.New("gateway timeout")
	}
}
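
// exampleStreamUsage is a hypothetical sketch, not part of the API, of the
// streaming path: check StreamOn() to see whether a stream_save_process is
// configured, then hand the client's io.Reader straight to ProcessStream;
// otherwise fall back to the buffered Process path, assuming the envelope
// already holds the message data.
func exampleStreamUsage(gw *BackendGateway, e *mail.Envelope, data io.Reader) (Result, error) {
	if !gw.StreamOn() {
		return gw.Process(e), nil
	}
	return gw.ProcessStream(data, e)
}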

// Shutdown shuts down the backend and leaves it in BackendStateShuttered state
func (gw *BackendGateway) Shutdown() error {
	gw.Lock()
	defer gw.Unlock()
	if gw.State != BackendStateShuttered {
		// send a signal to all workers
		gw.stopWorkers()
		// wait for workers to stop
		gw.wg.Wait()
		// call shutdown on all processor shutdowners
		if err := Svc.shutdown(); err != nil {
			return err
		}
		gw.State = BackendStateShuttered
	}
	return nil
}

// Reinitialize initializes the gateway with the existing config after it was shutdown
func (gw *BackendGateway) Reinitialize() error {
	if gw.State != BackendStateShuttered {
		return errors.New("backend must be in BackendStateShuttered state to Reinitialize")
	}
	// clear the Initializers and Shutdowners
	Svc.reset()
	err := gw.Initialize(gw.config)
	if err != nil {
		fmt.Println("reinitialize to ", gw.config, err)
		return fmt.Errorf("error while initializing the backend: %s", err)
	}
	return nil
}

// newStack creates a new Processor by chaining multiple Processors in a call stack.
// Decorators are functions of Decorator type, in source files prefixed with p_*.
// Each decorator does a specific task during the processing stage.
// This function uses the config value save_process or validate_process to figure out which decorators to use.
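// For example (a hypothetical config value), save_process = "HeadersParser|Debugger"
// is split on "|" and iterated in reverse, so Decorate wraps DefaultProcessor
// with the Debugger decorator first and HeadersParser last; an envelope then
// flows HeadersParser -> Debugger -> DefaultProcessor at processing time.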
func (gw *BackendGateway) newStack(stackConfig string) (Processor, error) {
	var decorators []Decorator
	cfg := strings.ToLower(strings.TrimSpace(stackConfig))
	if len(cfg) == 0 {
		return NoopProcessor{}, nil
	}
	items := strings.Split(cfg, "|")
	for i := range items {
		name := items[len(items)-1-i] // reverse order, since decorators are stacked
		if makeFunc, ok := processors[name]; ok {
			decorators = append(decorators, makeFunc())
		} else {
			ErrProcessorNotFound = fmt.Errorf("processor [%s] not found", name)
			return nil, ErrProcessorNotFound
		}
	}
	// build the call-stack of decorators
	p := Decorate(DefaultProcessor{}, decorators...)
	return p, nil
}

func (gw *BackendGateway) newStreamStack(stackConfig string) (streamer, error) {
	var decorators []*StreamDecorator
	cfg := strings.ToLower(strings.TrimSpace(stackConfig))
	if len(cfg) == 0 {
		return streamer{NoopStreamProcessor{}, decorators}, nil
	}
	items := strings.Split(cfg, "|")
	for i := range items {
		name := items[len(items)-1-i] // reverse order, since decorators are stacked
		if makeFunc, ok := Streamers[name]; ok {
			decorators = append(decorators, makeFunc())
		} else {
			ErrProcessorNotFound = fmt.Errorf("stream processor [%s] not found", name)
			return streamer{nil, decorators}, ErrProcessorNotFound
		}
	}
	// build the call-stack of decorators
	sp, decorators := DecorateStream(&DefaultStreamProcessor{}, decorators)
	return streamer{sp, decorators}, nil
}

// loadConfig loads the config for the GatewayConfig
func (gw *BackendGateway) loadConfig(cfg BackendConfig) error {
	configType := BaseConfig(&GatewayConfig{})
	// Note: treat config values as immutable.
	// If you need to change a config value, change it in the file, then
	// send a SIGHUP
	bcfg, err := Svc.ExtractConfig(cfg, configType)
	if err != nil {
		return err
	}
	gw.gwConfig = bcfg.(*GatewayConfig)
	return nil
}

// Initialize builds the workers and initializes each one
func (gw *BackendGateway) Initialize(cfg BackendConfig) error {
	gw.Lock()
	defer gw.Unlock()
	if gw.State != BackendStateNew && gw.State != BackendStateShuttered {
		return errors.New("can only Initialize in BackendStateNew or BackendStateShuttered state")
	}
	err := gw.loadConfig(cfg)
	if err != nil {
		gw.State = BackendStateError
		return err
	}
	workersSize := gw.workersSize()
	if workersSize < 1 {
		gw.State = BackendStateError
		return errors.New("must have at least 1 worker")
	}
	gw.processors = make([]Processor, 0)
	gw.validators = make([]Processor, 0)
	gw.streamers = make([]streamer, 0)
	for i := 0; i < workersSize; i++ {
		p, err := gw.newStack(gw.gwConfig.SaveProcess)
		if err != nil {
			gw.State = BackendStateError
			return err
		}
		gw.processors = append(gw.processors, p)
		v, err := gw.newStack(gw.gwConfig.ValidateProcess)
		if err != nil {
			gw.State = BackendStateError
			return err
		}
		gw.validators = append(gw.validators, v)
		s, err := gw.newStreamStack(gw.gwConfig.StreamSaveProcess)
		if err != nil {
			gw.State = BackendStateError
			return err
		}
		gw.streamers = append(gw.streamers, s)
	}
	// initialize processors
	if err := Svc.Initialize(cfg); err != nil {
		gw.State = BackendStateError
		return err
	}
	if gw.conveyor == nil {
		gw.conveyor = make(chan *workerMsg, workersSize)
	}
	size := streamBufferSize
	if gw.gwConfig.StreamBufferSize > 0 {
		size = gw.gwConfig.StreamBufferSize
	}
	gw.buf = make([]byte, size)
	// ready to start
	gw.State = BackendStateInitialized
	return nil
}

// Start starts the worker goroutines, assuming it has been initialized or shuttered before
func (gw *BackendGateway) Start() error {
	gw.Lock()
	defer gw.Unlock()
	if gw.State == BackendStateInitialized || gw.State == BackendStateShuttered {
		// we start our workers
		workersSize := gw.workersSize()
		// make our slice of channels for stopping
		gw.workStoppers = make([]chan bool, 0)
		// set the wait group
		gw.wg.Add(workersSize)
		for i := 0; i < workersSize; i++ {
			stop := make(chan bool)
			go func(workerId int, stop chan bool) {
				// blocks here until the worker exits
				for {
					state := gw.workDispatcher(
						gw.conveyor,
						gw.processors[workerId],
						gw.validators[workerId],
						gw.streamers[workerId],
						workerId+1,
						stop)
					// restart the dispatcher after a panic; exit on a clean stop
					if state != dispatcherStatePanic {
						break
					}
				}
				gw.wg.Done()
			}(i, stop)
			gw.workStoppers = append(gw.workStoppers, stop)
		}
		gw.State = BackendStateRunning
		return nil
	} else {
		return fmt.Errorf("cannot start backend because it's in %s state", gw.State)
	}
}

// workersSize gets the number of workers to use for saving email by reading the save_workers_size config value.
// Returns 1 if no config value was set
func (gw *BackendGateway) workersSize() int {
	if gw.gwConfig.WorkersSize <= 0 {
		return 1
	}
	return gw.gwConfig.WorkersSize
}

// saveTimeout returns the maximum amount of time to wait before timing out a save processing task
func (gw *BackendGateway) saveTimeout() time.Duration {
	if gw.gwConfig.TimeoutSave == "" {
		return saveTimeout
	}
	t, err := time.ParseDuration(gw.gwConfig.TimeoutSave)
	if err != nil {
		return saveTimeout
	}
	return t
}

// validateRcptTimeout returns the maximum amount of time to wait before timing out a recipient validation task
func (gw *BackendGateway) validateRcptTimeout() time.Duration {
	if gw.gwConfig.TimeoutValidateRcpt == "" {
		return validateRcptTimeout
	}
	t, err := time.ParseDuration(gw.gwConfig.TimeoutValidateRcpt)
	if err != nil {
		return validateRcptTimeout
	}
	return t
}
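
// A hypothetical illustration of the fallback behaviour above: with
// "gw_save_timeout": "45s" in the config, saveTimeout() returns 45 seconds,
// while an unparsable value such as "45" (missing a unit, so
// time.ParseDuration rejects it) silently falls back to the 30s default.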

type dispatcherState int

const (
	dispatcherStateStopped dispatcherState = iota
	dispatcherStateIdle
	dispatcherStateWorking
	dispatcherStateNotify
	dispatcherStatePanic
)

func (gw *BackendGateway) workDispatcher(
	workIn chan *workerMsg,
	save Processor,
	validate Processor,
	stream streamer,
	workerId int,
	stop chan bool) (state dispatcherState) {

	var msg *workerMsg
	defer func() {
		// panic recovery mechanism: it may panic when processing
		// since processors may call arbitrary code, some may be 3rd party / unstable
		// we need to detect the panic, and notify the backend that it failed & unlock the envelope
		if r := recover(); r != nil {
			Log().Error("worker recovered from panic:", r, string(debug.Stack()))
			if state == dispatcherStateWorking {
				msg.notifyMe <- &notifyMsg{err: errors.New("storage failed")}
			}
			state = dispatcherStatePanic
			return
		}
		// state is dispatcherStateStopped if it reached here
	}()
	state = dispatcherStateIdle
	Log().Infof("processing worker started (#%d)", workerId)
	for {
		select {
		case <-stop:
			state = dispatcherStateStopped
			Log().Infof("stop signal for worker (#%d)", workerId)
			return
		case msg = <-workIn:
			state = dispatcherStateWorking // recovers from panic if in this state
			if msg.task == TaskSaveMail {
				result, err := save.Process(msg.e, msg.task)
				state = dispatcherStateNotify
				msg.notifyMe <- &notifyMsg{err: err, result: result, queuedID: msg.e.QueuedId}
			} else if msg.task == TaskSaveMailStream {
				err := stream.open(msg.e)
				if err == nil {
					if msg.e.Values["size"], err = io.CopyBuffer(stream, msg.r, gw.buf); err != nil {
						Log().WithError(err).Error("stream writing failed")
					}
					if err = stream.close(); err != nil {
						Log().WithError(err).Error("stream closing failed")
					}
				}
				state = dispatcherStateNotify
				var result Result
				if err != nil {
					result = NewResult(response.Canned.FailBackendTransaction, err)
				} else {
					result = NewResult(response.Canned.SuccessMessageQueued, response.SP, msg.e.QueuedId)
				}
				msg.notifyMe <- &notifyMsg{err: err, result: result, queuedID: msg.e.QueuedId}
			} else {
				result, err := validate.Process(msg.e, msg.task)
				state = dispatcherStateNotify
				msg.notifyMe <- &notifyMsg{err: err, result: result}
			}
		}
		state = dispatcherStateIdle
	}
}

// stopWorkers sends a signal to all workers to stop
func (gw *BackendGateway) stopWorkers() {
	for i := range gw.workStoppers {
		gw.workStoppers[i] <- true
	}
}