gateway.go 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784
  1. package backends
  2. import (
  3. "errors"
  4. "fmt"
  5. "io"
  6. "reflect"
  7. "strconv"
  8. "sync"
  9. "time"
  10. "github.com/flashmob/go-guerrilla/log"
  11. "github.com/flashmob/go-guerrilla/mail"
  12. "github.com/flashmob/go-guerrilla/response"
  13. "runtime/debug"
  14. )
  15. // A backend gateway is a proxy that implements the Backend interface.
  16. // It is used to start multiple goroutine workers for saving mail, and then distribute email saving to the workers
  17. // via a channel. Shutting down via Shutdown() will stop all workers.
  18. // The rest of this program always talks to the backend via this gateway.
  19. type BackendGateway struct {
  20. // name is the name of the gateway given in the config
  21. name string
  22. // channel for distributing envelopes to workers
  23. conveyor chan *workerMsg
  24. conveyorValidation chan *workerMsg
  25. conveyorStream chan *workerMsg
  26. conveyorStreamBg chan *workerMsg
  27. // waits for backend workers to start/stop
  28. wg sync.WaitGroup
  29. workStoppers []chan bool
  30. processors []Processor
  31. validators []ValidatingProcessor
  32. streamers []streamer
  33. background []streamer
  34. producer *StreamDecorator
  35. decoratorLookup map[ConfigSection]map[string]*StreamDecorator
  36. workerID int
  37. // controls access to state
  38. sync.Mutex
  39. State backendState
  40. config BackendConfig
  41. gwConfig *GatewayConfig
  42. //buffers []byte // stream output buffer
  43. buffers map[int][]byte
  44. }
  45. // workerMsg is what get placed on the BackendGateway.saveMailChan channel
  46. type workerMsg struct {
  47. // The email data
  48. e *mail.Envelope
  49. // notifyMe is used to notify the gateway of workers finishing their processing
  50. notifyMe chan *notifyMsg
  51. // select the task type
  52. task SelectTask
  53. // io.Reader for streamed processor
  54. r io.Reader
  55. }
  56. type streamer struct {
  57. // StreamProcessor is a chain of StreamProcessor
  58. sp StreamProcessor
  59. // so that we can call Open and Close
  60. d []*StreamDecorator
  61. }
  62. func (s streamer) Write(p []byte) (n int, err error) {
  63. return s.sp.Write(p)
  64. }
  65. func (s *streamer) open(e *mail.Envelope) error {
  66. var err Errors
  67. for i := range s.d {
  68. if s.d[i].Open != nil {
  69. if e := s.d[i].Open(e); e != nil {
  70. err = append(err, e)
  71. }
  72. }
  73. }
  74. if len(err) == 0 {
  75. return nil
  76. }
  77. return err
  78. }
  79. func (s *streamer) close() error {
  80. var err Errors
  81. // close in reverse order
  82. for i := len(s.d) - 1; i >= 0; i-- {
  83. if s.d[i].Close != nil {
  84. if e := s.d[i].Close(); e != nil {
  85. err = append(err, e)
  86. }
  87. }
  88. }
  89. if len(err) == 0 {
  90. return nil
  91. }
  92. return err
  93. }
  94. func (s *streamer) shutdown() error {
  95. var err Errors
  96. // shutdown in reverse order
  97. for i := len(s.d) - 1; i >= 0; i-- {
  98. if s.d[i].Shutdown != nil {
  99. if e := s.d[i].Shutdown(); e != nil {
  100. err = append(err, e)
  101. }
  102. }
  103. }
  104. if len(err) == 0 {
  105. return nil
  106. }
  107. return err
  108. }
  109. type backendState int
  110. // possible values for state
  111. const (
  112. BackendStateNew backendState = iota
  113. BackendStateRunning
  114. BackendStateShuttered
  115. BackendStateError
  116. BackendStateInitialized
  117. )
  118. func (s backendState) String() string {
  119. switch s {
  120. case BackendStateNew:
  121. return "NewState"
  122. case BackendStateRunning:
  123. return "RunningState"
  124. case BackendStateShuttered:
  125. return "ShutteredState"
  126. case BackendStateError:
  127. return "ErrorSate"
  128. case BackendStateInitialized:
  129. return "InitializedState"
  130. }
  131. return strconv.Itoa(int(s))
  132. }
  133. // New makes a new default BackendGateway backend, and initializes it using
  134. // backendConfig and stores the logger
  135. func New(name string, backendConfig BackendConfig, l log.Logger) (Backend, error) {
  136. Svc.SetMainlog(l)
  137. gateway := &BackendGateway{name: name}
  138. backendConfig.toLower()
  139. // keep the a copy of the config
  140. gateway.config = backendConfig
  141. err := gateway.Initialize(backendConfig)
  142. if err != nil {
  143. return nil, fmt.Errorf("error while initializing the backend: %s", err)
  144. }
  145. return gateway, nil
  146. }
  147. var workerMsgPool = sync.Pool{
  148. // if not available, then create a new one
  149. New: func() interface{} {
  150. return &workerMsg{}
  151. },
  152. }
  153. // reset resets a workerMsg that has been borrowed from the pool
  154. func (w *workerMsg) reset(e *mail.Envelope, task SelectTask) {
  155. if w.notifyMe == nil {
  156. w.notifyMe = make(chan *notifyMsg)
  157. }
  158. w.e = e
  159. w.task = task
  160. }
  161. func (gw *BackendGateway) Name() string {
  162. return gw.name
  163. }
  164. // Process distributes an envelope to one of the backend workers with a TaskSaveMail task
  165. func (gw *BackendGateway) Process(e *mail.Envelope) Result {
  166. if gw.State != BackendStateRunning {
  167. return NewResult(response.Canned.FailBackendNotRunning, response.SP, gw.State)
  168. }
  169. // borrow a workerMsg from the pool
  170. workerMsg := workerMsgPool.Get().(*workerMsg)
  171. defer workerMsgPool.Put(workerMsg)
  172. workerMsg.reset(e, TaskSaveMail)
  173. // place on the channel so that one of the save mail workers can pick it up
  174. gw.conveyor <- workerMsg
  175. // wait for the save to complete
  176. // or timeout
  177. select {
  178. case status := <-workerMsg.notifyMe:
  179. // email saving transaction completed
  180. if status.result == BackendResultOK && status.queuedID != "" {
  181. return NewResult(response.Canned.SuccessMessageQueued, response.SP, status.queuedID)
  182. }
  183. // A custom result, there was probably an error, if so, log it
  184. if status.result != nil {
  185. if status.err != nil {
  186. Log().Error(status.err)
  187. }
  188. return status.result
  189. }
  190. // if there was no result, but there's an error, then make a new result from the error
  191. if status.err != nil {
  192. if _, err := strconv.Atoi(status.err.Error()[:3]); err != nil {
  193. return NewResult(response.Canned.FailBackendTransaction, response.SP, status.err)
  194. }
  195. return NewResult(status.err)
  196. }
  197. // both result & error are nil (should not happen)
  198. err := errors.New("no response from backend - processor did not return a result or an error")
  199. Log().Error(err)
  200. return NewResult(response.Canned.FailBackendTransaction, response.SP, err)
  201. case <-time.After(gw.saveTimeout()):
  202. Log().Fields("queuedId", e.QueuedId).Error("backend has timed out while saving email")
  203. e.Add(1) // lock the envelope - it's still processing here, we don't want the server to recycle it
  204. go func() {
  205. // keep waiting for the backend to finish processing
  206. <-workerMsg.notifyMe
  207. Log().Fields("queuedId", e.QueuedId).Error("finished processing mail after timeout")
  208. e.Done()
  209. }()
  210. return NewResult(response.Canned.FailBackendTimeout)
  211. }
  212. }
  213. // ValidateRcpt asks one of the workers to validate the recipient
  214. // Only the last recipient appended to e.RcptTo will be validated.
  215. func (gw *BackendGateway) ValidateRcpt(e *mail.Envelope) RcptError {
  216. if gw.State != BackendStateRunning {
  217. return StorageNotAvailable
  218. }
  219. if _, ok := gw.validators[0].(NoopProcessor); ok {
  220. // no validator processors configured
  221. return nil
  222. }
  223. // place on the channel so that one of the save mail workers can pick it up
  224. workerMsg := workerMsgPool.Get().(*workerMsg)
  225. defer workerMsgPool.Put(workerMsg)
  226. workerMsg.reset(e, TaskValidateRcpt)
  227. gw.conveyorValidation <- workerMsg
  228. // wait for the validation to complete
  229. // or timeout
  230. select {
  231. case status := <-workerMsg.notifyMe:
  232. if status.err != nil {
  233. return status.err
  234. }
  235. return nil
  236. case <-time.After(gw.validateRcptTimeout()):
  237. Log().Fields("queuedId", e.QueuedId).Error("backend has timed out while validating rcpt")
  238. e.Add(1) // lock the envelope - it's still processing here, we don't want the server to recycle it
  239. go func() {
  240. <-workerMsg.notifyMe
  241. Log().Fields("queuedId", e.QueuedId).Error("finished validating rcpt after timeout")
  242. e.Done()
  243. }()
  244. return StorageTimeout
  245. }
  246. }
  247. func (gw *BackendGateway) StreamOn() bool {
  248. return len(gw.gwConfig.SaveStream) != 0
  249. }
  250. // newStreamDecorator creates a new StreamDecorator and calls Configure with its corresponding configuration
  251. // cs - the item of 'list' property, result from newStackStreamProcessorConfig()
  252. // section - which section of the config
  253. func (gw *BackendGateway) newStreamDecorator(cs stackConfigExpression, section ConfigSection) *StreamDecorator {
  254. if makeFunc, ok := Streamers[cs.name]; !ok {
  255. return nil
  256. } else {
  257. d := makeFunc()
  258. config := gw.config.lookupGroup(section, cs.String())
  259. if config == nil {
  260. config = ConfigGroup{}
  261. }
  262. if d.Configure != nil {
  263. if err := d.Configure(config); err != nil {
  264. return nil
  265. }
  266. }
  267. return d
  268. }
  269. }
  270. func (gw *BackendGateway) ProcessBackground(e *mail.Envelope) {
  271. if d := gw.producer; d == nil {
  272. Log().Error("gateway has failed creating a post_process_producer, check config")
  273. return
  274. } else {
  275. r, err := d.GetEmail(e.MessageID)
  276. if err != nil {
  277. Log().Fields("queuedID", e.QueuedId, "messageID", e.MessageID).
  278. Error("gateway background process aborted: email with messageID not found")
  279. return
  280. }
  281. // borrow a workerMsg from the pool
  282. workerMsg := workerMsgPool.Get().(*workerMsg)
  283. defer workerMsgPool.Put(workerMsg)
  284. workerMsg.reset(e, TaskSaveMailStream)
  285. workerMsg.r = r
  286. // place on the channel so that one of the save mail workers can pick it up
  287. // buffered channel will block if full
  288. select {
  289. case gw.conveyorStreamBg <- workerMsg:
  290. break
  291. case <-time.After(gw.saveTimeout()):
  292. Log().Fields("queuedID", e.QueuedId).Error("post-processing timeout - queue full, aborting")
  293. return
  294. }
  295. // process in the background
  296. for {
  297. select {
  298. case status := <-workerMsg.notifyMe:
  299. // email saving transaction completed
  300. var fields []interface{}
  301. var code int
  302. if status.result != nil {
  303. code = status.result.Code()
  304. fields = append(fields, "queuedID", e.QueuedId, "code", code)
  305. }
  306. if code > 200 && code < 300 {
  307. fields = append(fields, "messageID", e.MessageID)
  308. Log().Fields(fields...).Info("background process done")
  309. return
  310. }
  311. if status.err != nil {
  312. fields = append(fields, "error", status.err)
  313. }
  314. if len(fields) > 0 {
  315. Log().Fields(fields...).Error("post-process completed with an error")
  316. return
  317. }
  318. // both result & error are nil (should not happen)
  319. Log().Fields("queuedID", e.QueuedId).Error("no response from backend - post-process did not return a result or an error")
  320. return
  321. case <-time.After(gw.saveTimeout()):
  322. Log().Fields("queuedID", e.QueuedId).Error("background post-processing timed-out, will keep waiting")
  323. // don't return here, keep waiting for workerMsg.notifyMe
  324. }
  325. }
  326. }
  327. }
  328. func (gw *BackendGateway) ProcessStream(r io.Reader, e *mail.Envelope) (Result, error) {
  329. res := response.Canned
  330. if gw.State != BackendStateRunning {
  331. return NewResult(res.FailBackendNotRunning, response.SP, gw.State), errors.New(res.FailBackendNotRunning.String())
  332. }
  333. // borrow a workerMsg from the pool
  334. workerMsg := workerMsgPool.Get().(*workerMsg)
  335. workerMsgPool.Put(workerMsg)
  336. workerMsg.reset(e, TaskSaveMailStream)
  337. workerMsg.r = r
  338. // place on the channel so that one of the save mail workers can pick it up
  339. gw.conveyorStream <- workerMsg
  340. // wait for the save to complete
  341. // or timeout
  342. select {
  343. case status := <-workerMsg.notifyMe:
  344. // email saving transaction completed
  345. if status.result == BackendResultOK && status.queuedID != "" {
  346. return NewResult(res.SuccessMessageQueued, response.SP, status.queuedID), status.err
  347. }
  348. // A custom result, there was probably an error, if so, log it
  349. if status.result != nil {
  350. if status.err != nil {
  351. Log().Error(status.err)
  352. }
  353. return status.result, status.err
  354. }
  355. // if there was no result, but there's an error, then make a new result from the error
  356. if status.err != nil {
  357. if _, err := strconv.Atoi(status.err.Error()[:3]); err != nil {
  358. return NewResult(res.FailBackendTransaction, response.SP, status.err), status.err
  359. }
  360. return NewResult(status.err), status.err
  361. }
  362. // both result & error are nil (should not happen)
  363. err := errors.New("no response from backend - processor did not return a result or an error")
  364. Log().Error(err)
  365. return NewResult(res.FailBackendTransaction, response.SP, err), err
  366. case <-time.After(gw.saveTimeout()):
  367. Log().Fields("queuedID", e.QueuedId).Error("backend has timed out while saving email stream")
  368. e.Add(1) // lock the envelope - it's still processing here, we don't want the server to recycle it
  369. go func() {
  370. // keep waiting for the backend to finish processing
  371. <-workerMsg.notifyMe
  372. e.Done()
  373. Log().Fields("queuedID", e.QueuedId).Info("backend has finished saving email stream after timeout")
  374. }()
  375. return NewResult(res.FailBackendTimeout), errors.New("gateway timeout")
  376. }
  377. }
  378. // Shutdown shuts down the backend and leaves it in BackendStateShuttered state
  379. func (gw *BackendGateway) Shutdown() error {
  380. gw.Lock()
  381. defer gw.Unlock()
  382. if gw.State != BackendStateShuttered {
  383. // send a signal to all workers
  384. gw.stopWorkers()
  385. // wait for workers to stop
  386. gw.wg.Wait()
  387. for stream := range gw.streamers {
  388. err := gw.streamers[stream].shutdown()
  389. if err != nil {
  390. Log().Fields("error", err, "gateway", gw.name).Error("failed shutting down stream")
  391. }
  392. }
  393. for stream := range gw.background {
  394. err := gw.background[stream].shutdown()
  395. if err != nil {
  396. Log().Fields("error", err, "gateway", gw.name).Error("failed shutting down background stream")
  397. }
  398. }
  399. // call shutdown on all processor shutdowners
  400. if err := Svc.shutdown(); err != nil {
  401. return err
  402. }
  403. gw.State = BackendStateShuttered
  404. }
  405. return nil
  406. }
  407. // Reinitialize initializes the gateway with the existing config after it was shutdown
  408. func (gw *BackendGateway) Reinitialize() error {
  409. if gw.State != BackendStateShuttered {
  410. return errors.New("backend must be in BackendStateshuttered state to Reinitialize")
  411. }
  412. // clear the Initializers and Shutdowners
  413. Svc.reset()
  414. err := gw.Initialize(gw.config)
  415. if err != nil {
  416. return fmt.Errorf("error while initializing the backend: %s", err)
  417. }
  418. return err
  419. }
  420. // newStack creates a new Processor by chaining multiple Processors in a call stack
  421. // Decorators are functions of Decorator type, source files prefixed with p_*
  422. // Each decorator does a specific task during the processing stage.
  423. // This function uses the config value save_process or validate_process to figure out which Decorator to use
  424. func (gw *BackendGateway) newStack(stackConfig string) (Processor, error) {
  425. var decorators []Decorator
  426. c := newStackProcessorConfig(stackConfig, newAliasMap(gw.config[ConfigProcessors]))
  427. if len(c.list) == 0 {
  428. return NoopProcessor{}, nil
  429. }
  430. for i := range c.list {
  431. if makeFunc, ok := processors[c.list[i].name]; ok {
  432. decorators = append(decorators, makeFunc())
  433. } else {
  434. return nil, c.notFound(c.list[i].name)
  435. }
  436. }
  437. // build the call-stack of decorators
  438. p := Decorate(DefaultProcessor{}, decorators...)
  439. return p, nil
  440. }
  441. func (gw *BackendGateway) newStreamStack(stackConfig string) (streamer, error) {
  442. var decorators []*StreamDecorator
  443. noop := streamer{NoopStreamProcessor{}, decorators}
  444. groupName := ConfigStreamProcessors
  445. c := newStackStreamProcessorConfig(stackConfig, newAliasMap(gw.config[groupName]))
  446. if len(c.list) == 0 {
  447. return noop, nil
  448. }
  449. for i := range c.list {
  450. if d := gw.newStreamDecorator(c.list[i], groupName); d != nil {
  451. if gw.decoratorLookup[groupName] == nil {
  452. gw.decoratorLookup[groupName] = make(map[string]*StreamDecorator)
  453. }
  454. gw.decoratorLookup[groupName][c.list[i].String()] = d
  455. decorators = append(decorators, d)
  456. } else {
  457. return streamer{nil, decorators}, c.notFound(c.list[i].name)
  458. }
  459. }
  460. // build the call-stack of decorators
  461. sp, decorators := DecorateStream(&DefaultStreamProcessor{}, decorators)
  462. return streamer{sp, decorators}, nil
  463. }
  464. // loadConfig loads the config for the GatewayConfig
  465. func (gw *BackendGateway) loadConfig(cfg BackendConfig) error {
  466. configType := BaseConfig(&GatewayConfig{})
  467. // Note: treat config values as immutable
  468. // if you need to change a config value, change in the file then
  469. // send a SIGHUP
  470. if gw.name == "" {
  471. gw.name = DefaultGateway
  472. }
  473. if _, ok := cfg[ConfigGateways][gw.name]; !ok {
  474. return errors.New("no such gateway configured: " + gw.name)
  475. }
  476. bcfg, err := Svc.ExtractConfig(ConfigGateways, gw.name, cfg, configType)
  477. if err != nil {
  478. return err
  479. }
  480. gw.gwConfig = bcfg.(*GatewayConfig)
  481. return nil
  482. }
  483. // Initialize builds the workers and initializes each one
  484. func (gw *BackendGateway) Initialize(cfg BackendConfig) error {
  485. gw.Lock()
  486. defer gw.Unlock()
  487. if gw.State != BackendStateNew && gw.State != BackendStateShuttered {
  488. return errors.New("can only Initialize in BackendStateNew or BackendStateShuttered state")
  489. }
  490. err := gw.loadConfig(cfg)
  491. if err != nil {
  492. gw.State = BackendStateError
  493. return err
  494. }
  495. gw.buffers = make(map[int][]byte) // individual buffers are made later
  496. gw.decoratorLookup = make(map[ConfigSection]map[string]*StreamDecorator)
  497. gw.processors = make([]Processor, 0)
  498. gw.validators = make([]ValidatingProcessor, 0)
  499. gw.streamers = make([]streamer, 0)
  500. gw.background = make([]streamer, 0)
  501. for i := 0; i < gw.gwConfig.saveWorkersCount(); i++ {
  502. p, err := gw.newStack(gw.gwConfig.SaveProcess)
  503. if err != nil {
  504. gw.State = BackendStateError
  505. return err
  506. }
  507. gw.processors = append(gw.processors, p)
  508. }
  509. for i := 0; i < gw.gwConfig.validateWorkersCount(); i++ {
  510. v, err := gw.newStack(gw.gwConfig.ValidateProcess)
  511. if err != nil {
  512. gw.State = BackendStateError
  513. return err
  514. }
  515. gw.validators = append(gw.validators, v)
  516. }
  517. for i := 0; i < gw.gwConfig.streamWorkersCount(); i++ {
  518. s, err := gw.newStreamStack(gw.gwConfig.SaveStream)
  519. if err != nil {
  520. gw.State = BackendStateError
  521. return err
  522. }
  523. gw.streamers = append(gw.streamers, s)
  524. }
  525. for i := 0; i < gw.gwConfig.backgroundWorkersCount(); i++ {
  526. c, err := gw.newStreamStack(gw.gwConfig.PostProcessConsumer)
  527. if err != nil {
  528. gw.State = BackendStateError
  529. return err
  530. }
  531. gw.background = append(gw.background, c)
  532. }
  533. if err = gw.initProducer(); err != nil {
  534. return err
  535. }
  536. // Initialize processors & stream processors
  537. if err := Svc.Initialize(cfg); err != nil {
  538. gw.State = BackendStateError
  539. return err
  540. }
  541. gw.conveyor = make(chan *workerMsg, gw.gwConfig.saveProcessSize())
  542. gw.conveyorValidation = make(chan *workerMsg, gw.gwConfig.validateProcessSize())
  543. gw.conveyorStream = make(chan *workerMsg, gw.gwConfig.saveStreamSize())
  544. gw.conveyorStreamBg = make(chan *workerMsg, gw.gwConfig.postProcessSize())
  545. // ready to start
  546. gw.State = BackendStateInitialized
  547. return nil
  548. }
  549. // Start starts the worker goroutines, assuming it has been initialized or shuttered before
  550. func (gw *BackendGateway) Start() error {
  551. gw.Lock()
  552. defer gw.Unlock()
  553. if gw.State == BackendStateInitialized || gw.State == BackendStateShuttered {
  554. // make our slice of channels for stopping
  555. gw.workStoppers = make([]chan bool, 0)
  556. gw.startWorkers(gw.conveyor, gw.processors)
  557. gw.startWorkers(gw.conveyorValidation, gw.validators)
  558. gw.startWorkers(gw.conveyorStream, gw.streamers)
  559. gw.startWorkers(gw.conveyorStreamBg, gw.background)
  560. gw.State = BackendStateRunning
  561. return nil
  562. } else {
  563. return fmt.Errorf("cannot start backend because it's in %s state", gw.State)
  564. }
  565. }
  566. func (gw *BackendGateway) startWorkers(conveyor chan *workerMsg, processors interface{}) {
  567. p := reflect.ValueOf(processors)
  568. if reflect.TypeOf(processors).Kind() != reflect.Slice {
  569. panic("processors must be a slice")
  570. }
  571. // set the wait group (when stopping, it will block for all goroutines to exit)
  572. gw.wg.Add(p.Len())
  573. for i := 0; i < p.Len(); i++ {
  574. // set the buffer
  575. gw.buffers[gw.workerID] = gw.makeBuffer()
  576. // stop is a channel used for stopping the worker
  577. stop := make(chan bool)
  578. // start the worker and keep it running
  579. go func(workerId int, stop chan bool, i int) {
  580. // blocks here until the worker exits
  581. // for-loop used so that if workDispatcher panics, re-enter gw.workDispatcher
  582. for {
  583. state := gw.workDispatcher(
  584. conveyor,
  585. p.Index(i).Interface(),
  586. workerId,
  587. stop)
  588. // keep running after panic
  589. if state != dispatcherStatePanic {
  590. break
  591. }
  592. }
  593. gw.wg.Done()
  594. }(gw.workerID, stop, i)
  595. gw.workStoppers = append(gw.workStoppers, stop)
  596. gw.workerID++
  597. }
  598. }
  599. type dispatcherState int
  600. const (
  601. dispatcherStateStopped dispatcherState = iota
  602. dispatcherStateIdle
  603. dispatcherStateWorking
  604. dispatcherStateNotify
  605. dispatcherStatePanic
  606. )
  607. func (gw *BackendGateway) workDispatcher(
  608. workIn chan *workerMsg,
  609. processor interface{},
  610. workerId int,
  611. stop chan bool) (state dispatcherState) {
  612. var msg *workerMsg
  613. defer func() {
  614. // panic recovery mechanism: it may panic when processing
  615. // since processors may call arbitrary code, some may be 3rd party / unstable
  616. // we need to detect the panic, and notify the backend that it failed & unlock the envelope
  617. if r := recover(); r != nil {
  618. Log().Error("worker recovered from panic:", r, string(debug.Stack()))
  619. if state == dispatcherStateWorking {
  620. msg.notifyMe <- &notifyMsg{err: errors.New("storage failed")}
  621. }
  622. state = dispatcherStatePanic
  623. return
  624. }
  625. // state is dispatcherStateStopped if it reached here
  626. }()
  627. state = dispatcherStateIdle
  628. Log().Fields("id", workerId+1, "gateway", gw.name).
  629. Info("processing worker started")
  630. for {
  631. select {
  632. case <-stop:
  633. state = dispatcherStateStopped
  634. Log().Fields("id", workerId+1, "gateway", gw.name).
  635. Info("stop signal for worker")
  636. return
  637. case msg = <-workIn:
  638. state = dispatcherStateWorking // recovers from panic if in this state
  639. switch v := processor.(type) {
  640. case Processor:
  641. result, err := v.Process(msg.e, msg.task)
  642. state = dispatcherStateNotify
  643. msg.notifyMe <- &notifyMsg{err: err, result: result, queuedID: msg.e.QueuedId}
  644. case ValidatingProcessor:
  645. result, err := v.Process(msg.e, msg.task)
  646. state = dispatcherStateNotify
  647. msg.notifyMe <- &notifyMsg{err: err, result: result}
  648. case streamer:
  649. err := v.open(msg.e)
  650. if err == nil {
  651. if msg.e.Size, err = io.CopyBuffer(v, msg.r, gw.buffers[workerId]); err != nil {
  652. Log().Fields("error", err, "workerID", workerId+1).Error("stream writing failed")
  653. }
  654. if err = v.close(); err != nil {
  655. Log().Fields("error", err, "workerID", workerId+1).Error("stream closing failed")
  656. }
  657. }
  658. state = dispatcherStateNotify
  659. var result Result
  660. if err != nil {
  661. result = NewResult(response.Canned.FailBackendTransaction, err)
  662. } else {
  663. result = NewResult(response.Canned.SuccessMessageQueued, response.SP, msg.e.QueuedId)
  664. }
  665. msg.notifyMe <- &notifyMsg{err: err, result: result, queuedID: msg.e.QueuedId}
  666. }
  667. }
  668. state = dispatcherStateIdle
  669. }
  670. }
  671. func (gw *BackendGateway) makeBuffer() []byte {
  672. if gw.buffers == nil {
  673. gw.buffers = make(map[int][]byte)
  674. }
  675. size := configStreamBufferSize
  676. if gw.gwConfig.StreamBufferSize > 0 {
  677. size = gw.gwConfig.StreamBufferSize
  678. }
  679. return make([]byte, size)
  680. }
  681. // stopWorkers sends a signal to all workers to stop
  682. func (gw *BackendGateway) stopWorkers() {
  683. for i := range gw.workStoppers {
  684. gw.workStoppers[i] <- true
  685. }
  686. gw.workerID = 0
  687. }
  688. func (gw *BackendGateway) initProducer() error {
  689. notValid := errors.New("gateway has no valid [post_process_producer] configured")
  690. if gw.gwConfig.PostProcessConsumer == "" {
  691. // consumer not configured, so not active
  692. return nil
  693. }
  694. if gw.gwConfig.PostProcessProducer == "" {
  695. return notValid
  696. }
  697. section := ConfigStreamProcessors // which section of the config (stream_processors)
  698. m := newAliasMap(gw.config[section])
  699. c := newStackStreamProcessorConfig(gw.gwConfig.PostProcessProducer, m)
  700. if len(c.list) == 0 {
  701. return notValid
  702. }
  703. // check it there's already an instance of it
  704. if gw.decoratorLookup[section] != nil {
  705. if v, ok := gw.decoratorLookup[section][c.list[0].String()]; ok {
  706. gw.producer = v
  707. return nil
  708. }
  709. }
  710. if d := gw.newStreamDecorator(c.list[0], section); d != nil {
  711. // use a new instance
  712. gw.producer = d
  713. return nil
  714. } else {
  715. return errors.New("please check gateway config [post_process_producer]")
  716. }
  717. }