backend.go 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333
  1. package backends
  2. import (
  3. "bytes"
  4. "fmt"
  5. "github.com/flashmob/go-guerrilla/log"
  6. "github.com/flashmob/go-guerrilla/mail"
  7. "io"
  8. "reflect"
  9. "strconv"
  10. "strings"
  11. "sync"
  12. "sync/atomic"
  13. )
  14. var (
  15. Svc *service
  16. // processors store the constructors for for composing a new processor using a decorator pattern.
  17. processors map[string]ProcessorConstructor
  18. // Streamers store the constructors for composing a new stream-based processor using a decorator pattern.
  19. Streamers map[string]StreamProcessorConstructor
  20. )
  21. func init() {
  22. Svc = &service{}
  23. processors = make(map[string]ProcessorConstructor)
  24. Streamers = make(map[string]StreamProcessorConstructor)
  25. }
  26. const DefaultGateway = "default"
  27. type ProcessorConstructor func() Decorator
  28. type StreamProcessorConstructor func() *StreamDecorator
  29. // Backends process received mail. Depending on the implementation, they can store mail in the database,
  30. // write to a file, check for spam, re-transmit to another server, etc.
  31. // Must return an SMTP message (i.e. "250 OK") and a boolean indicating
  32. // whether the message was processed successfully.
  33. type Backend interface {
  34. // Process processes then saves the mail envelope
  35. Process(*mail.Envelope) Result
  36. // ValidateRcpt validates the last recipient that was pushed to the mail envelope
  37. ValidateRcpt(e *mail.Envelope) RcptError
  38. // ProcessStream is the alternative for Process, a stream is read from io.Reader
  39. ProcessStream(r io.Reader, e *mail.Envelope) (Result, error)
  40. // StreamOn signals if ProcessStream can be used
  41. StreamOn() bool
  42. // Initializes the backend, eg. creates folders, sets-up database connections
  43. Initialize(BackendConfig) error
  44. // Initializes the backend after it was Shutdown()
  45. Reinitialize() error
  46. // Shutdown frees / closes anything created during initializations
  47. Shutdown() error
  48. // Start Starts a backend that has been initialized
  49. Start() error
  50. // returns the name of the backend
  51. Name() string
  52. }
  53. type notifyMsg struct {
  54. err error
  55. queuedID string
  56. result Result
  57. }
  58. // Result represents a response to an SMTP client after receiving DATA.
  59. // The String method should return an SMTP message ready to send back to the
  60. // client, for example `250 OK: Message received`.
  61. type Result interface {
  62. fmt.Stringer
  63. // Code should return the SMTP code associated with this response, ie. `250`
  64. Code() int
  65. }
  66. // Internal implementation of BackendResult for use by backend implementations.
  67. type result struct {
  68. // we're going to use a bytes.Buffer for building a string
  69. bytes.Buffer
  70. }
  71. func (r *result) String() string {
  72. return r.Buffer.String()
  73. }
  74. // Parses the SMTP code from the first 3 characters of the SMTP message.
  75. // Returns 554 if code cannot be parsed.
  76. func (r *result) Code() int {
  77. trimmed := strings.TrimSpace(r.String())
  78. if len(trimmed) < 3 {
  79. return 554
  80. }
  81. code, err := strconv.Atoi(trimmed[:3])
  82. if err != nil {
  83. return 554
  84. }
  85. return code
  86. }
  87. func NewResult(param ...interface{}) Result {
  88. r := new(result)
  89. for _, item := range param {
  90. switch v := item.(type) {
  91. case error:
  92. _, _ = r.WriteString(v.Error())
  93. case fmt.Stringer:
  94. _, _ = r.WriteString(v.String())
  95. case string:
  96. _, _ = r.WriteString(v)
  97. }
  98. }
  99. return r
  100. }
  101. type processorInitializer interface {
  102. Initialize(backendConfig BackendConfig) error
  103. }
  104. type processorShutdowner interface {
  105. Shutdown() error
  106. }
  107. type InitializeWith func(backendConfig BackendConfig) error
  108. type InitializeStreamWith func(conf ConfigGroup) error
  109. type ShutdownWith func() error
  110. // Satisfy ProcessorInitializer interface
  111. // So we can now pass an anonymous function that implements ProcessorInitializer
  112. func (i InitializeWith) Initialize(backendConfig BackendConfig) error {
  113. // delegate to the anonymous function
  114. return i(backendConfig)
  115. }
  116. func (i InitializeStreamWith) Initialize(conf ConfigGroup) error {
  117. // delegate to the anonymous function
  118. return i(conf)
  119. }
  120. // satisfy ProcessorShutdowner interface, same concept as InitializeWith type
  121. func (s ShutdownWith) Shutdown() error {
  122. // delegate
  123. return s()
  124. }
  125. type Errors []error
  126. // implement the Error interface
  127. func (e Errors) Error() string {
  128. if len(e) == 1 {
  129. return e[0].Error()
  130. }
  131. // multiple errors
  132. msg := ""
  133. for _, err := range e {
  134. msg += "\n" + err.Error()
  135. }
  136. return msg
  137. }
  138. func convertError(name string) error {
  139. return fmt.Errorf("failed to load backend config (%s)", name)
  140. }
  141. type service struct {
  142. initializers []processorInitializer
  143. shutdowners []processorShutdowner
  144. sync.Mutex
  145. mainlog atomic.Value
  146. }
  147. // Get loads the log.logger in an atomic operation. Returns a stderr logger if not able to load
  148. func Log() log.Logger {
  149. if v, ok := Svc.mainlog.Load().(log.Logger); ok {
  150. return v
  151. }
  152. l, _ := log.GetLogger(log.OutputStderr.String(), log.InfoLevel.String())
  153. return l
  154. }
  155. func (s *service) SetMainlog(l log.Logger) {
  156. s.mainlog.Store(l)
  157. }
  158. // AddInitializer adds a function that implements processorInitializer to be called when initializing
  159. func (s *service) AddInitializer(i processorInitializer) {
  160. s.Lock()
  161. defer s.Unlock()
  162. s.initializers = append(s.initializers, i)
  163. }
  164. // AddShutdowner adds a function that implements ProcessorShutdowner to be called when shutting down
  165. func (s *service) AddShutdowner(sh processorShutdowner) {
  166. s.Lock()
  167. defer s.Unlock()
  168. s.shutdowners = append(s.shutdowners, sh)
  169. }
  170. // reset clears the initializers and Shutdowners
  171. func (s *service) reset() {
  172. s.shutdowners = make([]processorShutdowner, 0)
  173. s.initializers = make([]processorInitializer, 0)
  174. }
  175. // Initialize initializes all the processors one-by-one and returns any errors.
  176. // Subsequent calls to Initialize will not call the initializer again unless it failed on the previous call
  177. // so Initialize may be called again to retry after getting errors
  178. func (s *service) Initialize(backend BackendConfig) Errors {
  179. s.Lock()
  180. defer s.Unlock()
  181. var errors Errors = nil
  182. failed := make([]processorInitializer, 0)
  183. for i := range s.initializers {
  184. if err := s.initializers[i].Initialize(backend); err != nil {
  185. errors = append(errors, err)
  186. failed = append(failed, s.initializers[i])
  187. }
  188. }
  189. // keep only the failed initializers
  190. s.initializers = failed
  191. return errors
  192. }
  193. // Shutdown shuts down all the processors by calling their shutdowners (if any)
  194. // Subsequent calls to Shutdown will not call the shutdowners again unless it failed on the previous call
  195. // so Shutdown may be called again to retry after getting errors
  196. func (s *service) shutdown() Errors {
  197. s.Lock()
  198. defer s.Unlock()
  199. var errors Errors
  200. failed := make([]processorShutdowner, 0)
  201. for i := range s.shutdowners {
  202. if err := s.shutdowners[i].Shutdown(); err != nil {
  203. errors = append(errors, err)
  204. failed = append(failed, s.shutdowners[i])
  205. }
  206. }
  207. s.shutdowners = failed
  208. return errors
  209. }
  210. // AddProcessor adds a new processor, which becomes available to the backend_config.save_process option
  211. // and also the backend_config.validate_process option
  212. // Use to add your own custom processor when using backends as a package, or after importing an external
  213. // processor.
  214. func (s *service) AddProcessor(name string, p ProcessorConstructor) {
  215. // wrap in a constructor since we want to defer calling it
  216. var c ProcessorConstructor
  217. c = func() Decorator {
  218. return p()
  219. }
  220. processors[strings.ToLower(name)] = c
  221. }
  222. func (s *service) AddStreamProcessor(name string, p StreamProcessorConstructor) {
  223. // wrap in a constructor since we want to defer calling it
  224. var c StreamProcessorConstructor
  225. c = func() *StreamDecorator {
  226. return p()
  227. }
  228. Streamers[strings.ToLower(name)] = c
  229. }
  230. // extractConfig loads the backend config. It has already been unmarshalled
  231. // "group" refers
  232. // cfg contains data from the main config file's "backend_config" value
  233. // configType is a Processor's specific config value.
  234. // The reason why using reflection is because we'll get a nice error message if the field is missing
  235. // the alternative solution would be to json.Marshal() and json.Unmarshal() however that will not give us any
  236. // error messages
  237. func (s *service) ExtractConfig(ns configNameSpace, group string, cfg BackendConfig, configType BaseConfig) (interface{}, error) {
  238. group = strings.ToLower(group)
  239. var configData ConfigGroup
  240. if v, ok := cfg[ns.String()][group]; ok {
  241. configData = v
  242. } else {
  243. return configType, nil
  244. }
  245. // Use reflection so that we can provide a nice error message
  246. v := reflect.ValueOf(configType).Elem() // so that we can set the values
  247. //m := reflect.ValueOf(configType).Elem()
  248. t := reflect.TypeOf(configType).Elem()
  249. typeOfT := v.Type()
  250. for i := 0; i < v.NumField(); i++ {
  251. f := v.Field(i)
  252. // read the tags of the config struct
  253. fieldName := t.Field(i).Tag.Get("json")
  254. omitempty := false
  255. if len(fieldName) > 0 {
  256. // parse the tag to
  257. // get the field name from struct tag
  258. split := strings.Split(fieldName, ",")
  259. fieldName = split[0]
  260. if len(split) > 1 {
  261. if split[1] == "omitempty" {
  262. omitempty = true
  263. }
  264. }
  265. } else {
  266. // could have no tag
  267. // so use the reflected field name
  268. fieldName = typeOfT.Field(i).Name
  269. }
  270. if f.Type().Name() == "int" {
  271. // in json, there is no int, only floats...
  272. if intVal, converted := configData[fieldName].(float64); converted {
  273. v.Field(i).SetInt(int64(intVal))
  274. } else if intVal, converted := configData[fieldName].(int); converted {
  275. v.Field(i).SetInt(int64(intVal))
  276. } else if !omitempty {
  277. return configType, convertError("property missing/invalid: '" + fieldName + "' of expected type: " + f.Type().Name())
  278. }
  279. }
  280. if f.Type().Name() == "string" {
  281. if stringVal, converted := configData[fieldName].(string); converted {
  282. v.Field(i).SetString(stringVal)
  283. } else if !omitempty {
  284. return configType, convertError("missing/invalid: '" + fieldName + "' of type: " + f.Type().Name())
  285. }
  286. }
  287. if f.Type().Name() == "bool" {
  288. if boolVal, converted := configData[fieldName].(bool); converted {
  289. v.Field(i).SetBool(boolVal)
  290. } else if !omitempty {
  291. return configType, convertError("missing/invalid: '" + fieldName + "' of type: " + f.Type().Name())
  292. }
  293. }
  294. }
  295. return configType, nil
  296. }