p_redis.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. package backends
  2. import (
  3. "fmt"
  4. "github.com/flashmob/go-guerrilla/mail"
  5. "github.com/flashmob/go-guerrilla/response"
  6. )
  7. // ----------------------------------------------------------------------------------
  8. // Processor Name: redis
  9. // ----------------------------------------------------------------------------------
  10. // Description : Saves the e.Data (email data) and e.DeliveryHeader together in redis
  11. // : using the hash generated by the "hash" processor and stored in
  12. // : e.Hashes
  13. // ----------------------------------------------------------------------------------
  14. // Config Options: redis_expire_seconds int - how many seconds to expiry
  15. // : redis_interface string - <host>:<port> eg, 127.0.0.1:6379
  16. // --------------:-------------------------------------------------------------------
  17. // Input : e.Data
  18. // : e.DeliveryHeader generated by Header() processor
  19. // :
  20. // ----------------------------------------------------------------------------------
  21. // Output : Sets e.QueuedId with the first item fromHashes[0]
  22. // ----------------------------------------------------------------------------------
  23. func init() {
  24. processors["redis"] = func() Decorator {
  25. return Redis()
  26. }
  27. }
  28. type RedisProcessorConfig struct {
  29. RedisExpireSeconds int `json:"redis_expire_seconds"`
  30. RedisInterface string `json:"redis_interface"`
  31. }
  32. type RedisProcessor struct {
  33. isConnected bool
  34. conn RedisConn
  35. }
  36. func (r *RedisProcessor) redisConnection(redisInterface string) (err error) {
  37. if r.isConnected == false {
  38. r.conn, err = RedisDialer("tcp", redisInterface)
  39. if err != nil {
  40. // handle error
  41. return err
  42. }
  43. r.isConnected = true
  44. }
  45. return nil
  46. }
  47. // The redis decorator stores the email data in redis
  48. func Redis() Decorator {
  49. var config *RedisProcessorConfig
  50. redisClient := &RedisProcessor{}
  51. // read the config into RedisProcessorConfig
  52. Svc.AddInitializer(InitializeWith(func(backendConfig BackendConfig) error {
  53. configType := BaseConfig(&RedisProcessorConfig{})
  54. bcfg, err := Svc.ExtractConfig(backendConfig, configType)
  55. if err != nil {
  56. return err
  57. }
  58. config = bcfg.(*RedisProcessorConfig)
  59. if redisErr := redisClient.redisConnection(config.RedisInterface); redisErr != nil {
  60. err := fmt.Errorf("redis cannot connect, check your settings: %s", redisErr)
  61. return err
  62. }
  63. return nil
  64. }))
  65. // When shutting down
  66. Svc.AddShutdowner(ShutdownWith(func() error {
  67. if redisClient.isConnected {
  68. return redisClient.conn.Close()
  69. }
  70. return nil
  71. }))
  72. var redisErr error
  73. return func(p Processor) Processor {
  74. return ProcessWith(func(e *mail.Envelope, task SelectTask) (Result, error) {
  75. if task == TaskSaveMail {
  76. hash := ""
  77. if len(e.Hashes) > 0 {
  78. e.QueuedId = e.Hashes[0]
  79. hash = e.Hashes[0]
  80. var stringer fmt.Stringer
  81. // a compressor was set
  82. if c, ok := e.Values["zlib-compressor"]; ok {
  83. stringer = c.(*compressor)
  84. } else {
  85. stringer = e
  86. }
  87. redisErr = redisClient.redisConnection(config.RedisInterface)
  88. if redisErr != nil {
  89. Log().WithError(redisErr).Warn("Error while connecting to redis")
  90. result := NewResult(response.Canned.FailBackendTransaction)
  91. return result, redisErr
  92. }
  93. _, doErr := redisClient.conn.Do("SETEX", hash, config.RedisExpireSeconds, stringer)
  94. if doErr != nil {
  95. Log().WithError(doErr).Warn("Error while SETEX to redis")
  96. result := NewResult(response.Canned.FailBackendTransaction)
  97. return result, redisErr
  98. }
  99. e.Values["redis"] = "redis" // the next processor will know to look in redis for the message data
  100. } else {
  101. Log().Error("Redis needs a Hasher() process before it")
  102. result := NewResult(response.Canned.FailBackendTransaction)
  103. return result, StorageError
  104. }
  105. return p.Process(e, task)
  106. } else {
  107. // nothing to do for this task
  108. return p.Process(e, task)
  109. }
  110. })
  111. }
  112. }