gateway.go

package backends

import (
    "errors"
    "fmt"
    "runtime/debug"
    "strconv"
    "strings"
    "sync"
    "time"

    "github.com/flashmob/go-guerrilla/log"
    "github.com/flashmob/go-guerrilla/mail"
    "github.com/flashmob/go-guerrilla/response"
)

var ErrProcessorNotFound error

// A backend gateway is a proxy that implements the Backend interface.
// It is used to start multiple goroutine workers for saving mail, and then distribute
// email saving to the workers via a channel. Shutting down via Shutdown() will stop all workers.
// The rest of this program always talks to the backend via this gateway.
type BackendGateway struct {
    // channel for distributing envelopes to workers
    conveyor chan *workerMsg

    // waits for backend workers to start/stop
    wg           sync.WaitGroup
    workStoppers []chan bool
    chains       []Processor

    // controls access to state
    sync.Mutex
    State    backendState
    config   BackendConfig
    gwConfig *GatewayConfig
}

type GatewayConfig struct {
    WorkersSize    int    `json:"save_workers_size,omitempty"`
    ProcessorStack string `json:"process_stack,omitempty"`
}
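
// For illustration only (not part of the original file): given the json tags above, the
// backend section of a server config that maps onto GatewayConfig might look like the
// following. The processor names are placeholders; valid names depend on what has been
// registered in the processors map consumed by newChain below.
//
//    {
//        "save_workers_size": 2,
//        "process_stack": "HeadersParser|Debugger"
//    }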

// workerMsg is what gets placed on the BackendGateway.conveyor channel
type workerMsg struct {
    // The email data
    e *mail.Envelope
    // notifyMe is used to notify the gateway of workers finishing their processing
    notifyMe chan *notifyMsg
    // select the task type
    task SelectTask
}

type backendState int

// possible values for state
const (
    BackendStateNew backendState = iota
    BackendStateRunning
    BackendStateShuttered
    BackendStateError
    BackendStateInitialized

    processTimeout   = time.Second * 30
    defaultProcessor = "Debugger"
)

func (s backendState) String() string {
    switch s {
    case BackendStateNew:
        return "NewState"
    case BackendStateRunning:
        return "RunningState"
    case BackendStateShuttered:
        return "ShutteredState"
    case BackendStateError:
        return "ErrorState"
    case BackendStateInitialized:
        return "InitializedState"
    }
    return strconv.Itoa(int(s))
}

// New makes a new default BackendGateway backend, initializes it using backendConfig,
// and stores the logger.
func New(backendConfig BackendConfig, l log.Logger) (Backend, error) {
    Svc.SetMainlog(l)
    gateway := &BackendGateway{}
    err := gateway.Initialize(backendConfig)
    if err != nil {
        return nil, fmt.Errorf("error while initializing the backend: %s", err)
    }
    // keep the config known to be good
    gateway.config = backendConfig

    b = Backend(gateway)
    return b, nil
}
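
// startGatewaySketch is an illustrative helper, not part of the original file. It mirrors
// what New does above and adds the Start call a caller would typically make next; the
// config and logger are assumed to be supplied by the caller.
func startGatewaySketch(cfg BackendConfig, l log.Logger) (*BackendGateway, error) {
    Svc.SetMainlog(l)
    gw := &BackendGateway{}
    // build the processor chains and the conveyor channel from cfg
    if err := gw.Initialize(cfg); err != nil {
        return nil, err
    }
    // keep the config known to be good, as New does
    gw.config = cfg
    // spin up the worker goroutines so Process and ValidateRcpt can be served
    if err := gw.Start(); err != nil {
        return nil, err
    }
    return gw, nil
}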

// Process distributes an envelope to one of the backend workers
func (gw *BackendGateway) Process(e *mail.Envelope) Result {
    if gw.State != BackendStateRunning {
        return NewResult(response.Canned.FailBackendNotRunning + gw.State.String())
    }
    // place on the channel so that one of the save mail workers can pick it up
    savedNotify := make(chan *notifyMsg)
    gw.conveyor <- &workerMsg{e, savedNotify, TaskSaveMail}
    // wait for the save to complete or timeout
    select {
    case status := <-savedNotify:
        if status.err != nil {
            return NewResult(response.Canned.FailBackendTransaction + status.err.Error())
        }
        return NewResult(response.Canned.SuccessMessageQueued + status.queuedID)
    case <-time.After(processTimeout):
        Log().Infof("Backend has timed out")
        return NewResult(response.Canned.FailBackendTimeout)
    }
}
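
// deliverSketch is an illustrative helper, not part of the original file. It shows how a
// caller might hand a completed envelope to the gateway and turn the Result into an
// SMTP-style reply; Result's Code and String methods are inferred from their use in
// workDispatcher below.
func deliverSketch(gw *BackendGateway, e *mail.Envelope) (int, string) {
    result := gw.Process(e)
    // Code() carries the status code, String() the full canned response text
    return result.Code(), result.String()
}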

// ValidateRcpt asks one of the workers to validate the recipient.
// Only the last recipient appended to e.RcptTo will be validated.
func (gw *BackendGateway) ValidateRcpt(e *mail.Envelope) RcptError {
    if gw.State != BackendStateRunning {
        return StorageNotAvailable
    }
    // place on the channel so that one of the save mail workers can pick it up
    notify := make(chan *notifyMsg)
    gw.conveyor <- &workerMsg{e, notify, TaskValidateRcpt}
    // wait for the validation to complete or timeout
    select {
    case status := <-notify:
        if status.err != nil {
            return status.err
        }
        return nil
    case <-time.After(time.Second):
        Log().Infof("Backend has timed out")
        return StorageTimeout
    }
}

// Shutdown shuts down the backend and leaves it in BackendStateShuttered state
func (gw *BackendGateway) Shutdown() error {
    gw.Lock()
    defer gw.Unlock()
    if gw.State != BackendStateShuttered {
        // send a stop signal to all workers
        gw.stopWorkers()
        // wait for workers to stop
        gw.wg.Wait()
        // call shutdown on all processor shutdowners
        if err := Svc.shutdown(); err != nil {
            return err
        }
        gw.State = BackendStateShuttered
    }
    return nil
}

// Reinitialize initializes the gateway with the existing config after it was shutdown
func (gw *BackendGateway) Reinitialize() error {
    if gw.State != BackendStateShuttered {
        return errors.New("backend must be in BackendStateShuttered state to Reinitialize")
    }
    Svc.reset()
    err := gw.Initialize(gw.config)
    if err != nil {
        Log().Error("failed to reinitialize the backend with config", gw.config, err)
        return fmt.Errorf("error while initializing the backend: %s", err)
    }
    return err
}
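
// reloadGatewaySketch is an illustrative helper, not part of the original file. It sketches
// the restart cycle implied above and in loadConfig below (for example after the config
// file changed and a SIGHUP was received): stop the workers, re-run initialization with
// the stored config, then start fresh workers.
func reloadGatewaySketch(gw *BackendGateway) error {
    // stop the workers and shut down the processors
    if err := gw.Shutdown(); err != nil {
        return err
    }
    // rebuild the processor chains from the stored config
    if err := gw.Reinitialize(); err != nil {
        return err
    }
    // start a fresh set of workers
    return gw.Start()
}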

// newChain creates a new Processor by chaining multiple Processors in a call stack.
// Decorators are functions of Decorator type, source files prefixed with p_*.
// Each decorator does a specific task during the processing stage.
// This function uses the config value process_stack to figure out which Decorators to use.
func (gw *BackendGateway) newChain() (Processor, error) {
    var decorators []Decorator
    cfg := strings.ToLower(strings.TrimSpace(gw.gwConfig.ProcessorStack))
    if len(cfg) == 0 {
        cfg = strings.ToLower(defaultProcessor)
    }
    items := strings.Split(cfg, "|")
    for i := range items {
        name := items[len(items)-1-i] // reverse order, since decorators are stacked
        if makeFunc, ok := processors[name]; ok {
            decorators = append(decorators, makeFunc())
        } else {
            ErrProcessorNotFound = fmt.Errorf("processor [%s] not found", name)
            return nil, ErrProcessorNotFound
        }
    }
    // build the call-stack of decorators
    p := Decorate(DefaultProcessor{}, decorators...)
    return p, nil
}
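
// For illustration (not part of the original file): with process_stack set to "a|b", the
// loop above collects the constructors in reverse order, so the chain is built as
//
//    p := Decorate(DefaultProcessor{}, bDecorator, aDecorator)
//
// Assuming Decorate wraps left to right (the usual decorator composition), each envelope
// is then handled by the "a" step first, then "b", with DefaultProcessor terminating the
// chain. The names aDecorator and bDecorator are placeholders for whatever the processors
// map returns.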

// loadConfig loads the config for the GatewayConfig
func (gw *BackendGateway) loadConfig(cfg BackendConfig) error {
    configType := BaseConfig(&GatewayConfig{})
    // Note: treat config values as immutable.
    // If you need to change a config value, change it in the file, then send a SIGHUP.
    bcfg, err := Svc.ExtractConfig(cfg, configType)
    if err != nil {
        return err
    }
    gw.gwConfig = bcfg.(*GatewayConfig)
    return nil
}

// Initialize builds the workers and initializes each one
func (gw *BackendGateway) Initialize(cfg BackendConfig) error {
    gw.Lock()
    defer gw.Unlock()
    if gw.State != BackendStateNew && gw.State != BackendStateShuttered {
        return errors.New("can only Initialize in BackendStateNew or BackendStateShuttered state")
    }
    err := gw.loadConfig(cfg)
    if err == nil {
        workersSize := gw.workersSize()
        if workersSize < 1 {
            gw.State = BackendStateError
            return errors.New("must have at least 1 worker")
        }
        gw.chains = make([]Processor, 0)
        for i := 0; i < workersSize; i++ {
            p, err := gw.newChain()
            if err != nil {
                gw.State = BackendStateError
                return err
            }
            gw.chains = append(gw.chains, p)
        }
        // initialize processors
        if err := Svc.initialize(cfg); err != nil {
            gw.State = BackendStateError
            return err
        }
        if gw.conveyor == nil {
            gw.conveyor = make(chan *workerMsg, workersSize)
        }
        // ready to start
        gw.State = BackendStateInitialized
        return nil
    }
    gw.State = BackendStateError
    return err
}

// Start starts the worker goroutines, assuming it has been initialized or shuttered before
func (gw *BackendGateway) Start() error {
    gw.Lock()
    defer gw.Unlock()
    if gw.State == BackendStateInitialized || gw.State == BackendStateShuttered {
        // start our workers
        workersSize := gw.workersSize()
        // make our slice of channels for stopping
        gw.workStoppers = make([]chan bool, 0)
        // set the wait group
        gw.wg.Add(workersSize)
        for i := 0; i < workersSize; i++ {
            stop := make(chan bool)
            go func(workerId int, stop chan bool) {
                // blocks here until the worker exits
                gw.workDispatcher(gw.conveyor, gw.chains[workerId], workerId+1, stop)
                gw.wg.Done()
            }(i, stop)
            gw.workStoppers = append(gw.workStoppers, stop)
        }
        gw.State = BackendStateRunning
        return nil
    }
    return fmt.Errorf("cannot start backend because it's in %s state", gw.State)
}

// workersSize gets the number of workers to use for saving email by reading the
// save_workers_size config value. Returns 1 if no config value was set.
func (gw *BackendGateway) workersSize() int {
    if gw.gwConfig.WorkersSize == 0 {
        return 1
    }
    return gw.gwConfig.WorkersSize
}

func (gw *BackendGateway) workDispatcher(workIn chan *workerMsg, p Processor, workerId int, stop chan bool) {
    defer func() {
        if r := recover(); r != nil {
            // recover from a closed channel
            Log().Error("worker recovered from panic:", r, string(debug.Stack()))
        }
        // close any connections / files
        Svc.shutdown()
    }()
    Log().Infof("processing worker started (#%d)", workerId)
    for {
        select {
        case <-stop:
            Log().Infof("stop signal for worker (#%d)", workerId)
            return
        case msg := <-workIn:
            if msg == nil {
                Log().Debugf("worker stopped (#%d)", workerId)
                return
            }
            if msg.task == TaskSaveMail {
                // process the email here
                // TODO we should check the err
                result, _ := p.Process(msg.e, TaskSaveMail)
                if result.Code() < 300 {
                    // if all good, let the gateway know that it was queued
                    msg.notifyMe <- &notifyMsg{nil, msg.e.QueuedId}
                } else {
                    // notify the gateway about the error
                    msg.notifyMe <- &notifyMsg{err: errors.New(result.String())}
                }
            } else if msg.task == TaskValidateRcpt {
                _, err := p.Process(msg.e, TaskValidateRcpt)
                if err != nil {
                    // validation failed
                    msg.notifyMe <- &notifyMsg{err: err}
                } else {
                    // all good
                    msg.notifyMe <- &notifyMsg{err: nil}
                }
            }
        }
    }
}

// stopWorkers sends a signal to all workers to stop
func (gw *BackendGateway) stopWorkers() {
    for i := range gw.workStoppers {
        gw.workStoppers[i] <- true
    }
}