backend.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216
  1. package backends
  2. import (
  3. "errors"
  4. "fmt"
  5. "strconv"
  6. "strings"
  7. "sync"
  8. "time"
  9. "github.com/flashmob/go-guerrilla/envelope"
  10. "github.com/flashmob/go-guerrilla/log"
  11. "github.com/flashmob/go-guerrilla/response"
  12. )
  13. var mainlog log.Logger
  14. // Backends process received mail. Depending on the implementation, they can store mail in the database,
  15. // write to a file, check for spam, re-transmit to another server, etc.
  16. // Must return an SMTP message (i.e. "250 OK") and a boolean indicating
  17. // whether the message was processed successfully.
  18. type Backend interface {
  19. // Public methods
  20. Process(*envelope.Envelope) BackendResult
  21. Initialize(BackendConfig) error
  22. Shutdown() error
  23. // start save mail worker(s)
  24. saveMailWorker(chan *savePayload)
  25. // get the number of workers that will be stared
  26. getNumberOfWorkers() int
  27. // test database settings, permissions, correct paths, etc, before starting workers
  28. testSettings() error
  29. // parse the configuration files
  30. loadConfig(BackendConfig) error
  31. }
  32. type BackendConfig map[string]interface{}
  33. var backends = map[string]Backend{}
  34. type baseConfig interface{}
  35. type saveStatus struct {
  36. err error
  37. hash string
  38. }
  39. type savePayload struct {
  40. mail *envelope.Envelope
  41. from *envelope.EmailAddress
  42. recipient *envelope.EmailAddress
  43. savedNotify chan *saveStatus
  44. }
  45. // BackendResult represents a response to an SMTP client after receiving DATA.
  46. // The String method should return an SMTP message ready to send back to the
  47. // client, for example `250 OK: Message received`.
  48. type BackendResult interface {
  49. fmt.Stringer
  50. // Code should return the SMTP code associated with this response, ie. `250`
  51. Code() int
  52. }
  53. // Internal implementation of BackendResult for use by backend implementations.
  54. type backendResult string
  55. func (br backendResult) String() string {
  56. return string(br)
  57. }
  58. // Parses the SMTP code from the first 3 characters of the SMTP message.
  59. // Returns 554 if code cannot be parsed.
  60. func (br backendResult) Code() int {
  61. trimmed := strings.TrimSpace(string(br))
  62. if len(trimmed) < 3 {
  63. return 554
  64. }
  65. code, err := strconv.Atoi(trimmed[:3])
  66. if err != nil {
  67. return 554
  68. }
  69. return code
  70. }
  71. func NewBackendResult(message string) BackendResult {
  72. return backendResult(message)
  73. }
  74. // A backend gateway is a proxy that implements the Backend interface.
  75. // It is used to start multiple goroutine workers for saving mail, and then distribute email saving to the workers
  76. // via a channel. Shutting down via Shutdown() will stop all workers.
  77. // The rest of this program always talks to the backend via this gateway.
  78. type BackendGateway struct {
  79. AbstractBackend
  80. saveMailChan chan *savePayload
  81. // waits for backend workers to start/stop
  82. wg sync.WaitGroup
  83. b Backend
  84. // controls access to state
  85. stateGuard sync.Mutex
  86. State backendState
  87. config BackendConfig
  88. }
  89. // possible values for state
  90. const (
  91. BackendStateRunning = iota
  92. BackendStateShuttered
  93. BackendStateError
  94. )
  95. type backendState int
  96. func (s backendState) String() string {
  97. return strconv.Itoa(int(s))
  98. }
  99. // New retrieve a backend specified by the backendName, and initialize it using
  100. // backendConfig
  101. func New(backendName string, backendConfig BackendConfig, l log.Logger) (Backend, error) {
  102. backend, found := backends[backendName]
  103. mainlog = l
  104. if !found {
  105. return nil, fmt.Errorf("backend %q not found", backendName)
  106. }
  107. gateway := &BackendGateway{b: backend, config: backendConfig}
  108. err := gateway.Initialize(backendConfig)
  109. if err != nil {
  110. return nil, fmt.Errorf("error while initializing the backend: %s", err)
  111. }
  112. gateway.State = BackendStateRunning
  113. return gateway, nil
  114. }
  115. // Process distributes an envelope to one of the backend workers
  116. func (gw *BackendGateway) Process(e *envelope.Envelope) BackendResult {
  117. if gw.State != BackendStateRunning {
  118. return NewBackendResult(response.Canned.FailBackendNotRunning + gw.State.String())
  119. }
  120. to := e.RcptTo
  121. from := e.MailFrom
  122. // place on the channel so that one of the save mail workers can pick it up
  123. // TODO: support multiple recipients
  124. savedNotify := make(chan *saveStatus)
  125. gw.saveMailChan <- &savePayload{e, from, &to[0], savedNotify}
  126. // wait for the save to complete
  127. // or timeout
  128. select {
  129. case status := <-savedNotify:
  130. if status.err != nil {
  131. return NewBackendResult(response.Canned.FailBackendTransaction + status.err.Error())
  132. }
  133. return NewBackendResult(response.Canned.SuccessMessageQueued + status.hash)
  134. case <-time.After(time.Second * 30):
  135. mainlog.Infof("Backend has timed out")
  136. return NewBackendResult(response.Canned.FailBackendTimeout)
  137. }
  138. }
  139. func (gw *BackendGateway) Shutdown() error {
  140. gw.stateGuard.Lock()
  141. defer gw.stateGuard.Unlock()
  142. if gw.State != BackendStateShuttered {
  143. err := gw.b.Shutdown()
  144. if err == nil {
  145. close(gw.saveMailChan) // workers will stop
  146. gw.wg.Wait()
  147. gw.State = BackendStateShuttered
  148. }
  149. return err
  150. }
  151. return nil
  152. }
  153. // Reinitialize starts up a backend gateway that was shutdown before
  154. func (gw *BackendGateway) Reinitialize() error {
  155. if gw.State != BackendStateShuttered {
  156. return errors.New("backend must be in BackendStateshuttered state to Reinitialize")
  157. }
  158. err := gw.Initialize(gw.config)
  159. if err != nil {
  160. return fmt.Errorf("error while initializing the backend: %s", err)
  161. }
  162. gw.State = BackendStateRunning
  163. return err
  164. }
  165. func (gw *BackendGateway) Initialize(cfg BackendConfig) error {
  166. err := gw.b.Initialize(cfg)
  167. if err == nil {
  168. workersSize := gw.b.getNumberOfWorkers()
  169. if workersSize < 1 {
  170. gw.State = BackendStateError
  171. return errors.New("Must have at least 1 worker")
  172. }
  173. if err := gw.b.testSettings(); err != nil {
  174. gw.State = BackendStateError
  175. return err
  176. }
  177. gw.saveMailChan = make(chan *savePayload, workersSize)
  178. // start our savemail workers
  179. gw.wg.Add(workersSize)
  180. for i := 0; i < workersSize; i++ {
  181. go func() {
  182. gw.b.saveMailWorker(gw.saveMailChan)
  183. gw.wg.Done()
  184. }()
  185. }
  186. } else {
  187. gw.State = BackendStateError
  188. }
  189. return err
  190. }