redis.php 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279
  1. <?php
  2. /**
  3. * Part of the Fuel framework.
  4. *
  5. * @package Fuel
  6. * @version 1.5
  7. * @author Fuel Development Team
  8. * @license MIT License
  9. * @copyright 2010 - 2013 Fuel Development Team
  10. * @link http://fuelphp.com
  11. */
  12. /**
  13. * This code is based on Redisent, a Redis interface for the modest.
  14. *
  15. * It has been modified to work with Fuel and to improve the code slightly.
  16. *
  17. * @author Justin Poliey <[email protected]>
  18. * @copyright 2009-2012 Justin Poliey <[email protected]>
  19. * @license http://www.opensource.org/licenses/ISC The ISC License
  20. */
  21. namespace Fuel\Core;
  22. class RedisException extends \FuelException {}
  23. /**
  24. * Redisent, a Redis interface for the modest among us
  25. */
  26. class Redis
  27. {
  28. /**
  29. * Multiton pattern, keep track of all created instances
  30. */
  31. protected static $instances = array();
  32. /**
  33. * Get an instance of the Redis class
  34. */
  35. public static function instance($name = 'default')
  36. {
  37. if ( ! array_key_exists($name, static::$instances))
  38. {
  39. // @deprecated since 1.4
  40. // call forge() if a new instance needs to be created, this should throw an error
  41. return static::forge($name);
  42. }
  43. return static::$instances[$name];
  44. }
  45. /**
  46. * create an instance of the Redis class
  47. */
  48. public static function forge($name = 'default', $config = array())
  49. {
  50. empty(static::$instances) and \Config::load('db', true);
  51. if ( ! ($conf = \Config::get('db.redis.'.$name)))
  52. {
  53. throw new \RedisException('Invalid instance name given.');
  54. }
  55. $config = \Arr::merge($conf, $config);
  56. static::$instances[$name] = new static($config);
  57. return static::$instances[$name];
  58. }
  59. /**
  60. * @var resource
  61. */
  62. protected $connection = false;
  63. /**
  64. * Flag indicating whether or not commands are being pipelined
  65. *
  66. * @var boolean
  67. */
  68. protected $pipelined = false;
  69. /**
  70. * The queue of commands to be sent to the Redis server
  71. *
  72. * @var array
  73. */
  74. protected $queue = array();
  75. /**
  76. * Create a new Redis instance using the configuration values supplied
  77. */
  78. public function __construct(array $config = array())
  79. {
  80. empty($config['timeout']) and $config['timeout'] = ini_get("default_socket_timeout");
  81. $this->connection = @fsockopen($config['hostname'], $config['port'], $errno, $errstr, $config['timeout']);
  82. if ( ! $this->connection)
  83. {
  84. throw new \RedisException($errstr, $errno);
  85. }
  86. else
  87. {
  88. // execute the auth command if a password is present in config
  89. empty($config['password']) or $this->auth($config['password']);
  90. }
  91. }
  92. /**
  93. * Close the open connection on class destruction
  94. */
  95. public function __destruct()
  96. {
  97. $this->connection and fclose($this->connection);
  98. }
  99. /**
  100. * Returns the Redisent instance ready for pipelining.
  101. *
  102. * Redis commands can now be chained, and the array of the responses will be
  103. * returned when {@link execute} is called.
  104. * @see execute
  105. *
  106. */
  107. public function pipeline()
  108. {
  109. $this->pipelined = true;
  110. return $this;
  111. }
  112. /**
  113. * Flushes the commands in the pipeline queue to Redis and returns the responses.
  114. * @see pipeline
  115. */
  116. public function execute()
  117. {
  118. // open a Redis connection and execute the queued commands
  119. foreach ($this->queue as $command)
  120. {
  121. for ($written = 0; $written < strlen($command); $written += $fwrite)
  122. {
  123. $fwrite = fwrite($this->connection, substr($command, $written));
  124. if ($fwrite === false)
  125. {
  126. throw new \RedisException('Failed to write entire command to stream');
  127. }
  128. }
  129. }
  130. // Read in the results from the pipelined commands
  131. $responses = array();
  132. for ($i = 0; $i < count($this->queue); $i++)
  133. {
  134. $responses[] = $this->readResponse();
  135. }
  136. // Clear the queue and return the response
  137. $this->queue = array();
  138. if ($this->pipelined)
  139. {
  140. $this->pipelined = false;
  141. return $responses;
  142. }
  143. else
  144. {
  145. return $responses[0];
  146. }
  147. }
  148. /**
  149. */
  150. public function __call($name, $args)
  151. {
  152. // build the Redis unified protocol command
  153. array_unshift($args, strtoupper($name));
  154. $command = sprintf('*%d%s%s%s', count($args), CRLF, implode(array_map(function($arg) {
  155. return sprintf('$%d%s%s', strlen($arg), CRLF, $arg);
  156. }, $args), CRLF), CRLF);
  157. // add it to the pipeline queue
  158. $this->queue[] = $command;
  159. if ($this->pipelined)
  160. {
  161. return $this;
  162. }
  163. else
  164. {
  165. return $this->execute();
  166. }
  167. }
  168. protected function readResponse()
  169. {
  170. // parse the response based on the reply identifier
  171. $reply = trim(fgets($this->connection, 512));
  172. switch (substr($reply, 0, 1))
  173. {
  174. // error reply
  175. case '-':
  176. throw new \RedisException(trim(substr($reply, 4)));
  177. break;
  178. // inline reply
  179. case '+':
  180. $response = substr(trim($reply), 1);
  181. if ($response === 'OK')
  182. {
  183. $response = true;
  184. }
  185. break;
  186. // bulk reply
  187. case '$':
  188. $response = null;
  189. if ($reply == '$-1')
  190. {
  191. break;
  192. }
  193. $read = 0;
  194. $size = intval(substr($reply, 1));
  195. if ($size > 0)
  196. {
  197. do
  198. {
  199. $block_size = ($size - $read) > 1024 ? 1024 : ($size - $read);
  200. $r = fread($this->connection, $block_size);
  201. if ($r === false)
  202. {
  203. throw new \RedisException('Failed to read response from stream');
  204. }
  205. else
  206. {
  207. $read += strlen($r);
  208. $response .= $r;
  209. }
  210. }
  211. while ($read < $size);
  212. }
  213. // discard the crlf
  214. fread($this->connection, 2);
  215. break;
  216. // multi-bulk reply
  217. case '*':
  218. $count = intval(substr($reply, 1));
  219. if ($count == '-1')
  220. {
  221. return null;
  222. }
  223. $response = array();
  224. for ($i = 0; $i < $count; $i++)
  225. {
  226. $response[] = $this->readResponse();
  227. }
  228. break;
  229. // integer reply
  230. case ':':
  231. $response = intval(substr(trim($reply), 1));
  232. break;
  233. default:
  234. throw new \RedisException("Unknown response: {$reply}");
  235. break;
  236. }
  237. // party on...
  238. return $response;
  239. }
  240. }