config.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486
  1. package backends
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "os"
  7. "reflect"
  8. "strings"
  9. "time"
  10. )
  11. type ConfigGroup map[string]interface{}
  12. type BackendConfig map[ConfigSection]map[string]ConfigGroup
  13. const (
  14. validateRcptTimeout = time.Second * 5
  15. defaultProcessor = "Debugger"
  16. // streamBufferSize sets the size of the buffer for the streaming processors,
  17. // can be configured using `stream_buffer_size`
  18. configStreamBufferSize = 4096
  19. configSaveWorkersCount = 1
  20. configValidateWorkersCount = 1
  21. configStreamWorkersCount = 1
  22. configBackgroundWorkersCount = 1
  23. configSaveProcessSize = 64
  24. configValidateProcessSize = 64
  25. // configTimeoutSave: default timeout for saving email, if 'save_timeout' not present in config
  26. configTimeoutSave = time.Second * 30
  27. // configTimeoutValidateRcpt default timeout for validating rcpt to, if 'val_rcpt_timeout' not present in config
  28. configTimeoutValidateRcpt = time.Second * 5
  29. configTimeoutStream = time.Second * 30
  30. configSaveStreamSize = 64
  31. configPostProcessSize = 64
  32. )
  33. func (c *BackendConfig) SetValue(section ConfigSection, name string, key string, value interface{}) {
  34. if *c == nil {
  35. *c = make(BackendConfig, 0)
  36. }
  37. if (*c)[section] == nil {
  38. (*c)[section] = make(map[string]ConfigGroup)
  39. }
  40. if (*c)[section][name] == nil {
  41. (*c)[section][name] = make(ConfigGroup)
  42. }
  43. (*c)[section][name][key] = value
  44. }
  45. func (c *BackendConfig) GetValue(section ConfigSection, name string, key string) interface{} {
  46. if (*c)[section] == nil {
  47. return nil
  48. }
  49. if (*c)[section][name] == nil {
  50. return nil
  51. }
  52. if v, ok := (*c)[section][name][key]; ok {
  53. return &v
  54. }
  55. return nil
  56. }
  57. // toLower normalizes the backendconfig lowercases the config's keys
  58. func (c BackendConfig) toLower() {
  59. for section, v := range c {
  60. for k2, v2 := range v {
  61. if k2_lower := strings.ToLower(k2); k2 != k2_lower {
  62. c[section][k2_lower] = v2
  63. delete(c[section], k2) // delete the non-lowercased key
  64. }
  65. }
  66. }
  67. }
  68. func (c BackendConfig) lookupGroup(section ConfigSection, name string) ConfigGroup {
  69. if v, ok := c[section][name]; ok {
  70. return v
  71. }
  72. return nil
  73. }
  74. // ConfigureDefaults sets default values for the backend config,
  75. // if no backend config was added before starting, then use a default config
  76. // otherwise, see what required values were missed in the config and add any missing with defaults
  77. func (c *BackendConfig) ConfigureDefaults() error {
  78. // set the defaults if no value has been configured
  79. // (always use lowercase)
  80. if c.GetValue(ConfigGateways, "default", "save_workers_size") == nil {
  81. c.SetValue(ConfigGateways, "default", "save_workers_size", 1)
  82. }
  83. if c.GetValue(ConfigGateways, "default", "save_process") == nil {
  84. c.SetValue(ConfigGateways, "default", "save_process", "HeadersParser|Header|Debugger")
  85. }
  86. if c.GetValue(ConfigProcessors, "header", "primary_mail_host") == nil {
  87. h, err := os.Hostname()
  88. if err != nil {
  89. return err
  90. }
  91. c.SetValue(ConfigProcessors, "header", "primary_mail_host", h)
  92. }
  93. if c.GetValue(ConfigProcessors, "debugger", "log_received_mails") == nil {
  94. c.SetValue(ConfigProcessors, "debugger", "log_received_mails", true)
  95. }
  96. return nil
  97. }
  98. // UnmarshalJSON custom handling of the ConfigSection keys (they're enumerated)
  99. func (c *BackendConfig) UnmarshalJSON(b []byte) error {
  100. temp := make(map[string]map[string]ConfigGroup)
  101. err := json.Unmarshal(b, &temp)
  102. if err != nil {
  103. return err
  104. }
  105. if *c == nil {
  106. *c = make(BackendConfig)
  107. }
  108. for key, val := range temp {
  109. // map the key to a ConfigSection type
  110. var section ConfigSection
  111. if err := json.Unmarshal([]byte("\""+key+"\""), &section); err != nil {
  112. return err
  113. }
  114. if (*c)[section] == nil {
  115. (*c)[section] = make(map[string]ConfigGroup)
  116. }
  117. (*c)[section] = val
  118. }
  119. return nil
  120. }
  121. // MarshalJSON custom handling of ConfigSection keys (since JSON keys need to be strings)
  122. func (c *BackendConfig) MarshalJSON() ([]byte, error) {
  123. temp := make(map[string]map[string]ConfigGroup)
  124. for key, val := range *c {
  125. // convert they key to a string
  126. temp[key.String()] = val
  127. }
  128. return json.Marshal(temp)
  129. }
  130. type ConfigSection int
  131. const (
  132. ConfigProcessors ConfigSection = iota
  133. ConfigStreamProcessors
  134. ConfigGateways
  135. )
  136. func (o ConfigSection) String() string {
  137. switch o {
  138. case ConfigProcessors:
  139. return "processors"
  140. case ConfigStreamProcessors:
  141. return "stream_processors"
  142. case ConfigGateways:
  143. return "gateways"
  144. }
  145. return "unknown"
  146. }
  147. func (o *ConfigSection) UnmarshalJSON(b []byte) error {
  148. str := strings.Trim(string(b), `"`)
  149. str = strings.ToLower(str)
  150. switch {
  151. case str == "processors":
  152. *o = ConfigProcessors
  153. case str == "stream_processors":
  154. *o = ConfigStreamProcessors
  155. case str == "gateways":
  156. *o = ConfigGateways
  157. default:
  158. return errors.New("incorrect config section [" + str + "], may be processors, stream_processors or gateways")
  159. }
  160. return nil
  161. }
  162. func (o *ConfigSection) MarshalJSON() ([]byte, error) {
  163. ret := o.String()
  164. if ret == "unknown" {
  165. return []byte{}, errors.New("unknown config section")
  166. }
  167. return []byte(ret), nil
  168. }
  169. // All config structs extend from this
  170. type BaseConfig interface{}
  171. type stackConfigExpression struct {
  172. alias string
  173. name string
  174. }
  175. func (e stackConfigExpression) String() string {
  176. if e.alias == e.name || e.alias == "" {
  177. return e.name
  178. }
  179. return fmt.Sprintf("%s:%s", e.alias, e.name)
  180. }
  181. type notFoundError func(s string) error
  182. type stackConfig struct {
  183. list []stackConfigExpression
  184. notFound notFoundError
  185. }
  186. type aliasMap map[string]string
  187. // newAliasMap scans through the configured processors to produce a mapping
  188. // alias -> processor name. This mapping is used to determine what configuration to use
  189. // when making a new processor
  190. func newAliasMap(cfg map[string]ConfigGroup) aliasMap {
  191. am := make(aliasMap, 0)
  192. for k, _ := range cfg {
  193. var alias, name string
  194. // format: <alias> : <processorName>
  195. if i := strings.Index(k, ":"); i > 0 && len(k) > i+2 {
  196. alias = k[0:i]
  197. name = k[i+1:]
  198. } else {
  199. alias = k
  200. name = k
  201. }
  202. am[strings.ToLower(alias)] = strings.ToLower(name)
  203. }
  204. return am
  205. }
  206. func NewStackConfig(config string, am aliasMap) (ret *stackConfig) {
  207. ret = new(stackConfig)
  208. cfg := strings.ToLower(strings.TrimSpace(config))
  209. if cfg == "" {
  210. return
  211. }
  212. items := strings.Split(cfg, "|")
  213. ret.list = make([]stackConfigExpression, len(items))
  214. pos := 0
  215. for i := range items {
  216. pos = len(items) - 1 - i // reverse order, since decorators are stacked
  217. ret.list[i] = stackConfigExpression{alias: items[pos], name: items[pos]}
  218. if processor, ok := am[items[pos]]; ok {
  219. ret.list[i].name = processor
  220. }
  221. }
  222. return ret
  223. }
  224. func newStackProcessorConfig(config string, am aliasMap) (ret *stackConfig) {
  225. ret = NewStackConfig(config, am)
  226. ret.notFound = func(s string) error {
  227. return errors.New(fmt.Sprintf("processor [%s] not found", s))
  228. }
  229. return ret
  230. }
  231. func newStackStreamProcessorConfig(config string, am aliasMap) (ret *stackConfig) {
  232. ret = NewStackConfig(config, am)
  233. ret.notFound = func(s string) error {
  234. return errors.New(fmt.Sprintf("stream processor [%s] not found", s))
  235. }
  236. return ret
  237. }
  238. // Changes returns a list of gateways whose config changed
  239. func (c BackendConfig) Changes(oldConfig BackendConfig) (changed, added, removed map[string]bool) {
  240. // check the processors if changed
  241. changed = make(map[string]bool, 0)
  242. added = make(map[string]bool, 0)
  243. removed = make(map[string]bool, 0)
  244. cp := ConfigProcessors
  245. csp := ConfigStreamProcessors
  246. cg := ConfigGateways
  247. changedProcessors := changedConfigGroups(
  248. oldConfig[cp], c[cp])
  249. changedStreamProcessors := changedConfigGroups(
  250. oldConfig[csp], c[csp])
  251. configType := BaseConfig(&GatewayConfig{})
  252. aliasMapStream := newAliasMap(c[csp])
  253. aliasMapProcessor := newAliasMap(c[cp])
  254. // oldList keeps a track of gateways that have been compared for changes.
  255. // We remove the from the list when a gateway was processed
  256. // remaining items are assumed to be removed from the new config
  257. oldList := map[string]bool{}
  258. for g := range oldConfig[cg] {
  259. oldList[g] = true
  260. }
  261. // go through all the gateway configs,
  262. // make a list of all the ones that have processors whose config had changed
  263. for key, _ := range c[cg] {
  264. // check the processors in the SaveProcess and SaveStream settings to see if
  265. // they changed. If changed, then make a record of which gateways use the processors
  266. e, _ := Svc.ExtractConfig(ConfigGateways, key, c, configType)
  267. bcfg := e.(*GatewayConfig)
  268. config := NewStackConfig(bcfg.SaveProcess, aliasMapProcessor)
  269. for _, v := range config.list {
  270. if _, ok := changedProcessors[v.name]; ok {
  271. changed[key] = true
  272. }
  273. }
  274. config = NewStackConfig(bcfg.SaveStream, aliasMapStream)
  275. for _, v := range config.list {
  276. if _, ok := changedStreamProcessors[v.name]; ok {
  277. changed[key] = true
  278. }
  279. }
  280. if o, ok := oldConfig[cg][key]; ok {
  281. delete(oldList, key)
  282. if !reflect.DeepEqual(c[cg][key], o) {
  283. // whats changed
  284. changed[key] = true
  285. }
  286. } else {
  287. // whats been added
  288. added[key] = true
  289. }
  290. }
  291. // whats been removed
  292. for p := range oldList {
  293. removed[p] = true
  294. }
  295. return
  296. }
  297. func changedConfigGroups(old map[string]ConfigGroup, new map[string]ConfigGroup) map[string]bool {
  298. diff, added, removed := compareConfigGroup(old, new)
  299. var all []string
  300. all = append(all, diff...)
  301. all = append(all, removed...)
  302. all = append(all, added...)
  303. changed := make(map[string]bool, 0)
  304. for p := range all {
  305. changed[strings.ToLower(all[p])] = true
  306. }
  307. return changed
  308. }
  309. // compareConfigGroup compares two config groups
  310. // returns a list of keys that changed, been added or removed to new
  311. func compareConfigGroup(old map[string]ConfigGroup, new map[string]ConfigGroup) (diff, added, removed []string) {
  312. diff = make([]string, 0)
  313. added = make([]string, 0)
  314. removed = make([]string, 0)
  315. for p := range new {
  316. if o, ok := old[p]; ok {
  317. delete(old, p)
  318. if !reflect.DeepEqual(new[p], o) {
  319. // whats changed
  320. diff = append(diff, p)
  321. }
  322. } else {
  323. // whats been added
  324. added = append(added, p)
  325. }
  326. }
  327. // whats been removed
  328. for p := range old {
  329. removed = append(removed, p)
  330. }
  331. return
  332. }
  333. type GatewayConfig struct {
  334. // SaveWorkersCount controls how many concurrent workers to start. Defaults to 1
  335. SaveWorkersCount int `json:"save_workers_size,omitempty"`
  336. // ValidateWorkersCount controls how many concurrent recipient validation workers to start. Defaults to 1
  337. ValidateWorkersCount int `json:"validate_workers_size,omitempty"`
  338. // StreamWorkersCount controls how many concurrent stream workers to start. Defaults to 1
  339. StreamWorkersCount int `json:"stream_workers_size,omitempty"`
  340. // BackgroundWorkersCount controls how many concurrent background stream workers to start. Defaults to 1
  341. BackgroundWorkersCount int `json:"background_workers_size,omitempty"`
  342. // SaveProcess controls which processors to chain in a stack for saving email tasks
  343. SaveProcess string `json:"save_process,omitempty"`
  344. // SaveProcessSize limits the amount of messages waiting in the queue to get processed by SaveProcess
  345. SaveProcessSize int `json:"save_process_size,omitempty"`
  346. // ValidateProcess is like ProcessorStack, but for recipient validation tasks
  347. ValidateProcess string `json:"validate_process,omitempty"`
  348. // ValidateProcessSize limits the amount of messages waiting in the queue to get processed by ValidateProcess
  349. ValidateProcessSize int `json:"validate_process_size,omitempty"`
  350. // TimeoutSave is duration before timeout when saving an email, eg "29s"
  351. TimeoutSave string `json:"save_timeout,omitempty"`
  352. // TimeoutValidateRcpt duration before timeout when validating a recipient, eg "1s"
  353. TimeoutValidateRcpt string `json:"val_rcpt_timeout,omitempty"`
  354. // TimeoutStream duration before timeout when processing a stream eg "1s"
  355. TimeoutStream string `json:"stream_timeout,omitempty"`
  356. // StreamBufferLen controls the size of the output buffer, in bytes. Default is 4096
  357. StreamBufferSize int `json:"stream_buffer_size,omitempty"`
  358. // SaveStream is same as a SaveProcess, but uses the StreamProcessor stack instead
  359. SaveStream string `json:"save_stream,omitempty"`
  360. // SaveStreamSize limits the amount of messages waiting in the queue to get processed by SaveStream
  361. SaveStreamSize int `json:"save_stream_size,omitempty"`
  362. // PostProcessSize controls the length of thq queue for background processing
  363. PostProcessSize int `json:"post_process_size,omitempty"`
  364. // PostProcessProducer specifies which StreamProcessor to use for reading data to the post process
  365. PostProcessProducer string `json:"post_process_producer,omitempty"`
  366. // PostProcessConsumer is same as SaveStream, but controls
  367. PostProcessConsumer string `json:"post_process_consumer,omitempty"`
  368. }
  369. // saveWorkersCount gets the number of workers to use for saving email by reading the save_workers_size config value
  370. // Returns 1 if no config value was set
  371. func (c *GatewayConfig) saveWorkersCount() int {
  372. if c.SaveWorkersCount <= 0 {
  373. return configSaveWorkersCount
  374. }
  375. return c.SaveWorkersCount
  376. }
  377. func (c *GatewayConfig) validateWorkersCount() int {
  378. if c.ValidateWorkersCount <= 0 {
  379. return configValidateWorkersCount
  380. }
  381. return c.ValidateWorkersCount
  382. }
  383. func (c *GatewayConfig) streamWorkersCount() int {
  384. if c.StreamWorkersCount <= 0 {
  385. return configStreamWorkersCount
  386. }
  387. return c.StreamWorkersCount
  388. }
  389. func (c *GatewayConfig) backgroundWorkersCount() int {
  390. if c.BackgroundWorkersCount <= 0 {
  391. return configBackgroundWorkersCount
  392. }
  393. return c.BackgroundWorkersCount
  394. }
  395. func (c *GatewayConfig) saveProcessSize() int {
  396. if c.SaveProcessSize <= 0 {
  397. return configSaveProcessSize
  398. }
  399. return c.SaveProcessSize
  400. }
  401. func (c *GatewayConfig) validateProcessSize() int {
  402. if c.ValidateProcessSize <= 0 {
  403. return configValidateProcessSize
  404. }
  405. return c.ValidateProcessSize
  406. }
  407. func (c *GatewayConfig) saveStreamSize() int {
  408. if c.SaveStreamSize <= 0 {
  409. return configSaveStreamSize
  410. }
  411. return c.SaveStreamSize
  412. }
  413. func (c *GatewayConfig) postProcessSize() int {
  414. if c.PostProcessSize <= 0 {
  415. return configPostProcessSize
  416. }
  417. return c.PostProcessSize
  418. }
  419. // saveTimeout returns the maximum amount of seconds to wait before timing out a save processing task
  420. func (gw *BackendGateway) saveTimeout() time.Duration {
  421. if gw.gwConfig.TimeoutSave == "" {
  422. return configTimeoutSave
  423. }
  424. t, err := time.ParseDuration(gw.gwConfig.TimeoutSave)
  425. if err != nil {
  426. return configTimeoutSave
  427. }
  428. return t
  429. }
  430. // validateRcptTimeout returns the maximum amount of seconds to wait before timing out a recipient validation task
  431. func (gw *BackendGateway) validateRcptTimeout() time.Duration {
  432. if gw.gwConfig.TimeoutValidateRcpt == "" {
  433. return configTimeoutValidateRcpt
  434. }
  435. t, err := time.ParseDuration(gw.gwConfig.TimeoutValidateRcpt)
  436. if err != nil {
  437. return configTimeoutValidateRcpt
  438. }
  439. return t
  440. }