p_redis.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. package backends
  2. import (
  3. "fmt"
  4. "github.com/flashmob/go-guerrilla/mail"
  5. "github.com/flashmob/go-guerrilla/response"
  6. "github.com/garyburd/redigo/redis"
  7. )
  8. // ----------------------------------------------------------------------------------
  9. // Processor Name: redis
  10. // ----------------------------------------------------------------------------------
  11. // Description : Saves the e.Data (email data) and e.DeliveryHeader together in redis
  12. // : using the hash generated by the "hash" processor and stored in
  13. // : e.Hashes
  14. // ----------------------------------------------------------------------------------
  15. // Config Options: redis_expire_seconds int - how many seconds to expiry
  16. // : redis_interface string - <host>:<port> eg, 127.0.0.1:6379
  17. // --------------:-------------------------------------------------------------------
  18. // Input : e.Data
  19. // : e.DeliveryHeader generated by Header() processor
  20. // :
  21. // ----------------------------------------------------------------------------------
  22. // Output : Sets e.QueuedId to the first hash, e.Hashes[0]
  23. // ----------------------------------------------------------------------------------
  24. func init() {
  25. processors["redis"] = func() Decorator {
  26. return Redis()
  27. }
  28. }
  29. type RedisProcessorConfig struct {
  30. RedisExpireSeconds int `json:"redis_expire_seconds"`
  31. RedisInterface string `json:"redis_interface"`
  32. }
  33. type RedisProcessor struct {
  34. isConnected bool
  35. conn redis.Conn
  36. }
  37. func (r *RedisProcessor) redisConnection(redisInterface string) (err error) {
  38. if r.isConnected == false {
  39. r.conn, err = redis.Dial("tcp", redisInterface)
  40. if err != nil {
  41. // handle error
  42. return err
  43. }
  44. r.isConnected = true
  45. }
  46. return nil
  47. }
  48. // The redis decorator stores the email data in redis
  49. func Redis() Decorator {
  50. var config *RedisProcessorConfig
  51. redisClient := &RedisProcessor{}
  52. // read the config into RedisProcessorConfig
  53. Svc.AddInitializer(InitializeWith(func(backendConfig BackendConfig) error {
  54. configType := BaseConfig(&RedisProcessorConfig{})
  55. bcfg, err := Svc.ExtractConfig(backendConfig, configType)
  56. if err != nil {
  57. return err
  58. }
  59. config = bcfg.(*RedisProcessorConfig)
  60. if redisErr := redisClient.redisConnection(config.RedisInterface); redisErr != nil {
  61. err := fmt.Errorf("Redis cannot connect, check your settings: %s", redisErr)
  62. return err
  63. }
  64. return nil
  65. }))
  66. // When shutting down
  67. Svc.AddShutdowner(ShutdownWith(func() error {
  68. if redisClient.isConnected {
  69. return redisClient.conn.Close()
  70. }
  71. return nil
  72. }))
  73. var redisErr error
  74. return func(p Processor) Processor {
  75. return ProcessWith(func(e *mail.Envelope, task SelectTask) (Result, error) {
  76. if task == TaskSaveMail {
  77. hash := ""
  78. if len(e.Hashes) > 0 {
  79. e.QueuedId = e.Hashes[0]
  80. hash = e.Hashes[0]
  81. var stringer fmt.Stringer
  82. // a compressor was set
  83. if c, ok := e.Values["zlib-compressor"]; ok {
  84. stringer = c.(*compressor)
  85. } else {
  86. stringer = e
  87. }
  88. redisErr = redisClient.redisConnection(config.RedisInterface)
  89. if redisErr != nil {
  90. Log().WithError(redisErr).Warn("Error while connecting to redis")
  91. result := NewResult(response.Canned.FailBackendTransaction)
  92. return result, redisErr
  93. }
  94. _, doErr := redisClient.conn.Do("SETEX", hash, config.RedisExpireSeconds, stringer)
  95. if doErr != nil {
  96. Log().WithError(doErr).Warn("Error while SETEX to redis")
  97. result := NewResult(response.Canned.FailBackendTransaction)
  98. return result, redisErr
  99. }
  100. e.Values["redis"] = "redis" // the next processor will know to look in redis for the message data
  101. } else {
  102. Log().Error("Redis needs a Hash() process before it")
  103. result := NewResult(response.Canned.FailBackendTransaction)
  104. return result, StorageError
  105. }
  106. return p.Process(e, task)
  107. } else {
  108. // nothing to do for this task
  109. return p.Process(e, task)
  110. }
  111. })
  112. }
  113. }