backend.go 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291
  1. package backends
  2. import (
  3. "fmt"
  4. "github.com/flashmob/go-guerrilla/envelope"
  5. "github.com/flashmob/go-guerrilla/log"
  6. "reflect"
  7. "strconv"
  8. "strings"
  9. "sync"
  10. "sync/atomic"
  11. )
  12. var (
  13. Svc *Service
  14. // deprecated backends system
  15. backends = map[string]Backend{}
  16. // Store the constructor for making an new processor decorator.
  17. processors map[string]processorConstructor
  18. b Backend // todo make sure legacy works
  19. )
  20. func init() {
  21. Svc = &Service{}
  22. processors = make(map[string]processorConstructor)
  23. }
  24. type processorConstructor func() Decorator
  25. // Backends process received mail. Depending on the implementation, they can store mail in the database,
  26. // write to a file, check for spam, re-transmit to another server, etc.
  27. // Must return an SMTP message (i.e. "250 OK") and a boolean indicating
  28. // whether the message was processed successfully.
  29. type Backend interface {
  30. // Public methods
  31. Process(*envelope.Envelope) Result
  32. ValidateRcpt(e *envelope.Envelope) RcptError
  33. Initialize(BackendConfig) error
  34. Shutdown() error
  35. }
  36. type BackendConfig map[string]interface{}
  37. // All config structs extend from this
  38. type BaseConfig interface{}
  39. type notifyMsg struct {
  40. err error
  41. queuedID string
  42. }
  43. // Result represents a response to an SMTP client after receiving DATA.
  44. // The String method should return an SMTP message ready to send back to the
  45. // client, for example `250 OK: Message received`.
  46. type Result interface {
  47. fmt.Stringer
  48. // Code should return the SMTP code associated with this response, ie. `250`
  49. Code() int
  50. }
  51. // Internal implementation of BackendResult for use by backend implementations.
  52. type result string
  53. func (br result) String() string {
  54. return string(br)
  55. }
  56. // Parses the SMTP code from the first 3 characters of the SMTP message.
  57. // Returns 554 if code cannot be parsed.
  58. func (br result) Code() int {
  59. trimmed := strings.TrimSpace(string(br))
  60. if len(trimmed) < 3 {
  61. return 554
  62. }
  63. code, err := strconv.Atoi(trimmed[:3])
  64. if err != nil {
  65. return 554
  66. }
  67. return code
  68. }
  69. func NewResult(message string) Result {
  70. return result(message)
  71. }
  72. type ProcessorInitializer interface {
  73. Initialize(backendConfig BackendConfig) error
  74. }
  75. type ProcessorShutdowner interface {
  76. Shutdown() error
  77. }
  78. type Initialize func(backendConfig BackendConfig) error
  79. type Shutdown func() error
  80. // Satisfy ProcessorInitializer interface
  81. // So we can now pass an anonymous function that implements ProcessorInitializer
  82. func (i Initialize) Initialize(backendConfig BackendConfig) error {
  83. // delegate to the anonymous function
  84. return i(backendConfig)
  85. }
  86. // satisfy ProcessorShutdowner interface, same concept as Initialize type
  87. func (s Shutdown) Shutdown() error {
  88. // delegate
  89. return s()
  90. }
  91. type Errors []error
  92. // implement the Error interface
  93. func (e Errors) Error() string {
  94. if len(e) == 1 {
  95. return e[0].Error()
  96. }
  97. // multiple errors
  98. msg := ""
  99. for _, err := range e {
  100. msg += "\n" + err.Error()
  101. }
  102. return msg
  103. }
  104. // New retrieve a backend specified by the backendName, and initialize it using
  105. // backendConfig
  106. func New(backendName string, backendConfig BackendConfig, l log.Logger) (Backend, error) {
  107. Svc.StoreMainlog(l)
  108. if backend, found := backends[backendName]; found {
  109. err := backend.Initialize(backendConfig)
  110. if err != nil {
  111. return nil, fmt.Errorf("error while initializing the backend: %s", err)
  112. }
  113. b = backend
  114. } else {
  115. gateway := &BackendGateway{config: backendConfig}
  116. err := gateway.Initialize(backendConfig)
  117. if err != nil {
  118. return nil, fmt.Errorf("error while initializing the backend: %s", err)
  119. }
  120. gateway.State = BackendStateRunning
  121. b = Backend(gateway)
  122. }
  123. return b, nil
  124. }
  125. func GetBackend() Backend {
  126. return b
  127. }
  128. type Service struct {
  129. initializers []ProcessorInitializer
  130. shutdowners []ProcessorShutdowner
  131. sync.Mutex
  132. mainlog atomic.Value
  133. }
  134. // Get loads the log.logger in an atomic operation. Returns a stderr logger if not able to load
  135. func Log() log.Logger {
  136. if v, ok := Svc.mainlog.Load().(log.Logger); ok {
  137. return v
  138. }
  139. l, _ := log.GetLogger(log.OutputStderr.String())
  140. return l
  141. }
  142. func (s *Service) StoreMainlog(l log.Logger) {
  143. s.mainlog.Store(l)
  144. }
  145. // AddInitializer adds a function that implements ProcessorShutdowner to be called when initializing
  146. func (s *Service) AddInitializer(i ProcessorInitializer) {
  147. s.Lock()
  148. defer s.Unlock()
  149. s.initializers = append(s.initializers, i)
  150. }
  151. // AddShutdowner adds a function that implements ProcessorShutdowner to be called when shutting down
  152. func (s *Service) AddShutdowner(sh ProcessorShutdowner) {
  153. s.Lock()
  154. defer s.Unlock()
  155. s.shutdowners = append(s.shutdowners, sh)
  156. }
  157. // Initialize initializes all the processors one-by-one and returns any errors.
  158. // Subsequent calls to Initialize will not call the initializer again unless it failed on the previous call
  159. // so Initialize may be called again to retry after getting errors
  160. func (s *Service) initialize(backend BackendConfig) Errors {
  161. s.Lock()
  162. defer s.Unlock()
  163. var errors Errors
  164. failed := make([]ProcessorInitializer, 0)
  165. for i := range s.initializers {
  166. if err := s.initializers[i].Initialize(backend); err != nil {
  167. errors = append(errors, err)
  168. failed = append(failed, s.initializers[i])
  169. }
  170. }
  171. // keep only the failed initializers
  172. s.initializers = failed
  173. return errors
  174. }
  175. // Shutdown shuts down all the processors by calling their shutdowners (if any)
  176. // Subsequent calls to Shutdown will not call the shutdowners again unless it failed on the previous call
  177. // so Shutdown may be called again to retry after getting errors
  178. func (s *Service) shutdown() Errors {
  179. s.Lock()
  180. defer s.Unlock()
  181. var errors Errors
  182. failed := make([]ProcessorShutdowner, 0)
  183. for i := range s.shutdowners {
  184. if err := s.shutdowners[i].Shutdown(); err != nil {
  185. errors = append(errors, err)
  186. failed = append(failed, s.shutdowners[i])
  187. }
  188. }
  189. s.shutdowners = failed
  190. return errors
  191. }
  192. // AddProcessor adds a new processor, which becomes available to the backend_config.process_stack option
  193. // Use to add your own custom processor when using backends as a package, or after importing an external
  194. // processor.
  195. func (s *Service) AddProcessor(name string, p processorConstructor) {
  196. // wrap in a constructor since we want to defer calling it
  197. var c processorConstructor
  198. c = func() Decorator {
  199. return p()
  200. }
  201. // add to our processors list
  202. processors[strings.ToLower(name)] = c
  203. }
  204. // extractConfig loads the backend config. It has already been unmarshalled
  205. // configData contains data from the main config file's "backend_config" value
  206. // configType is a Processor's specific config value.
  207. // The reason why using reflection is because we'll get a nice error message if the field is missing
  208. // the alternative solution would be to json.Marshal() and json.Unmarshal() however that will not give us any
  209. // error messages
  210. func (s *Service) ExtractConfig(configData BackendConfig, configType BaseConfig) (interface{}, error) {
  211. // Use reflection so that we can provide a nice error message
  212. v := reflect.ValueOf(configType).Elem() // so that we can set the values
  213. //m := reflect.ValueOf(configType).Elem()
  214. t := reflect.TypeOf(configType).Elem()
  215. typeOfT := v.Type()
  216. for i := 0; i < v.NumField(); i++ {
  217. f := v.Field(i)
  218. // read the tags of the config struct
  219. field_name := t.Field(i).Tag.Get("json")
  220. if len(field_name) > 0 {
  221. // parse the tag to
  222. // get the field name from struct tag
  223. split := strings.Split(field_name, ",")
  224. field_name = split[0]
  225. } else {
  226. // could have no tag
  227. // so use the reflected field name
  228. field_name = typeOfT.Field(i).Name
  229. }
  230. if f.Type().Name() == "int" {
  231. // in json, there is no int, only floats...
  232. if intVal, converted := configData[field_name].(float64); converted {
  233. v.Field(i).SetInt(int64(intVal))
  234. } else if intVal, converted := configData[field_name].(int); converted {
  235. v.Field(i).SetInt(int64(intVal))
  236. } else {
  237. return configType, convertError("property missing/invalid: '" + field_name + "' of expected type: " + f.Type().Name())
  238. }
  239. }
  240. if f.Type().Name() == "string" {
  241. if stringVal, converted := configData[field_name].(string); converted {
  242. v.Field(i).SetString(stringVal)
  243. } else {
  244. return configType, convertError("missing/invalid: '" + field_name + "' of type: " + f.Type().Name())
  245. }
  246. }
  247. if f.Type().Name() == "bool" {
  248. if boolVal, converted := configData[field_name].(bool); converted {
  249. v.Field(i).SetBool(boolVal)
  250. } else {
  251. return configType, convertError("missing/invalid: '" + field_name + "' of type: " + f.Type().Name())
  252. }
  253. }
  254. }
  255. return configType, nil
  256. }