backend.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. package backends
  2. import (
  3. "errors"
  4. "fmt"
  5. log "github.com/Sirupsen/logrus"
  6. "github.com/flashmob/go-guerrilla/envelope"
  7. "strconv"
  8. "strings"
  9. "sync"
  10. "time"
  11. )
  12. // Backends process received mail. Depending on the implementation, they can store mail in the database,
  13. // write to a file, check for spam, re-transmit to another server, etc.
  14. // Must return an SMTP message (i.e. "250 OK") and a boolean indicating
  15. // whether the message was processed successfully.
  16. type Backend interface {
  17. // Public methods
  18. Process(*envelope.Envelope) BackendResult
  19. Initialize(BackendConfig) error
  20. Shutdown() error
  21. // start save mail worker(s)
  22. saveMailWorker(chan *savePayload)
  23. // get the number of workers that will be stared
  24. getNumberOfWorkers() int
  25. // test database settings, permissions, correct paths, etc, before starting workers
  26. testSettings() error
  27. // parse the configuration files
  28. loadConfig(BackendConfig) error
  29. }
  30. type configLoader interface {
  31. loadConfig(backendConfig BackendConfig) (err error)
  32. }
  33. type BackendConfig map[string]interface{}
  34. var backends = map[string]Backend{}
  35. type baseConfig interface{}
  36. type saveStatus struct {
  37. err error
  38. hash string
  39. }
  40. type savePayload struct {
  41. mail *envelope.Envelope
  42. from *envelope.EmailAddress
  43. recipient *envelope.EmailAddress
  44. savedNotify chan *saveStatus
  45. }
  46. type helper struct {
  47. saveMailChan chan *savePayload
  48. wg sync.WaitGroup
  49. }
  // BackendResult represents a response to an SMTP client after receiving DATA.
  // The String method should return an SMTP message ready to send back to the
  // client, for example `250 OK: Message received`.
  type BackendResult interface {
  	fmt.Stringer
  	// Code should return the SMTP code associated with this response, i.e. `250`.
  	Code() int
  }

  // backendResult is the internal implementation of BackendResult used by
  // backend implementations. The value is the full SMTP response line.
  type backendResult string

  // String returns the SMTP message unchanged.
  func (br backendResult) String() string {
  	return string(br)
  }

  // Code parses the SMTP code from the first three characters of the message,
  // ignoring surrounding whitespace. It returns 554 when the message is shorter
  // than three characters or when those characters are not all decimal digits.
  func (br backendResult) Code() int {
  	const fallback = 554

  	trimmed := strings.TrimSpace(string(br))
  	if len(trimmed) < 3 {
  		return fallback
  	}
  	prefix := trimmed[:3]
  	// strconv.Atoi accepts a leading '+' or '-', so "+25" or "-12" would
  	// otherwise be reported as a valid code. Insist on three ASCII digits.
  	for i := 0; i < len(prefix); i++ {
  		if prefix[i] < '0' || prefix[i] > '9' {
  			return fallback
  		}
  	}
  	code, err := strconv.Atoi(prefix)
  	if err != nil {
  		return fallback
  	}
  	return code
  }

  // NewBackendResult wraps an SMTP response line in a BackendResult.
  func NewBackendResult(message string) BackendResult {
  	return backendResult(message)
  }
  79. // A backend gateway is a proxy that implements the Backend interface.
  80. // It is used to start multiple goroutine workers for saving mail, and then distribute email saving to the workers
  81. // via a channel. Shutting down via Shutdown() will stop all workers.
  82. // The rest of this program always talks to the backend via this gateway.
  83. type BackendGateway struct {
  84. AbstractBackend
  85. saveMailChan chan *savePayload
  86. // waits for backend workers to start/stop
  87. wg sync.WaitGroup
  88. b Backend
  89. // controls access to state
  90. stateGuard sync.Mutex
  91. state int
  92. }
  93. // possible values for state
  94. const (
  95. BackendStateProcessing = iota
  96. BackendStateShutdown
  97. )
  98. // New retrieve a backend specified by the backendName, and initialize it using
  99. // backendConfig
  100. func New(backendName string, backendConfig BackendConfig) (Backend, error) {
  101. backend, found := backends[backendName]
  102. if !found {
  103. return nil, fmt.Errorf("backend %q not found", backendName)
  104. }
  105. p := &BackendGateway{b: backend}
  106. err := p.Initialize(backendConfig)
  107. if err != nil {
  108. return nil, fmt.Errorf("error while initializing the backend: %s", err)
  109. }
  110. p.state = BackendStateProcessing
  111. return p, nil
  112. }
  113. // Distributes an envelope to one of the backend workers
  114. func (gw *BackendGateway) Process(e *envelope.Envelope) BackendResult {
  115. to := e.RcptTo
  116. from := e.MailFrom
  117. // place on the channel so that one of the save mail workers can pick it up
  118. // TODO: support multiple recipients
  119. savedNotify := make(chan *saveStatus)
  120. gw.saveMailChan <- &savePayload{e, from, &to[0], savedNotify}
  121. // wait for the save to complete
  122. // or timeout
  123. select {
  124. case status := <-savedNotify:
  125. if status.err != nil {
  126. return NewBackendResult("554 Error: " + status.err.Error())
  127. }
  128. return NewBackendResult(fmt.Sprintf("250 OK : queued as %s", status.hash))
  129. case <-time.After(time.Second * 30):
  130. log.Infof("Backend has timed out")
  131. return NewBackendResult("554 Error: transaction timeout")
  132. }
  133. }
  134. func (gw *BackendGateway) Shutdown() error {
  135. gw.stateGuard.Lock()
  136. defer gw.stateGuard.Unlock()
  137. if gw.state != BackendStateShutdown {
  138. err := gw.b.Shutdown()
  139. if err == nil {
  140. close(gw.saveMailChan) // workers will stop
  141. gw.wg.Wait()
  142. gw.state = BackendStateShutdown
  143. }
  144. return err
  145. }
  146. return nil
  147. }
  148. func (gw *BackendGateway) Initialize(cfg BackendConfig) error {
  149. err := gw.b.Initialize(cfg)
  150. if err == nil {
  151. workersSize := gw.b.getNumberOfWorkers()
  152. if workersSize < 1 {
  153. return errors.New("Must have at least 1 worker")
  154. }
  155. if err := gw.b.testSettings(); err != nil {
  156. return err
  157. }
  158. gw.saveMailChan = make(chan *savePayload, workersSize)
  159. // start our savemail workers
  160. gw.wg.Add(workersSize)
  161. for i := 0; i < workersSize; i++ {
  162. go func() {
  163. gw.b.saveMailWorker(gw.saveMailChan)
  164. gw.wg.Done()
  165. }()
  166. }
  167. }
  168. return err
  169. }