// backend.go (8.7 KB)
  1. package backends
  2. import (
  3. "bytes"
  4. "fmt"
  5. "github.com/flashmob/go-guerrilla/log"
  6. "github.com/flashmob/go-guerrilla/mail"
  7. "reflect"
  8. "strconv"
  9. "strings"
  10. "sync"
  11. "sync/atomic"
  12. )
  13. var (
  14. Svc *service
  15. // Store the constructor for making an new processor decorator.
  16. processors map[string]ProcessorConstructor
  17. b Backend
  18. )
  19. func init() {
  20. Svc = &service{}
  21. processors = make(map[string]ProcessorConstructor)
  22. }
  23. type ProcessorConstructor func() Decorator
  24. // Backends process received mail. Depending on the implementation, they can store mail in the database,
  25. // write to a file, check for spam, re-transmit to another server, etc.
  26. // Must return an SMTP message (i.e. "250 OK") and a boolean indicating
  27. // whether the message was processed successfully.
  28. type Backend interface {
  29. // Process processes then saves the mail envelope
  30. Process(*mail.Envelope) Result
  31. // ValidateRcpt validates the last recipient that was pushed to the mail envelope
  32. ValidateRcpt(e *mail.Envelope) RcptError
  33. // Initializes the backend, eg. creates folders, sets-up database connections
  34. Initialize(BackendConfig) error
  35. // Initializes the backend after it was Shutdown()
  36. Reinitialize() error
  37. // Shutdown frees / closes anything created during initializations
  38. Shutdown() error
  39. // Start Starts a backend that has been initialized
  40. Start() error
  41. }
  type (
  	// BackendConfig is the backend configuration: a map from option name to
  	// an untyped value.
  	BackendConfig map[string]interface{}

  	// BaseConfig is the empty interface that all config structs extend from.
  	BaseConfig interface{}
  )
  45. type notifyMsg struct {
  46. err error
  47. queuedID string
  48. result Result
  49. }
  // Result is the response given to an SMTP client after receiving DATA.
  // String (from the embedded fmt.Stringer) should return an SMTP message that
  // is ready to send back to the client, for example `250 OK: Message received`.
  type Result interface {
  	fmt.Stringer
  	// Code should return the SMTP code associated with this response, e.g. `250`.
  	Code() int
  }
  // result is the internal implementation of Result for use by backend
  // implementations. The response text is accumulated in the embedded
  // bytes.Buffer.
  type result struct {
  	// we're going to use a bytes.Buffer for building a string
  	bytes.Buffer
  }

  // String returns the response text written to the buffer so far.
  func (r *result) String() string {
  	return r.Buffer.String()
  }

  // Code parses the SMTP code from the first 3 characters of the SMTP message,
  // after trimming surrounding whitespace.
  // Returns 554 if the message is shorter than 3 characters or does not start
  // with three decimal digits.
  func (r *result) Code() int {
  	const fallback = 554
  	trimmed := strings.TrimSpace(r.String())
  	if len(trimmed) < 3 {
  		return fallback
  	}
  	prefix := trimmed[:3]
  	// strconv.Atoi accepts a leading sign, so "+25" or "-12" would otherwise be
  	// reported as the codes 25 and -12. Require the first character to be a
  	// digit; Atoi then rejects any non-digit in the remaining two.
  	if prefix[0] < '0' || prefix[0] > '9' {
  		return fallback
  	}
  	code, err := strconv.Atoi(prefix)
  	if err != nil {
  		return fallback
  	}
  	return code
  }
  79. func NewResult(r ...interface{}) Result {
  80. buf := new(result)
  81. for _, item := range r {
  82. switch v := item.(type) {
  83. case error:
  84. _, _ = buf.WriteString(v.Error())
  85. case fmt.Stringer:
  86. _, _ = buf.WriteString(v.String())
  87. case string:
  88. _, _ = buf.WriteString(v)
  89. }
  90. }
  91. return buf
  92. }
  93. type processorInitializer interface {
  94. Initialize(backendConfig BackendConfig) error
  95. }
  96. type processorShutdowner interface {
  97. Shutdown() error
  98. }
  99. type InitializeWith func(backendConfig BackendConfig) error
  100. type ShutdownWith func() error
  101. // Satisfy ProcessorInitializer interface
  102. // So we can now pass an anonymous function that implements ProcessorInitializer
  103. func (i InitializeWith) Initialize(backendConfig BackendConfig) error {
  104. // delegate to the anonymous function
  105. return i(backendConfig)
  106. }
  107. // satisfy ProcessorShutdowner interface, same concept as InitializeWith type
  108. func (s ShutdownWith) Shutdown() error {
  109. // delegate
  110. return s()
  111. }
  // Errors is a list of errors that can itself be used as an error.
  type Errors []error

  // Error implements the error interface.
  // An empty list yields "", a single error yields that error's message
  // unchanged, and several errors yield each message preceded by a newline.
  func (e Errors) Error() string {
  	switch len(e) {
  	case 0:
  		return ""
  	case 1:
  		return e[0].Error()
  	}
  	// multiple errors
  	messages := make([]string, len(e))
  	for idx, item := range e {
  		messages[idx] = item.Error()
  	}
  	return "\n" + strings.Join(messages, "\n")
  }
  // convertError returns an error reporting that the backend config failed to
  // load, with name placed in parentheses after the fixed message.
  func convertError(name string) error {
  	const format = "failed to load backend config (%s)"
  	return fmt.Errorf(format, name)
  }
  128. type service struct {
  129. initializers []processorInitializer
  130. shutdowners []processorShutdowner
  131. sync.Mutex
  132. mainlog atomic.Value
  133. }
  134. // Get loads the log.logger in an atomic operation. Returns a stderr logger if not able to load
  135. func Log() log.Logger {
  136. if v, ok := Svc.mainlog.Load().(log.Logger); ok {
  137. return v
  138. }
  139. l, _ := log.GetLogger(log.OutputStderr.String(), log.InfoLevel.String())
  140. return l
  141. }
  142. func (s *service) SetMainlog(l log.Logger) {
  143. s.mainlog.Store(l)
  144. }
  145. // AddInitializer adds a function that implements ProcessorShutdowner to be called when initializing
  146. func (s *service) AddInitializer(i processorInitializer) {
  147. s.Lock()
  148. defer s.Unlock()
  149. s.initializers = append(s.initializers, i)
  150. }
  151. // AddShutdowner adds a function that implements ProcessorShutdowner to be called when shutting down
  152. func (s *service) AddShutdowner(sh processorShutdowner) {
  153. s.Lock()
  154. defer s.Unlock()
  155. s.shutdowners = append(s.shutdowners, sh)
  156. }
  157. // reset clears the initializers and Shutdowners
  158. func (s *service) reset() {
  159. s.shutdowners = make([]processorShutdowner, 0)
  160. s.initializers = make([]processorInitializer, 0)
  161. }
  162. // Initialize initializes all the processors one-by-one and returns any errors.
  163. // Subsequent calls to Initialize will not call the initializer again unless it failed on the previous call
  164. // so Initialize may be called again to retry after getting errors
  165. func (s *service) initialize(backend BackendConfig) Errors {
  166. s.Lock()
  167. defer s.Unlock()
  168. var errors Errors
  169. failed := make([]processorInitializer, 0)
  170. for i := range s.initializers {
  171. if err := s.initializers[i].Initialize(backend); err != nil {
  172. errors = append(errors, err)
  173. failed = append(failed, s.initializers[i])
  174. }
  175. }
  176. // keep only the failed initializers
  177. s.initializers = failed
  178. return errors
  179. }
  180. // Shutdown shuts down all the processors by calling their shutdowners (if any)
  181. // Subsequent calls to Shutdown will not call the shutdowners again unless it failed on the previous call
  182. // so Shutdown may be called again to retry after getting errors
  183. func (s *service) shutdown() Errors {
  184. s.Lock()
  185. defer s.Unlock()
  186. var errors Errors
  187. failed := make([]processorShutdowner, 0)
  188. for i := range s.shutdowners {
  189. if err := s.shutdowners[i].Shutdown(); err != nil {
  190. errors = append(errors, err)
  191. failed = append(failed, s.shutdowners[i])
  192. }
  193. }
  194. s.shutdowners = failed
  195. return errors
  196. }
  197. // AddProcessor adds a new processor, which becomes available to the backend_config.save_process option
  198. // and also the backend_config.validate_process option
  199. // Use to add your own custom processor when using backends as a package, or after importing an external
  200. // processor.
  201. func (s *service) AddProcessor(name string, p ProcessorConstructor) {
  202. // wrap in a constructor since we want to defer calling it
  203. var c ProcessorConstructor
  204. c = func() Decorator {
  205. return p()
  206. }
  207. // add to our processors list
  208. processors[strings.ToLower(name)] = c
  209. }
  210. // extractConfig loads the backend config. It has already been unmarshalled
  211. // configData contains data from the main config file's "backend_config" value
  212. // configType is a Processor's specific config value.
  213. // The reason why using reflection is because we'll get a nice error message if the field is missing
  214. // the alternative solution would be to json.Marshal() and json.Unmarshal() however that will not give us any
  215. // error messages
  216. func (s *service) ExtractConfig(configData BackendConfig, configType BaseConfig) (interface{}, error) {
  217. // Use reflection so that we can provide a nice error message
  218. v := reflect.ValueOf(configType).Elem() // so that we can set the values
  219. //m := reflect.ValueOf(configType).Elem()
  220. t := reflect.TypeOf(configType).Elem()
  221. typeOfT := v.Type()
  222. for i := 0; i < v.NumField(); i++ {
  223. f := v.Field(i)
  224. // read the tags of the config struct
  225. fieldName := t.Field(i).Tag.Get("json")
  226. omitempty := false
  227. if len(fieldName) > 0 {
  228. // parse the tag to
  229. // get the field name from struct tag
  230. split := strings.Split(fieldName, ",")
  231. fieldName = split[0]
  232. if len(split) > 1 {
  233. if split[1] == "omitempty" {
  234. omitempty = true
  235. }
  236. }
  237. } else {
  238. // could have no tag
  239. // so use the reflected field name
  240. fieldName = typeOfT.Field(i).Name
  241. }
  242. if f.Type().Name() == "int" {
  243. // in json, there is no int, only floats...
  244. if intVal, converted := configData[fieldName].(float64); converted {
  245. v.Field(i).SetInt(int64(intVal))
  246. } else if intVal, converted := configData[fieldName].(int); converted {
  247. v.Field(i).SetInt(int64(intVal))
  248. } else if !omitempty {
  249. return configType, convertError("property missing/invalid: '" + fieldName + "' of expected type: " + f.Type().Name())
  250. }
  251. }
  252. if f.Type().Name() == "string" {
  253. if stringVal, converted := configData[fieldName].(string); converted {
  254. v.Field(i).SetString(stringVal)
  255. } else if !omitempty {
  256. return configType, convertError("missing/invalid: '" + fieldName + "' of type: " + f.Type().Name())
  257. }
  258. }
  259. if f.Type().Name() == "bool" {
  260. if boolVal, converted := configData[fieldName].(bool); converted {
  261. v.Field(i).SetBool(boolVal)
  262. } else if !omitempty {
  263. return configType, convertError("missing/invalid: '" + fieldName + "' of type: " + f.Type().Name())
  264. }
  265. }
  266. }
  267. return configType, nil
  268. }