p_redis.go 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778
  1. package backends
  2. import (
  3. "github.com/flashmob/go-guerrilla/envelope"
  4. "github.com/flashmob/go-guerrilla/response"
  5. )
  6. type RedisProcessorConfig struct {
  7. RedisExpireSeconds int `json:"redis_expire_seconds"`
  8. RedisInterface string `json:"redis_interface"`
  9. }
  10. // The redis decorator stores the email data in redis
  11. func Redis(dc *DecoratorCallbacks) Decorator {
  12. var config *RedisProcessorConfig
  13. redisClient := &redisClient{}
  14. dc.loader = func(backendConfig BackendConfig) error {
  15. configType := baseConfig(&RedisProcessorConfig{})
  16. bcfg, err := ab.extractConfig(backendConfig, configType)
  17. if err != nil {
  18. return err
  19. }
  20. config = bcfg.(*RedisProcessorConfig)
  21. return nil
  22. }
  23. var redisErr error
  24. return func(c Processor) Processor {
  25. return ProcessorFunc(func(e *envelope.Envelope) (BackendResult, error) {
  26. hash := ""
  27. if len(e.Hashes) > 0 {
  28. hash = e.Hashes[0]
  29. var cData *compressor
  30. // a compressor was set
  31. if e.Meta != nil {
  32. if c, ok := e.Meta["zlib-compressor"]; ok {
  33. cData = c.(*compressor)
  34. }
  35. } else {
  36. e.Meta = make(map[string]interface{})
  37. }
  38. redisErr = redisClient.redisConnection(config.RedisInterface)
  39. if redisErr == nil {
  40. if cData != nil {
  41. // send data is using the compressor
  42. _, doErr := redisClient.conn.Do("SETEX", hash, config.RedisExpireSeconds, cData)
  43. if doErr != nil {
  44. redisErr = doErr
  45. }
  46. } else {
  47. // not using compressor
  48. _, doErr := redisClient.conn.Do("SETEX", hash, config.RedisExpireSeconds, e.Data.String())
  49. if doErr != nil {
  50. redisErr = doErr
  51. }
  52. }
  53. }
  54. if redisErr != nil {
  55. mainlog.WithError(redisErr).Warn("Error while talking to redis")
  56. result := NewBackendResult(response.Canned.FailBackendTransaction)
  57. return result, redisErr
  58. } else {
  59. e.Meta["redis"] = "redis" // the backend system will know to look in redis for the message data
  60. }
  61. } else {
  62. mainlog.Error("Redis needs a Hash() process before it")
  63. }
  64. return c.Process(e)
  65. })
  66. }
  67. }