123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486 |
- package backends
- import (
- "encoding/json"
- "errors"
- "fmt"
- "os"
- "reflect"
- "strings"
- "time"
- )
- type ConfigGroup map[string]interface{}
- type BackendConfig map[ConfigSection]map[string]ConfigGroup
- const (
- validateRcptTimeout = time.Second * 5
- defaultProcessor = "Debugger"
- // streamBufferSize sets the size of the buffer for the streaming processors,
- // can be configured using `stream_buffer_size`
- configStreamBufferSize = 4096
- configSaveWorkersCount = 1
- configValidateWorkersCount = 1
- configStreamWorkersCount = 1
- configBackgroundWorkersCount = 1
- configSaveProcessSize = 64
- configValidateProcessSize = 64
- // configTimeoutSave: default timeout for saving email, if 'save_timeout' not present in config
- configTimeoutSave = time.Second * 30
- // configTimeoutValidateRcpt default timeout for validating rcpt to, if 'val_rcpt_timeout' not present in config
- configTimeoutValidateRcpt = time.Second * 5
- configTimeoutStream = time.Second * 30
- configSaveStreamSize = 64
- configPostProcessSize = 64
- )
- func (c *BackendConfig) SetValue(section ConfigSection, name string, key string, value interface{}) {
- if *c == nil {
- *c = make(BackendConfig, 0)
- }
- if (*c)[section] == nil {
- (*c)[section] = make(map[string]ConfigGroup)
- }
- if (*c)[section][name] == nil {
- (*c)[section][name] = make(ConfigGroup)
- }
- (*c)[section][name][key] = value
- }
- func (c *BackendConfig) GetValue(section ConfigSection, name string, key string) interface{} {
- if (*c)[section] == nil {
- return nil
- }
- if (*c)[section][name] == nil {
- return nil
- }
- if v, ok := (*c)[section][name][key]; ok {
- return &v
- }
- return nil
- }
- // toLower normalizes the backendconfig lowercases the config's keys
- func (c BackendConfig) toLower() {
- for section, v := range c {
- for k2, v2 := range v {
- if k2_lower := strings.ToLower(k2); k2 != k2_lower {
- c[section][k2_lower] = v2
- delete(c[section], k2) // delete the non-lowercased key
- }
- }
- }
- }
- func (c BackendConfig) lookupGroup(section ConfigSection, name string) ConfigGroup {
- if v, ok := c[section][name]; ok {
- return v
- }
- return nil
- }
- // ConfigureDefaults sets default values for the backend config,
- // if no backend config was added before starting, then use a default config
- // otherwise, see what required values were missed in the config and add any missing with defaults
- func (c *BackendConfig) ConfigureDefaults() error {
- // set the defaults if no value has been configured
- // (always use lowercase)
- if c.GetValue(ConfigGateways, "default", "save_workers_size") == nil {
- c.SetValue(ConfigGateways, "default", "save_workers_size", 1)
- }
- if c.GetValue(ConfigGateways, "default", "save_process") == nil {
- c.SetValue(ConfigGateways, "default", "save_process", "HeadersParser|Header|Debugger")
- }
- if c.GetValue(ConfigProcessors, "header", "primary_mail_host") == nil {
- h, err := os.Hostname()
- if err != nil {
- return err
- }
- c.SetValue(ConfigProcessors, "header", "primary_mail_host", h)
- }
- if c.GetValue(ConfigProcessors, "debugger", "log_received_mails") == nil {
- c.SetValue(ConfigProcessors, "debugger", "log_received_mails", true)
- }
- return nil
- }
- // UnmarshalJSON custom handling of the ConfigSection keys (they're enumerated)
- func (c *BackendConfig) UnmarshalJSON(b []byte) error {
- temp := make(map[string]map[string]ConfigGroup)
- err := json.Unmarshal(b, &temp)
- if err != nil {
- return err
- }
- if *c == nil {
- *c = make(BackendConfig)
- }
- for key, val := range temp {
- // map the key to a ConfigSection type
- var section ConfigSection
- if err := json.Unmarshal([]byte("\""+key+"\""), §ion); err != nil {
- return err
- }
- if (*c)[section] == nil {
- (*c)[section] = make(map[string]ConfigGroup)
- }
- (*c)[section] = val
- }
- return nil
- }
- // MarshalJSON custom handling of ConfigSection keys (since JSON keys need to be strings)
- func (c *BackendConfig) MarshalJSON() ([]byte, error) {
- temp := make(map[string]map[string]ConfigGroup)
- for key, val := range *c {
- // convert they key to a string
- temp[key.String()] = val
- }
- return json.Marshal(temp)
- }
- type ConfigSection int
- const (
- ConfigProcessors ConfigSection = iota
- ConfigStreamProcessors
- ConfigGateways
- )
- func (o ConfigSection) String() string {
- switch o {
- case ConfigProcessors:
- return "processors"
- case ConfigStreamProcessors:
- return "stream_processors"
- case ConfigGateways:
- return "gateways"
- }
- return "unknown"
- }
- func (o *ConfigSection) UnmarshalJSON(b []byte) error {
- str := strings.Trim(string(b), `"`)
- str = strings.ToLower(str)
- switch {
- case str == "processors":
- *o = ConfigProcessors
- case str == "stream_processors":
- *o = ConfigStreamProcessors
- case str == "gateways":
- *o = ConfigGateways
- default:
- return errors.New("incorrect config section [" + str + "], may be processors, stream_processors or gateways")
- }
- return nil
- }
- func (o *ConfigSection) MarshalJSON() ([]byte, error) {
- ret := o.String()
- if ret == "unknown" {
- return []byte{}, errors.New("unknown config section")
- }
- return []byte(ret), nil
- }
- // All config structs extend from this
- type BaseConfig interface{}
- type stackConfigExpression struct {
- alias string
- name string
- }
- func (e stackConfigExpression) String() string {
- if e.alias == e.name || e.alias == "" {
- return e.name
- }
- return fmt.Sprintf("%s:%s", e.alias, e.name)
- }
- type notFoundError func(s string) error
- type stackConfig struct {
- list []stackConfigExpression
- notFound notFoundError
- }
- type aliasMap map[string]string
- // newAliasMap scans through the configured processors to produce a mapping
- // alias -> processor name. This mapping is used to determine what configuration to use
- // when making a new processor
- func newAliasMap(cfg map[string]ConfigGroup) aliasMap {
- am := make(aliasMap, 0)
- for k, _ := range cfg {
- var alias, name string
- // format: <alias> : <processorName>
- if i := strings.Index(k, ":"); i > 0 && len(k) > i+2 {
- alias = k[0:i]
- name = k[i+1:]
- } else {
- alias = k
- name = k
- }
- am[strings.ToLower(alias)] = strings.ToLower(name)
- }
- return am
- }
- func NewStackConfig(config string, am aliasMap) (ret *stackConfig) {
- ret = new(stackConfig)
- cfg := strings.ToLower(strings.TrimSpace(config))
- if cfg == "" {
- return
- }
- items := strings.Split(cfg, "|")
- ret.list = make([]stackConfigExpression, len(items))
- pos := 0
- for i := range items {
- pos = len(items) - 1 - i // reverse order, since decorators are stacked
- ret.list[i] = stackConfigExpression{alias: items[pos], name: items[pos]}
- if processor, ok := am[items[pos]]; ok {
- ret.list[i].name = processor
- }
- }
- return ret
- }
- func newStackProcessorConfig(config string, am aliasMap) (ret *stackConfig) {
- ret = NewStackConfig(config, am)
- ret.notFound = func(s string) error {
- return errors.New(fmt.Sprintf("processor [%s] not found", s))
- }
- return ret
- }
- func newStackStreamProcessorConfig(config string, am aliasMap) (ret *stackConfig) {
- ret = NewStackConfig(config, am)
- ret.notFound = func(s string) error {
- return errors.New(fmt.Sprintf("stream processor [%s] not found", s))
- }
- return ret
- }
- // Changes returns a list of gateways whose config changed
- func (c BackendConfig) Changes(oldConfig BackendConfig) (changed, added, removed map[string]bool) {
- // check the processors if changed
- changed = make(map[string]bool, 0)
- added = make(map[string]bool, 0)
- removed = make(map[string]bool, 0)
- cp := ConfigProcessors
- csp := ConfigStreamProcessors
- cg := ConfigGateways
- changedProcessors := changedConfigGroups(
- oldConfig[cp], c[cp])
- changedStreamProcessors := changedConfigGroups(
- oldConfig[csp], c[csp])
- configType := BaseConfig(&GatewayConfig{})
- aliasMapStream := newAliasMap(c[csp])
- aliasMapProcessor := newAliasMap(c[cp])
- // oldList keeps a track of gateways that have been compared for changes.
- // We remove the from the list when a gateway was processed
- // remaining items are assumed to be removed from the new config
- oldList := map[string]bool{}
- for g := range oldConfig[cg] {
- oldList[g] = true
- }
- // go through all the gateway configs,
- // make a list of all the ones that have processors whose config had changed
- for key, _ := range c[cg] {
- // check the processors in the SaveProcess and SaveStream settings to see if
- // they changed. If changed, then make a record of which gateways use the processors
- e, _ := Svc.ExtractConfig(ConfigGateways, key, c, configType)
- bcfg := e.(*GatewayConfig)
- config := NewStackConfig(bcfg.SaveProcess, aliasMapProcessor)
- for _, v := range config.list {
- if _, ok := changedProcessors[v.name]; ok {
- changed[key] = true
- }
- }
- config = NewStackConfig(bcfg.SaveStream, aliasMapStream)
- for _, v := range config.list {
- if _, ok := changedStreamProcessors[v.name]; ok {
- changed[key] = true
- }
- }
- if o, ok := oldConfig[cg][key]; ok {
- delete(oldList, key)
- if !reflect.DeepEqual(c[cg][key], o) {
- // whats changed
- changed[key] = true
- }
- } else {
- // whats been added
- added[key] = true
- }
- }
- // whats been removed
- for p := range oldList {
- removed[p] = true
- }
- return
- }
- func changedConfigGroups(old map[string]ConfigGroup, new map[string]ConfigGroup) map[string]bool {
- diff, added, removed := compareConfigGroup(old, new)
- var all []string
- all = append(all, diff...)
- all = append(all, removed...)
- all = append(all, added...)
- changed := make(map[string]bool, 0)
- for p := range all {
- changed[strings.ToLower(all[p])] = true
- }
- return changed
- }
- // compareConfigGroup compares two config groups
- // returns a list of keys that changed, been added or removed to new
- func compareConfigGroup(old map[string]ConfigGroup, new map[string]ConfigGroup) (diff, added, removed []string) {
- diff = make([]string, 0)
- added = make([]string, 0)
- removed = make([]string, 0)
- for p := range new {
- if o, ok := old[p]; ok {
- delete(old, p)
- if !reflect.DeepEqual(new[p], o) {
- // whats changed
- diff = append(diff, p)
- }
- } else {
- // whats been added
- added = append(added, p)
- }
- }
- // whats been removed
- for p := range old {
- removed = append(removed, p)
- }
- return
- }
- type GatewayConfig struct {
- // SaveWorkersCount controls how many concurrent workers to start. Defaults to 1
- SaveWorkersCount int `json:"save_workers_size,omitempty"`
- // ValidateWorkersCount controls how many concurrent recipient validation workers to start. Defaults to 1
- ValidateWorkersCount int `json:"validate_workers_size,omitempty"`
- // StreamWorkersCount controls how many concurrent stream workers to start. Defaults to 1
- StreamWorkersCount int `json:"stream_workers_size,omitempty"`
- // BackgroundWorkersCount controls how many concurrent background stream workers to start. Defaults to 1
- BackgroundWorkersCount int `json:"background_workers_size,omitempty"`
- // SaveProcess controls which processors to chain in a stack for saving email tasks
- SaveProcess string `json:"save_process,omitempty"`
- // SaveProcessSize limits the amount of messages waiting in the queue to get processed by SaveProcess
- SaveProcessSize int `json:"save_process_size,omitempty"`
- // ValidateProcess is like ProcessorStack, but for recipient validation tasks
- ValidateProcess string `json:"validate_process,omitempty"`
- // ValidateProcessSize limits the amount of messages waiting in the queue to get processed by ValidateProcess
- ValidateProcessSize int `json:"validate_process_size,omitempty"`
- // TimeoutSave is duration before timeout when saving an email, eg "29s"
- TimeoutSave string `json:"save_timeout,omitempty"`
- // TimeoutValidateRcpt duration before timeout when validating a recipient, eg "1s"
- TimeoutValidateRcpt string `json:"val_rcpt_timeout,omitempty"`
- // TimeoutStream duration before timeout when processing a stream eg "1s"
- TimeoutStream string `json:"stream_timeout,omitempty"`
- // StreamBufferLen controls the size of the output buffer, in bytes. Default is 4096
- StreamBufferSize int `json:"stream_buffer_size,omitempty"`
- // SaveStream is same as a SaveProcess, but uses the StreamProcessor stack instead
- SaveStream string `json:"save_stream,omitempty"`
- // SaveStreamSize limits the amount of messages waiting in the queue to get processed by SaveStream
- SaveStreamSize int `json:"save_stream_size,omitempty"`
- // PostProcessSize controls the length of thq queue for background processing
- PostProcessSize int `json:"post_process_size,omitempty"`
- // PostProcessProducer specifies which StreamProcessor to use for reading data to the post process
- PostProcessProducer string `json:"post_process_producer,omitempty"`
- // PostProcessConsumer is same as SaveStream, but controls
- PostProcessConsumer string `json:"post_process_consumer,omitempty"`
- }
- // saveWorkersCount gets the number of workers to use for saving email by reading the save_workers_size config value
- // Returns 1 if no config value was set
- func (c *GatewayConfig) saveWorkersCount() int {
- if c.SaveWorkersCount <= 0 {
- return configSaveWorkersCount
- }
- return c.SaveWorkersCount
- }
- func (c *GatewayConfig) validateWorkersCount() int {
- if c.ValidateWorkersCount <= 0 {
- return configValidateWorkersCount
- }
- return c.ValidateWorkersCount
- }
- func (c *GatewayConfig) streamWorkersCount() int {
- if c.StreamWorkersCount <= 0 {
- return configStreamWorkersCount
- }
- return c.StreamWorkersCount
- }
- func (c *GatewayConfig) backgroundWorkersCount() int {
- if c.BackgroundWorkersCount <= 0 {
- return configBackgroundWorkersCount
- }
- return c.BackgroundWorkersCount
- }
- func (c *GatewayConfig) saveProcessSize() int {
- if c.SaveProcessSize <= 0 {
- return configSaveProcessSize
- }
- return c.SaveProcessSize
- }
- func (c *GatewayConfig) validateProcessSize() int {
- if c.ValidateProcessSize <= 0 {
- return configValidateProcessSize
- }
- return c.ValidateProcessSize
- }
- func (c *GatewayConfig) saveStreamSize() int {
- if c.SaveStreamSize <= 0 {
- return configSaveStreamSize
- }
- return c.SaveStreamSize
- }
- func (c *GatewayConfig) postProcessSize() int {
- if c.PostProcessSize <= 0 {
- return configPostProcessSize
- }
- return c.PostProcessSize
- }
- // saveTimeout returns the maximum amount of seconds to wait before timing out a save processing task
- func (gw *BackendGateway) saveTimeout() time.Duration {
- if gw.gwConfig.TimeoutSave == "" {
- return configTimeoutSave
- }
- t, err := time.ParseDuration(gw.gwConfig.TimeoutSave)
- if err != nil {
- return configTimeoutSave
- }
- return t
- }
- // validateRcptTimeout returns the maximum amount of seconds to wait before timing out a recipient validation task
- func (gw *BackendGateway) validateRcptTimeout() time.Duration {
- if gw.gwConfig.TimeoutValidateRcpt == "" {
- return configTimeoutValidateRcpt
- }
- t, err := time.ParseDuration(gw.gwConfig.TimeoutValidateRcpt)
- if err != nil {
- return configTimeoutValidateRcpt
- }
- return t
- }
|