backend.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209
  1. package backends
  2. import (
  3. "errors"
  4. "fmt"
  5. log "github.com/Sirupsen/logrus"
  6. "github.com/flashmob/go-guerrilla/envelope"
  7. "strconv"
  8. "strings"
  9. "sync"
  10. "time"
  11. )
// Backend processes received mail. Depending on the implementation, it can
// store mail in a database, write it to a file, check it for spam,
// re-transmit it to another server, etc.
//
// Process returns a BackendResult, which carries the SMTP message to send back
// to the client (e.g. "250 OK") together with its numeric SMTP code.
//
// Because the interface includes unexported methods, types outside this
// package can only satisfy it by embedding an implementation from this package.
type Backend interface {
	// Public methods

	// Process handles a received envelope and returns the result to report to the client.
	Process(*envelope.Envelope) BackendResult
	// Initialize sets the backend up from the given configuration.
	Initialize(BackendConfig) error
	// Shutdown stops the backend.
	Shutdown() error

	// start save mail worker(s); the worker receives its payloads from the given channel
	saveMailWorker(chan *savePayload)
	// get the number of workers that will be started
	getNumberOfWorkers() int
	// test database settings, permissions, correct paths, etc, before starting workers
	testSettings() error
	// parse the configuration files
	loadConfig(BackendConfig) error
}
// configLoader is satisfied by any type that can load its settings from a
// BackendConfig. Backend declares a loadConfig method with the same signature.
type configLoader interface {
	loadConfig(backendConfig BackendConfig) (err error)
}
// BackendConfig holds backend settings as a map from setting name to an
// untyped value. It is passed to Backend.Initialize and Backend.loadConfig.
type BackendConfig map[string]interface{}
// backends is the registry of available backends, keyed by the name that New
// looks up. It starts out empty.
// NOTE(review): nothing in this part of the file adds entries; presumably the
// individual backend implementations register themselves — confirm.
var backends = map[string]Backend{}
// baseConfig is an empty interface, so any value satisfies it.
// NOTE(review): it is not referenced in this part of the file; presumably it
// is used by the concrete backends' config handling — confirm.
type baseConfig interface{}
// saveStatus is the outcome of a save attempt. Process receives it on a
// savePayload's savedNotify channel.
type saveStatus struct {
	// err is non-nil when saving failed; Process turns it into a 554 response
	err error
	// hash is reported back to the client by Process as "queued as <hash>"
	hash string
}
// savePayload is the unit of work that Process places on the gateway's
// saveMailChan for a save-mail worker to pick up.
//
// Process builds it with an unkeyed composite literal, so the order of the
// fields below must not change.
type savePayload struct {
	// mail is the envelope being processed
	mail *envelope.Envelope
	// from is the envelope's MailFrom address
	from *envelope.EmailAddress
	// recipient is the first entry of the envelope's RcptTo
	recipient *envelope.EmailAddress
	// savedNotify is the channel on which Process waits for the saveStatus
	savedNotify chan *saveStatus
}
  46. // BackendResult represents a response to an SMTP client after receiving DATA.
  47. // The String method should return an SMTP message ready to send back to the
  48. // client, for example `250 OK: Message received`.
  49. type BackendResult interface {
  50. fmt.Stringer
  51. // Code should return the SMTP code associated with this response, ie. `250`
  52. Code() int
  53. }
  54. // Internal implementation of BackendResult for use by backend implementations.
  55. type backendResult string
  56. func (br backendResult) String() string {
  57. return string(br)
  58. }
  59. // Parses the SMTP code from the first 3 characters of the SMTP message.
  60. // Returns 554 if code cannot be parsed.
  61. func (br backendResult) Code() int {
  62. trimmed := strings.TrimSpace(string(br))
  63. if len(trimmed) < 3 {
  64. return 554
  65. }
  66. code, err := strconv.Atoi(trimmed[:3])
  67. if err != nil {
  68. return 554
  69. }
  70. return code
  71. }
  72. func NewBackendResult(message string) BackendResult {
  73. return backendResult(message)
  74. }
