package backends

import (
	"fmt"

	"github.com/flashmob/go-guerrilla/envelope"
	"github.com/flashmob/go-guerrilla/response"
	"github.com/garyburd/redigo/redis"
)

// ----------------------------------------------------------------------------------
// Processor Name: redis
// ----------------------------------------------------------------------------------
// Description   : Saves the e.Data (email data) and e.DeliveryHeader together in redis
//               : using the hash generated by the "hash" processor and stored in
//               : e.Hashes
// ----------------------------------------------------------------------------------
// Config Options: redis_expire_seconds int - how many seconds to expiry
//               : redis_interface string - host:port, eg, 127.0.0.1:6379
// --------------:-------------------------------------------------------------------
// Input         : e.Data
//               : e.DeliveryHeader generated by Header() processor
//               :
// ----------------------------------------------------------------------------------
// Output        : Sets e.QueuedId with the first item from e.Hashes[0]
// ----------------------------------------------------------------------------------

// init registers the "redis" processor so it can be looked up by name in Processors.
func init() {
	Processors["redis"] = func() Decorator {
		return Redis()
	}
}

// RedisProcessorConfig holds the settings read from the backend config for the
// redis processor.
type RedisProcessorConfig struct {
	// RedisExpireSeconds is passed as the expiry argument of SETEX.
	RedisExpireSeconds int `json:"redis_expire_seconds"`
	// RedisInterface is the TCP address handed to redis.Dial.
	RedisInterface string `json:"redis_interface"`
}

// RedisProcessor owns the redis connection used by the redis decorator.
type RedisProcessor struct {
	isConnected bool
	conn        redis.Conn
}

// redisConnection makes sure r.conn is a usable connection to redisInterface.
//
// An existing connection is reused only while it reports no error; a
// connection that has hit a fatal error is closed and a new one is dialed, so a
// redis restart or network blip does not leave the processor failing forever.
// It returns the dial error, if any.
func (r *RedisProcessor) redisConnection(redisInterface string) error {
	if r.isConnected && r.conn != nil && r.conn.Err() == nil {
		return nil
	}
	// The old connection (if any) is already unusable, so a failure to close
	// it carries no useful information; ignore it and redial.
	_ = r.disconnect()

	conn, err := redis.Dial("tcp", redisInterface)
	if err != nil {
		return err
	}
	r.conn = conn
	r.isConnected = true
	return nil
}

// disconnect closes the current connection, if there is one, and resets the
// processor to the disconnected state. It returns the error from Close.
func (r *RedisProcessor) disconnect() error {
	r.isConnected = false
	if r.conn == nil {
		return nil
	}
	err := r.conn.Close()
	r.conn = nil
	return err
}

// Redis returns the redis decorator, which stores the email data in redis.
//
// It registers an initializer that reads RedisProcessorConfig from the backend
// config and connects to redis, and a shutdowner that closes the connection.
// The returned processor stores the envelope (or the value found under
// e.Info["zlib-compressor"], when present) with SETEX under e.Hashes[0], sets
// e.QueuedId to that hash and marks e.Info["redis"]. On a redis failure it
// returns a FailBackendTransaction result together with the error and does not
// call the next processor. When e.Hashes is empty it logs an error and passes
// the envelope on unchanged.
func Redis() Decorator {
	var config *RedisProcessorConfig
	redisClient := &RedisProcessor{}

	// read the config into RedisProcessorConfig
	Service.AddInitializer(Initialize(func(backendConfig BackendConfig) error {
		configType := BaseConfig(&RedisProcessorConfig{})
		bcfg, err := Service.ExtractConfig(backendConfig, configType)
		if err != nil {
			return err
		}
		config = bcfg.(*RedisProcessorConfig)
		if redisErr := redisClient.redisConnection(config.RedisInterface); redisErr != nil {
			return fmt.Errorf("Redis cannot connect, check your settings: %s", redisErr)
		}
		return nil
	}))

	// When shutting down
	Service.AddShutdowner(Shutdown(func() error {
		return redisClient.disconnect()
	}))

	return func(c Processor) Processor {
		return ProcessorFunc(func(e *envelope.Envelope) (BackendResult, error) {
			if len(e.Hashes) == 0 {
				Log().Error("Redis needs a Hash() process before it")
				return c.Process(e)
			}

			hash := e.Hashes[0]
			e.QueuedId = hash

			// Store the envelope itself unless a compressor was set.
			var stringer fmt.Stringer = e
			if zc, ok := e.Info["zlib-compressor"]; ok {
				stringer = zc.(*compressor)
			}

			redisErr := redisClient.redisConnection(config.RedisInterface)
			if redisErr == nil {
				_, redisErr = redisClient.conn.Do("SETEX", hash, config.RedisExpireSeconds, stringer)
			}
			if redisErr != nil {
				Log().WithError(redisErr).Warn("Error while talking to redis")
				result := NewBackendResult(response.Canned.FailBackendTransaction)
				return result, redisErr
			}

			// the backend system will know to look in redis for the message data
			e.Info["redis"] = "redis"
			return c.Process(e)
		})
	}
}