|
@@ -12,19 +12,23 @@ import (
|
|
)
|
|
)
|
|
|
|
|
|
var (
|
|
var (
|
|
- Service *BackendService
|
|
|
|
|
|
+ Svc *Service
|
|
|
|
+
|
|
// deprecated backends system
|
|
// deprecated backends system
|
|
backends = map[string]Backend{}
|
|
backends = map[string]Backend{}
|
|
- // new backends system
|
|
|
|
- Processors map[string]ProcessorConstructor
|
|
|
|
|
|
+ // Store the constructor for making an new processor decorator.
|
|
|
|
+ processors map[string]processorConstructor
|
|
|
|
+
|
|
|
|
+ b Backend // todo make sure legacy works
|
|
)
|
|
)
|
|
|
|
|
|
func init() {
|
|
func init() {
|
|
- Service = &BackendService{}
|
|
|
|
- Processors = make(map[string]ProcessorConstructor)
|
|
|
|
|
|
+ Svc = &Service{}
|
|
|
|
+
|
|
|
|
+ processors = make(map[string]processorConstructor)
|
|
}
|
|
}
|
|
|
|
|
|
-type ProcessorConstructor func() Decorator
|
|
|
|
|
|
+type processorConstructor func() Decorator
|
|
|
|
|
|
// Backends process received mail. Depending on the implementation, they can store mail in the database,
|
|
// Backends process received mail. Depending on the implementation, they can store mail in the database,
|
|
// write to a file, check for spam, re-transmit to another server, etc.
|
|
// write to a file, check for spam, re-transmit to another server, etc.
|
|
@@ -32,7 +36,8 @@ type ProcessorConstructor func() Decorator
|
|
// whether the message was processed successfully.
|
|
// whether the message was processed successfully.
|
|
type Backend interface {
|
|
type Backend interface {
|
|
// Public methods
|
|
// Public methods
|
|
- Process(*envelope.Envelope) BackendResult
|
|
|
|
|
|
+ Process(*envelope.Envelope) Result
|
|
|
|
+ ValidateRcpt(e *envelope.Envelope) RcptError
|
|
Initialize(BackendConfig) error
|
|
Initialize(BackendConfig) error
|
|
Shutdown() error
|
|
Shutdown() error
|
|
}
|
|
}
|
|
@@ -42,30 +47,30 @@ type BackendConfig map[string]interface{}
|
|
// All config structs extend from this
|
|
// All config structs extend from this
|
|
type BaseConfig interface{}
|
|
type BaseConfig interface{}
|
|
|
|
|
|
-type saveStatus struct {
|
|
|
|
|
|
+type notifyMsg struct {
|
|
err error
|
|
err error
|
|
queuedID string
|
|
queuedID string
|
|
}
|
|
}
|
|
|
|
|
|
-// BackendResult represents a response to an SMTP client after receiving DATA.
|
|
|
|
|
|
+// Result represents a response to an SMTP client after receiving DATA.
|
|
// The String method should return an SMTP message ready to send back to the
|
|
// The String method should return an SMTP message ready to send back to the
|
|
// client, for example `250 OK: Message received`.
|
|
// client, for example `250 OK: Message received`.
|
|
-type BackendResult interface {
|
|
|
|
|
|
+type Result interface {
|
|
fmt.Stringer
|
|
fmt.Stringer
|
|
// Code should return the SMTP code associated with this response, ie. `250`
|
|
// Code should return the SMTP code associated with this response, ie. `250`
|
|
Code() int
|
|
Code() int
|
|
}
|
|
}
|
|
|
|
|
|
// Internal implementation of BackendResult for use by backend implementations.
|
|
// Internal implementation of BackendResult for use by backend implementations.
|
|
-type backendResult string
|
|
|
|
|
|
+type result string
|
|
|
|
|
|
-func (br backendResult) String() string {
|
|
|
|
|
|
+func (br result) String() string {
|
|
return string(br)
|
|
return string(br)
|
|
}
|
|
}
|
|
|
|
|
|
// Parses the SMTP code from the first 3 characters of the SMTP message.
|
|
// Parses the SMTP code from the first 3 characters of the SMTP message.
|
|
// Returns 554 if code cannot be parsed.
|
|
// Returns 554 if code cannot be parsed.
|
|
-func (br backendResult) Code() int {
|
|
|
|
|
|
+func (br result) Code() int {
|
|
trimmed := strings.TrimSpace(string(br))
|
|
trimmed := strings.TrimSpace(string(br))
|
|
if len(trimmed) < 3 {
|
|
if len(trimmed) < 3 {
|
|
return 554
|
|
return 554
|
|
@@ -77,8 +82,8 @@ func (br backendResult) Code() int {
|
|
return code
|
|
return code
|
|
}
|
|
}
|
|
|
|
|
|
-func NewBackendResult(message string) BackendResult {
|
|
|
|
- return backendResult(message)
|
|
|
|
|
|
+func NewResult(message string) Result {
|
|
|
|
+ return result(message)
|
|
}
|
|
}
|
|
|
|
|
|
type ProcessorInitializer interface {
|
|
type ProcessorInitializer interface {
|
|
@@ -120,76 +125,110 @@ func (e Errors) Error() string {
|
|
return msg
|
|
return msg
|
|
}
|
|
}
|
|
|
|
|
|
-type BackendService struct {
|
|
|
|
- Initializers []ProcessorInitializer
|
|
|
|
- Shutdowners []ProcessorShutdowner
|
|
|
|
|
|
+// New retrieve a backend specified by the backendName, and initialize it using
|
|
|
|
+// backendConfig
|
|
|
|
+func New(backendName string, backendConfig BackendConfig, l log.Logger) (Backend, error) {
|
|
|
|
+ Svc.StoreMainlog(l)
|
|
|
|
+ if backend, found := backends[backendName]; found {
|
|
|
|
+ b = backend
|
|
|
|
+ } else {
|
|
|
|
+ gateway := &BackendGateway{config: backendConfig}
|
|
|
|
+ err := gateway.Initialize(backendConfig)
|
|
|
|
+ if err != nil {
|
|
|
|
+ return nil, fmt.Errorf("error while initializing the backend: %s", err)
|
|
|
|
+ }
|
|
|
|
+ gateway.State = BackendStateRunning
|
|
|
|
+ b = Backend(gateway)
|
|
|
|
+ }
|
|
|
|
+ return b, nil
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func GetBackend() Backend {
|
|
|
|
+ return b
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+type Service struct {
|
|
|
|
+ initializers []ProcessorInitializer
|
|
|
|
+ shutdowners []ProcessorShutdowner
|
|
sync.Mutex
|
|
sync.Mutex
|
|
- mainlog atomic.Value
|
|
|
|
- initErrors Errors
|
|
|
|
|
|
+ mainlog atomic.Value
|
|
}
|
|
}
|
|
|
|
|
|
// Get loads the log.logger in an atomic operation. Returns a stderr logger if not able to load
|
|
// Get loads the log.logger in an atomic operation. Returns a stderr logger if not able to load
|
|
func Log() log.Logger {
|
|
func Log() log.Logger {
|
|
- if v, ok := Service.mainlog.Load().(log.Logger); ok {
|
|
|
|
|
|
+ if v, ok := Svc.mainlog.Load().(log.Logger); ok {
|
|
return v
|
|
return v
|
|
}
|
|
}
|
|
l, _ := log.GetLogger(log.OutputStderr.String())
|
|
l, _ := log.GetLogger(log.OutputStderr.String())
|
|
return l
|
|
return l
|
|
}
|
|
}
|
|
|
|
|
|
-func (b *BackendService) StoreMainlog(l log.Logger) {
|
|
|
|
- b.mainlog.Store(l)
|
|
|
|
|
|
+func (s *Service) StoreMainlog(l log.Logger) {
|
|
|
|
+ s.mainlog.Store(l)
|
|
}
|
|
}
|
|
|
|
|
|
-// AddInitializer adds a function that impliments ProcessorShutdowner to be called when initializing
|
|
|
|
-func (b *BackendService) AddInitializer(i ProcessorInitializer) {
|
|
|
|
- b.Lock()
|
|
|
|
- defer b.Unlock()
|
|
|
|
- b.Initializers = append(b.Initializers, i)
|
|
|
|
|
|
+// AddInitializer adds a function that implements ProcessorShutdowner to be called when initializing
|
|
|
|
+func (s *Service) AddInitializer(i ProcessorInitializer) {
|
|
|
|
+ s.Lock()
|
|
|
|
+ defer s.Unlock()
|
|
|
|
+ s.initializers = append(s.initializers, i)
|
|
}
|
|
}
|
|
|
|
|
|
-// AddShutdowner adds a function that impliments ProcessorShutdowner to be called when shutting down
|
|
|
|
-func (b *BackendService) AddShutdowner(i ProcessorShutdowner) {
|
|
|
|
- b.Lock()
|
|
|
|
- defer b.Unlock()
|
|
|
|
- b.Shutdowners = append(b.Shutdowners, i)
|
|
|
|
|
|
+// AddShutdowner adds a function that implements ProcessorShutdowner to be called when shutting down
|
|
|
|
+func (s *Service) AddShutdowner(sh ProcessorShutdowner) {
|
|
|
|
+ s.Lock()
|
|
|
|
+ defer s.Unlock()
|
|
|
|
+ s.shutdowners = append(s.shutdowners, sh)
|
|
}
|
|
}
|
|
|
|
|
|
// Initialize initializes all the processors one-by-one and returns any errors.
|
|
// Initialize initializes all the processors one-by-one and returns any errors.
|
|
-func (b *BackendService) Initialize(backend BackendConfig) Errors {
|
|
|
|
- b.Lock()
|
|
|
|
- defer b.Unlock()
|
|
|
|
- b.initErrors = nil
|
|
|
|
- for i := range b.Initializers {
|
|
|
|
- err := b.Initializers[i].Initialize(backend)
|
|
|
|
- if err != nil {
|
|
|
|
- b.initErrors = append(b.initErrors, err)
|
|
|
|
|
|
+// Subsequent calls to Initialize will not call the initializer again unless it failed on the previous call
|
|
|
|
+// so Initialize may be called again to retry after getting errors
|
|
|
|
+func (s *Service) initialize(backend BackendConfig) Errors {
|
|
|
|
+ s.Lock()
|
|
|
|
+ defer s.Unlock()
|
|
|
|
+ var errors Errors
|
|
|
|
+ failed := make([]ProcessorInitializer, 0)
|
|
|
|
+ for i := range s.initializers {
|
|
|
|
+ if err := s.initializers[i].Initialize(backend); err != nil {
|
|
|
|
+ errors = append(errors, err)
|
|
|
|
+ failed = append(failed, s.initializers[i])
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- return b.initErrors
|
|
|
|
|
|
+ // keep only the failed initializers
|
|
|
|
+ s.initializers = failed
|
|
|
|
+ return errors
|
|
}
|
|
}
|
|
|
|
|
|
-// Shutdown shuts down all the processor by calling their shutdowners
|
|
|
|
-// It also clears the initializers and shutdowners that were set with AddInitializer and AddShutdowner
|
|
|
|
-func (b *BackendService) Shutdown() {
|
|
|
|
- b.Lock()
|
|
|
|
- defer b.Unlock()
|
|
|
|
- for i := range b.Shutdowners {
|
|
|
|
- b.Shutdowners[i].Shutdown()
|
|
|
|
|
|
+// Shutdown shuts down all the processors by calling their shutdowners (if any)
|
|
|
|
+// Subsequent calls to Shutdown will not call the shutdowners again unless it failed on the previous call
|
|
|
|
+// so Shutdown may be called again to retry after getting errors
|
|
|
|
+func (s *Service) shutdown() Errors {
|
|
|
|
+ s.Lock()
|
|
|
|
+ defer s.Unlock()
|
|
|
|
+ var errors Errors
|
|
|
|
+ failed := make([]ProcessorShutdowner, 0)
|
|
|
|
+ for i := range s.shutdowners {
|
|
|
|
+ if err := s.shutdowners[i].Shutdown(); err != nil {
|
|
|
|
+ errors = append(errors, err)
|
|
|
|
+ failed = append(failed, s.shutdowners[i])
|
|
|
|
+ }
|
|
}
|
|
}
|
|
- b.Initializers = make([]ProcessorInitializer, 0)
|
|
|
|
- b.Shutdowners = make([]ProcessorShutdowner, 0)
|
|
|
|
|
|
+ s.shutdowners = failed
|
|
|
|
+ return errors
|
|
}
|
|
}
|
|
|
|
|
|
// AddProcessor adds a new processor, which becomes available to the backend_config.process_stack option
|
|
// AddProcessor adds a new processor, which becomes available to the backend_config.process_stack option
|
|
-func (b *BackendService) AddProcessor(name string, p ProcessorConstructor) {
|
|
|
|
|
|
+// Use to add your own custom processor when using backends as a package, or after importing an external
|
|
|
|
+// processor.
|
|
|
|
+func (s *Service) AddProcessor(name string, p processorConstructor) {
|
|
// wrap in a constructor since we want to defer calling it
|
|
// wrap in a constructor since we want to defer calling it
|
|
- var c ProcessorConstructor
|
|
|
|
|
|
+ var c processorConstructor
|
|
c = func() Decorator {
|
|
c = func() Decorator {
|
|
return p()
|
|
return p()
|
|
}
|
|
}
|
|
// add to our processors list
|
|
// add to our processors list
|
|
- Processors[strings.ToLower(name)] = c
|
|
|
|
|
|
+ processors[strings.ToLower(name)] = c
|
|
}
|
|
}
|
|
|
|
|
|
// extractConfig loads the backend config. It has already been unmarshalled
|
|
// extractConfig loads the backend config. It has already been unmarshalled
|
|
@@ -198,15 +237,15 @@ func (b *BackendService) AddProcessor(name string, p ProcessorConstructor) {
|
|
// The reason why using reflection is because we'll get a nice error message if the field is missing
|
|
// The reason why using reflection is because we'll get a nice error message if the field is missing
|
|
// the alternative solution would be to json.Marshal() and json.Unmarshal() however that will not give us any
|
|
// the alternative solution would be to json.Marshal() and json.Unmarshal() however that will not give us any
|
|
// error messages
|
|
// error messages
|
|
-func (b *BackendService) ExtractConfig(configData BackendConfig, configType BaseConfig) (interface{}, error) {
|
|
|
|
|
|
+func (s *Service) ExtractConfig(configData BackendConfig, configType BaseConfig) (interface{}, error) {
|
|
// Use reflection so that we can provide a nice error message
|
|
// Use reflection so that we can provide a nice error message
|
|
- s := reflect.ValueOf(configType).Elem() // so that we can set the values
|
|
|
|
- m := reflect.ValueOf(configType).Elem()
|
|
|
|
|
|
+ v := reflect.ValueOf(configType).Elem() // so that we can set the values
|
|
|
|
+ //m := reflect.ValueOf(configType).Elem()
|
|
t := reflect.TypeOf(configType).Elem()
|
|
t := reflect.TypeOf(configType).Elem()
|
|
- typeOfT := s.Type()
|
|
|
|
|
|
+ typeOfT := v.Type()
|
|
|
|
|
|
- for i := 0; i < m.NumField(); i++ {
|
|
|
|
- f := s.Field(i)
|
|
|
|
|
|
+ for i := 0; i < v.NumField(); i++ {
|
|
|
|
+ f := v.Field(i)
|
|
// read the tags of the config struct
|
|
// read the tags of the config struct
|
|
field_name := t.Field(i).Tag.Get("json")
|
|
field_name := t.Field(i).Tag.Get("json")
|
|
if len(field_name) > 0 {
|
|
if len(field_name) > 0 {
|
|
@@ -222,23 +261,23 @@ func (b *BackendService) ExtractConfig(configData BackendConfig, configType Base
|
|
if f.Type().Name() == "int" {
|
|
if f.Type().Name() == "int" {
|
|
// in json, there is no int, only floats...
|
|
// in json, there is no int, only floats...
|
|
if intVal, converted := configData[field_name].(float64); converted {
|
|
if intVal, converted := configData[field_name].(float64); converted {
|
|
- s.Field(i).SetInt(int64(intVal))
|
|
|
|
|
|
+ v.Field(i).SetInt(int64(intVal))
|
|
} else if intVal, converted := configData[field_name].(int); converted {
|
|
} else if intVal, converted := configData[field_name].(int); converted {
|
|
- s.Field(i).SetInt(int64(intVal))
|
|
|
|
|
|
+ v.Field(i).SetInt(int64(intVal))
|
|
} else {
|
|
} else {
|
|
return configType, convertError("property missing/invalid: '" + field_name + "' of expected type: " + f.Type().Name())
|
|
return configType, convertError("property missing/invalid: '" + field_name + "' of expected type: " + f.Type().Name())
|
|
}
|
|
}
|
|
}
|
|
}
|
|
if f.Type().Name() == "string" {
|
|
if f.Type().Name() == "string" {
|
|
if stringVal, converted := configData[field_name].(string); converted {
|
|
if stringVal, converted := configData[field_name].(string); converted {
|
|
- s.Field(i).SetString(stringVal)
|
|
|
|
|
|
+ v.Field(i).SetString(stringVal)
|
|
} else {
|
|
} else {
|
|
return configType, convertError("missing/invalid: '" + field_name + "' of type: " + f.Type().Name())
|
|
return configType, convertError("missing/invalid: '" + field_name + "' of type: " + f.Type().Name())
|
|
}
|
|
}
|
|
}
|
|
}
|
|
if f.Type().Name() == "bool" {
|
|
if f.Type().Name() == "bool" {
|
|
if boolVal, converted := configData[field_name].(bool); converted {
|
|
if boolVal, converted := configData[field_name].(bool); converted {
|
|
- s.Field(i).SetBool(boolVal)
|
|
|
|
|
|
+ v.Field(i).SetBool(boolVal)
|
|
} else {
|
|
} else {
|
|
return configType, convertError("missing/invalid: '" + field_name + "' of type: " + f.Type().Name())
|
|
return configType, convertError("missing/invalid: '" + field_name + "' of type: " + f.Type().Name())
|
|
}
|
|
}
|