p_redis.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. package backends
  2. import (
  3. "fmt"
  4. "github.com/flashmob/go-guerrilla/envelope"
  5. "github.com/flashmob/go-guerrilla/response"
  6. "github.com/garyburd/redigo/redis"
  7. )
  8. func init() {
  9. Processors["redis"] = func() Decorator {
  10. return Redis()
  11. }
  12. }
  13. type RedisProcessorConfig struct {
  14. RedisExpireSeconds int `json:"redis_expire_seconds"`
  15. RedisInterface string `json:"redis_interface"`
  16. }
  17. type RedisProcessor struct {
  18. isConnected bool
  19. conn redis.Conn
  20. }
  21. func (r *RedisProcessor) redisConnection(redisInterface string) (err error) {
  22. if r.isConnected == false {
  23. r.conn, err = redis.Dial("tcp", redisInterface)
  24. if err != nil {
  25. // handle error
  26. return err
  27. }
  28. r.isConnected = true
  29. }
  30. return nil
  31. }
  32. // The redis decorator stores the email data in redis
  33. func Redis() Decorator {
  34. var config *RedisProcessorConfig
  35. redisClient := &RedisProcessor{}
  36. // read the config into RedisProcessorConfig
  37. Service.AddInitializer(Initialize(func(backendConfig BackendConfig) error {
  38. configType := baseConfig(&RedisProcessorConfig{})
  39. bcfg, err := Service.extractConfig(backendConfig, configType)
  40. if err != nil {
  41. return err
  42. }
  43. config = bcfg.(*RedisProcessorConfig)
  44. if redisErr := redisClient.redisConnection(config.RedisInterface); redisErr != nil {
  45. err := fmt.Errorf("Redis cannot connect, check your settings: %s", redisErr)
  46. return err
  47. }
  48. return nil
  49. }))
  50. // When shutting down
  51. Service.AddShutdowner(Shutdown(func() error {
  52. if redisClient.isConnected {
  53. redisClient.conn.Close()
  54. }
  55. return nil
  56. }))
  57. var redisErr error
  58. return func(c Processor) Processor {
  59. return ProcessorFunc(func(e *envelope.Envelope) (BackendResult, error) {
  60. hash := ""
  61. if len(e.Hashes) > 0 {
  62. hash = e.Hashes[0]
  63. var cData *compressor
  64. // a compressor was set
  65. if c, ok := e.Info["zlib-compressor"]; ok {
  66. cData = c.(*compressor)
  67. }
  68. redisErr = redisClient.redisConnection(config.RedisInterface)
  69. if redisErr == nil {
  70. if cData != nil {
  71. // send data is using the compressor
  72. _, doErr := redisClient.conn.Do("SETEX", hash, config.RedisExpireSeconds, cData)
  73. if doErr != nil {
  74. redisErr = doErr
  75. }
  76. } else {
  77. // not using compressor
  78. _, doErr := redisClient.conn.Do("SETEX", hash, config.RedisExpireSeconds, e.Data.String())
  79. if doErr != nil {
  80. redisErr = doErr
  81. }
  82. }
  83. }
  84. if redisErr != nil {
  85. mainlog.WithError(redisErr).Warn("Error while talking to redis")
  86. result := NewBackendResult(response.Canned.FailBackendTransaction)
  87. return result, redisErr
  88. } else {
  89. e.Info["redis"] = "redis" // the backend system will know to look in redis for the message data
  90. }
  91. } else {
  92. mainlog.Error("Redis needs a Hash() process before it")
  93. }
  94. return c.Process(e)
  95. })
  96. }
  97. }