// BackendGateway is a proxy that implements the Backend interface.
// It is used to start multiple goroutine workers for saving mail, and then
// distribute email saving to the workers via a channel. Shutting down via
// Shutdown() will stop all workers.
// The rest of this program always talks to the backend via this gateway.
type BackendGateway struct {
	AbstractBackend
	// saveMailChan carries payloads from Process to the save-mail workers;
	// Initialize creates it and Shutdown closes it
	saveMailChan chan *savePayload
	// waits for backend workers to start/stop
	wg sync.WaitGroup
	// b is the wrapped backend that the gateway delegates to
	b Backend
	// controls access to state
	stateGuard sync.Mutex
	// State holds one of the BackendState* constants
	State int
	// config is the configuration given to New; Reinitialize reuses it
	config BackendConfig
}
// Possible values for BackendGateway.State.
// BackendStateRunning is 0, so it is also the zero value of the State field.
const (
	// BackendStateRunning is set by New and Reinitialize after a successful Initialize
	BackendStateRunning = iota
	// BackendStateShuttered is set by Shutdown once the workers have stopped
	BackendStateShuttered
	// BackendStateError is set by Initialize when initialization fails
	BackendStateError
)
  96. // New retrieve a backend specified by the backendName, and initialize it using
  97. // backendConfig
  98. func New(backendName string, backendConfig BackendConfig) (Backend, error) {
  99. backend, found := backends[backendName]
  100. if !found {
  101. return nil, fmt.Errorf("backend %q not found", backendName)
  102. }
  103. gateway := &BackendGateway{b: backend, config: backendConfig}
  104. err := gateway.Initialize(backendConfig)
  105. if err != nil {
  106. return nil, fmt.Errorf("error while initializing the backend: %s", err)
  107. }
  108. gateway.State = BackendStateRunning
  109. return gateway, nil
  110. }
  111. // Distributes an envelope to one of the backend workers
  112. func (gw *BackendGateway) Process(e *envelope.Envelope) BackendResult {
  113. if gw.State != BackendStateRunning {
  114. return NewBackendResult("554 Transaction failed - backend not running" + strconv.Itoa(gw.State))
  115. }
  116. to := e.RcptTo
  117. from := e.MailFrom
  118. // place on the channel so that one of the save mail workers can pick it up
  119. // TODO: support multiple recipients
  120. savedNotify := make(chan *saveStatus)
  121. gw.saveMailChan <- &savePayload{e, from, &to[0], savedNotify}
  122. // wait for the save to complete
  123. // or timeout
  124. select {
  125. case status := <-savedNotify:
  126. if status.err != nil {
  127. return NewBackendResult("554 Error: " + status.err.Error())
  128. }
  129. return NewBackendResult(fmt.Sprintf("250 OK : queued as %s", status.hash))
  130. case <-time.After(time.Second * 30):
  131. log.Infof("Backend has timed out")
  132. return NewBackendResult("554 Error: transaction timeout")
  133. }
  134. }
  135. func (gw *BackendGateway) Shutdown() error {
  136. gw.stateGuard.Lock()
  137. defer gw.stateGuard.Unlock()
  138. if gw.State != BackendStateShuttered {
  139. err := gw.b.Shutdown()
  140. if err == nil {
  141. close(gw.saveMailChan) // workers will stop
  142. gw.wg.Wait()
  143. gw.State = BackendStateShuttered
  144. }
  145. return err
  146. }
  147. return nil
  148. }
  149. // Reinitialize starts up a backend gateway that was shutdown before
  150. func (gw *BackendGateway) Reinitialize() error {
  151. if gw.State != BackendStateShuttered {
  152. return errors.New("backend must be in BackendStateshuttered state to Reinitialize")
  153. }
  154. err := gw.Initialize(gw.config)
  155. if err != nil {
  156. return fmt.Errorf("error while initializing the backend: %s", err)
  157. } else {
  158. gw.State = BackendStateRunning
  159. }
  160. return err
  161. }
  162. func (gw *BackendGateway) Initialize(cfg BackendConfig) error {
  163. err := gw.b.Initialize(cfg)
  164. if err == nil {
  165. workersSize := gw.b.getNumberOfWorkers()
  166. if workersSize < 1 {
  167. gw.State = BackendStateError
  168. return errors.New("Must have at least 1 worker")
  169. }
  170. if err := gw.b.testSettings(); err != nil {
  171. gw.State = BackendStateError
  172. return err
  173. }
  174. gw.saveMailChan = make(chan *savePayload, workersSize)
  175. // start our savemail workers
  176. gw.wg.Add(workersSize)
  177. for i := 0; i < workersSize; i++ {
  178. go func() {
  179. gw.b.saveMailWorker(gw.saveMailChan)
  180. gw.wg.Done()
  181. }()
  182. }
  183. } else {
  184. gw.State = BackendStateError
  185. }
  186. return err
  187. }