vendor/predis/predis/src/Client.php line 334

Open in your IDE?
  1. <?php
  2. /*
  3.  * This file is part of the Predis package.
  4.  *
  5.  * (c) 2009-2020 Daniele Alessandri
  6.  * (c) 2021-2025 Till Krüss
  7.  *
  8.  * For the full copyright and license information, please view the LICENSE
  9.  * file that was distributed with this source code.
  10.  */
  11. namespace Predis;
  12. use ArrayIterator;
  13. use InvalidArgumentException;
  14. use IteratorAggregate;
  15. use Predis\Command\CommandInterface;
  16. use Predis\Command\Container\ContainerFactory;
  17. use Predis\Command\Container\ContainerInterface;
  18. use Predis\Command\RawCommand;
  19. use Predis\Command\ScriptCommand;
  20. use Predis\Configuration\Options;
  21. use Predis\Configuration\OptionsInterface;
  22. use Predis\Connection\ConnectionInterface;
  23. use Predis\Connection\Parameters;
  24. use Predis\Connection\ParametersInterface;
  25. use Predis\Connection\RelayConnection;
  26. use Predis\Consumer\PubSub\Consumer as PubSubConsumer;
  27. use Predis\Consumer\PubSub\RelayConsumer as RelayPubSubConsumer;
  28. use Predis\Consumer\Push\Consumer as PushConsumer;
  29. use Predis\Monitor\Consumer as MonitorConsumer;
  30. use Predis\Pipeline\Atomic;
  31. use Predis\Pipeline\FireAndForget;
  32. use Predis\Pipeline\Pipeline;
  33. use Predis\Pipeline\RelayAtomic;
  34. use Predis\Pipeline\RelayPipeline;
  35. use Predis\Response\ErrorInterface as ErrorResponseInterface;
  36. use Predis\Response\ResponseInterface;
  37. use Predis\Response\ServerException;
  38. use Predis\Transaction\MultiExec as MultiExecTransaction;
  39. use ReturnTypeWillChange;
  40. use RuntimeException;
  41. use Traversable;
  42. /**
  43.  * Client class used for connecting and executing commands on Redis.
  44.  *
  45.  * This is the main high-level abstraction of Predis upon which various other
  46.  * abstractions are built. Internally it aggregates various other classes each
  47.  * one with its own responsibility and scope.
  48.  *
  49.  * @template-implements \IteratorAggregate<string, static>
  50.  */
  51. class Client implements ClientInterfaceIteratorAggregate
  52. {
  53.     public const VERSION '3.0.1';
  54.     /** @var OptionsInterface */
  55.     private $options;
  56.     /** @var ConnectionInterface */
  57.     private $connection;
  58.     /** @var Command\FactoryInterface */
  59.     private $commands;
  60.     /**
  61.      * @param mixed $parameters Connection parameters for one or more servers.
  62.      * @param mixed $options    Options to configure some behaviours of the client.
  63.      */
  64.     public function __construct($parameters null$options null)
  65.     {
  66.         $this->options = static::createOptions($options ?? new Options());
  67.         $this->connection = static::createConnection($this->options$parameters ?? new Parameters());
  68.         $this->commands $this->options->commands;
  69.     }
  70.     /**
  71.      * Creates a new set of client options for the client.
  72.      *
  73.      * @param array|OptionsInterface $options Set of client options
  74.      *
  75.      * @return OptionsInterface
  76.      * @throws InvalidArgumentException
  77.      */
  78.     protected static function createOptions($options)
  79.     {
  80.         if (is_array($options)) {
  81.             return new Options($options);
  82.         } elseif ($options instanceof OptionsInterface) {
  83.             return $options;
  84.         } else {
  85.             throw new InvalidArgumentException('Invalid type for client options');
  86.         }
  87.     }
  88.     /**
  89.      * Creates single or aggregate connections from supplied arguments.
  90.      *
  91.      * This method accepts the following types to create a connection instance:
  92.      *
  93.      *  - Array (dictionary: single connection, indexed: aggregate connections)
  94.      *  - String (URI for a single connection)
  95.      *  - Callable (connection initializer callback)
  96.      *  - Instance of Predis\Connection\ParametersInterface (used as-is)
  97.      *  - Instance of Predis\Connection\ConnectionInterface (returned as-is)
  98.      *
  99.      * When a callable is passed, it receives the original set of client options
  100.      * and must return an instance of Predis\Connection\ConnectionInterface.
  101.      *
  102.      * Connections are created using the connection factory (in case of single
  103.      * connections) or a specialized aggregate connection initializer (in case
  104.      * of cluster and replication) retrieved from the supplied client options.
  105.      *
  106.      * @param OptionsInterface $options    Client options container
  107.      * @param mixed            $parameters Connection parameters
  108.      *
  109.      * @return ConnectionInterface
  110.      * @throws InvalidArgumentException
  111.      */
  112.     protected static function createConnection(OptionsInterface $options$parameters)
  113.     {
  114.         if ($parameters instanceof ConnectionInterface) {
  115.             return $parameters;
  116.         }
  117.         if ($parameters instanceof ParametersInterface || is_string($parameters)) {
  118.             return $options->connections->create($parameters);
  119.         }
  120.         if (is_array($parameters)) {
  121.             if (!isset($parameters[0])) {
  122.                 return $options->connections->create($parameters);
  123.             } elseif ($options->defined('cluster') && $initializer $options->cluster) {
  124.                 return $initializer($parameterstrue);
  125.             } elseif ($options->defined('replication') && $initializer $options->replication) {
  126.                 return $initializer($parameterstrue);
  127.             } elseif ($options->defined('aggregate') && $initializer $options->aggregate) {
  128.                 return $initializer($parametersfalse);
  129.             } else {
  130.                 throw new InvalidArgumentException(
  131.                     'Array of connection parameters requires `cluster`, `replication` or `aggregate` client option'
  132.                 );
  133.             }
  134.         }
  135.         if (is_callable($parameters)) {
  136.             $connection call_user_func($parameters$options);
  137.             if (!$connection instanceof ConnectionInterface) {
  138.                 throw new InvalidArgumentException('Callable parameters must return a valid connection');
  139.             }
  140.             return $connection;
  141.         }
  142.         throw new InvalidArgumentException('Invalid type for connection parameters');
  143.     }
  144.     /**
  145.      * {@inheritdoc}
  146.      */
  147.     public function getCommandFactory()
  148.     {
  149.         return $this->commands;
  150.     }
  151.     /**
  152.      * {@inheritdoc}
  153.      */
  154.     public function getOptions()
  155.     {
  156.         return $this->options;
  157.     }
  158.     /**
  159.      * Creates a new client using a specific underlying connection.
  160.      *
  161.      * This method allows to create a new client instance by picking a specific
  162.      * connection out of an aggregate one, with the same options of the original
  163.      * client instance.
  164.      *
  165.      * The specified selector defines which logic to use to look for a suitable
  166.      * connection by the specified value. Supported selectors are:
  167.      *
  168.      *   - `id`
  169.      *   - `key`
  170.      *   - `slot`
  171.      *   - `command`
  172.      *   - `alias`
  173.      *   - `role`
  174.      *
  175.      * Internally the client relies on duck-typing and follows this convention:
  176.      *
  177.      *   $selector string => getConnectionBy$selector($value) method
  178.      *
  179.      * This means that support for specific selectors may vary depending on the
  180.      * actual logic implemented by connection classes and there is no interface
  181.      * binding a connection class to implement any of these.
  182.      *
  183.      * @param string $selector Type of selector.
  184.      * @param mixed  $value    Value to be used by the selector.
  185.      *
  186.      * @return ClientInterface
  187.      */
  188.     public function getClientBy($selector$value)
  189.     {
  190.         $selector strtolower($selector);
  191.         if (!in_array($selector, ['id''key''slot''role''alias''command'])) {
  192.             throw new InvalidArgumentException("Invalid selector type: `$selector`");
  193.         }
  194.         if (!method_exists($this->connection$method "getConnectionBy$selector")) {
  195.             $class get_class($this->connection);
  196.             throw new InvalidArgumentException("Selecting connection by $selector is not supported by $class");
  197.         }
  198.         if (!$connection $this->connection->$method($value)) {
  199.             throw new InvalidArgumentException("Cannot find a connection by $selector matching `$value`");
  200.         }
  201.         return new static($connection$this->getOptions());
  202.     }
  203.     /**
  204.      * Opens the underlying connection and connects to the server.
  205.      */
  206.     public function connect()
  207.     {
  208.         $this->connection->connect();
  209.     }
  210.     /**
  211.      * Closes the underlying connection and disconnects from the server.
  212.      */
  213.     public function disconnect()
  214.     {
  215.         $this->connection->disconnect();
  216.     }
  217.     /**
  218.      * Closes the underlying connection and disconnects from the server.
  219.      *
  220.      * This is the same as `Client::disconnect()` as it does not actually send
  221.      * the `QUIT` command to Redis, but simply closes the connection.
  222.      */
  223.     public function quit()
  224.     {
  225.         $this->disconnect();
  226.     }
  227.     /**
  228.      * Returns the current state of the underlying connection.
  229.      *
  230.      * @return bool
  231.      */
  232.     public function isConnected()
  233.     {
  234.         return $this->connection->isConnected();
  235.     }
  236.     /**
  237.      * {@inheritdoc}
  238.      */
  239.     public function getConnection()
  240.     {
  241.         return $this->connection;
  242.     }
  243.     /**
  244.      * Applies the configured serializer and compression to given value.
  245.      *
  246.      * @param  mixed  $value
  247.      * @return string
  248.      */
  249.     public function pack($value)
  250.     {
  251.         return $this->connection instanceof RelayConnection
  252.             $this->connection->pack($value)
  253.             : $value;
  254.     }
  255.     /**
  256.      * Deserializes and decompresses to given value.
  257.      *
  258.      * @param  mixed  $value
  259.      * @return string
  260.      */
  261.     public function unpack($value)
  262.     {
  263.         return $this->connection instanceof RelayConnection
  264.             $this->connection->unpack($value)
  265.             : $value;
  266.     }
  267.     /**
  268.      * Executes a command without filtering its arguments, parsing the response,
  269.      * applying any prefix to keys or throwing exceptions on Redis errors even
  270.      * regardless of client options.
  271.      *
  272.      * It is possible to identify Redis error responses from normal responses
  273.      * using the second optional argument which is populated by reference.
  274.      *
  275.      * @param array $arguments Command arguments as defined by the command signature.
  276.      * @param bool  $error     Set to TRUE when Redis returned an error response.
  277.      *
  278.      * @return mixed
  279.      */
  280.     public function executeRaw(array $arguments, &$error null)
  281.     {
  282.         $error false;
  283.         $commandID array_shift($arguments);
  284.         $response $this->connection->executeCommand(
  285.             new RawCommand($commandID$arguments)
  286.         );
  287.         if ($response instanceof ResponseInterface) {
  288.             if ($response instanceof ErrorResponseInterface) {
  289.                 $error true;
  290.             }
  291.             return (string) $response;
  292.         }
  293.         return $response;
  294.     }
  295.     /**
  296.      * {@inheritdoc}
  297.      */
  298.     public function __call($commandID$arguments)
  299.     {
  300.         return $this->executeCommand(
  301.             $this->createCommand($commandID$arguments)
  302.         );
  303.     }
  304.     /**
  305.      * {@inheritdoc}
  306.      */
  307.     public function createCommand($commandID$arguments = [])
  308.     {
  309.         return $this->commands->create($commandID$arguments);
  310.     }
  311.     /**
  312.      * @param  string             $name
  313.      * @return ContainerInterface
  314.      */
  315.     public function __get(string $name)
  316.     {
  317.         return ContainerFactory::create($this$name);
  318.     }
  319.     /**
  320.      * @param  string $name
  321.      * @param  mixed  $value
  322.      * @return mixed
  323.      */
  324.     public function __set(string $name$value)
  325.     {
  326.         throw new RuntimeException('Not allowed');
  327.     }
  328.     /**
  329.      * @param  string $name
  330.      * @return mixed
  331.      */
  332.     public function __isset(string $name)
  333.     {
  334.         throw new RuntimeException('Not allowed');
  335.     }
  336.     /**
  337.      * {@inheritdoc}
  338.      */
  339.     public function executeCommand(CommandInterface $command)
  340.     {
  341.         $response $this->connection->executeCommand($command);
  342.         $parameters $this->connection->getParameters();
  343.         if ($response instanceof ResponseInterface) {
  344.             if ($response instanceof ErrorResponseInterface) {
  345.                 $response $this->onErrorResponse($command$response);
  346.             }
  347.             return $response;
  348.         }
  349.         if ($parameters->protocol === 2) {
  350.             return $command->parseResponse($response);
  351.         }
  352.         return $command->parseResp3Response($response);
  353.     }
  354.     /**
  355.      * Handles -ERR responses returned by Redis.
  356.      *
  357.      * @param CommandInterface       $command  Redis command that generated the error.
  358.      * @param ErrorResponseInterface $response Instance of the error response.
  359.      *
  360.      * @return mixed
  361.      * @throws ServerException
  362.      */
  363.     protected function onErrorResponse(CommandInterface $commandErrorResponseInterface $response)
  364.     {
  365.         if ($command instanceof ScriptCommand && $response->getErrorType() === 'NOSCRIPT') {
  366.             $response $this->executeCommand($command->getEvalCommand());
  367.             if (!$response instanceof ResponseInterface) {
  368.                 $response $command->parseResponse($response);
  369.             }
  370.             return $response;
  371.         }
  372.         if ($this->options->exceptions) {
  373.             throw new ServerException($response->getMessage());
  374.         }
  375.         return $response;
  376.     }
  377.     /**
  378.      * Executes the specified initializer method on `$this` by adjusting the
  379.      * actual invocation depending on the arity (0, 1 or 2 arguments). This is
  380.      * simply an utility method to create Redis contexts instances since they
  381.      * follow a common initialization path.
  382.      *
  383.      * @param string $initializer Method name.
  384.      * @param array  $argv        Arguments for the method.
  385.      *
  386.      * @return mixed
  387.      */
  388.     private function sharedContextFactory($initializer$argv null)
  389.     {
  390.         switch (count($argv)) {
  391.             case 0:
  392.                 return $this->$initializer();
  393.             case 1:
  394.                 return is_array($argv[0])
  395.                     ? $this->$initializer($argv[0])
  396.                     : $this->$initializer(null$argv[0]);
  397.             case 2:
  398.                 [$arg0$arg1] = $argv;
  399.                 return $this->$initializer($arg0$arg1);
  400.             default:
  401.                 return $this->$initializer($this$argv);
  402.         }
  403.     }
  404.     /**
  405.      * Creates a new pipeline context and returns it, or returns the results of
  406.      * a pipeline executed inside the optionally provided callable object.
  407.      *
  408.      * @param mixed ...$arguments Array of options, a callable for execution, or both.
  409.      *
  410.      * @return Pipeline|array
  411.      */
  412.     public function pipeline(...$arguments)
  413.     {
  414.         return $this->sharedContextFactory('createPipeline'func_get_args());
  415.     }
  416.     /**
  417.      * Actual pipeline context initializer method.
  418.      *
  419.      * @param array|null $options  Options for the context.
  420.      * @param mixed      $callable Optional callable used to execute the context.
  421.      *
  422.      * @return Pipeline|array
  423.      */
  424.     protected function createPipeline(?array $options null$callable null)
  425.     {
  426.         if (isset($options['atomic']) && $options['atomic']) {
  427.             $class Atomic::class;
  428.         } elseif (isset($options['fire-and-forget']) && $options['fire-and-forget']) {
  429.             $class FireAndForget::class;
  430.         } else {
  431.             $class Pipeline::class;
  432.         }
  433.         if ($this->connection instanceof RelayConnection) {
  434.             if (isset($options['atomic']) && $options['atomic']) {
  435.                 $class RelayAtomic::class;
  436.             } elseif (isset($options['fire-and-forget']) && $options['fire-and-forget']) {
  437.                 throw new NotSupportedException('The "relay" extension does not support fire-and-forget pipelines.');
  438.             } else {
  439.                 $class RelayPipeline::class;
  440.             }
  441.         }
  442.         /*
  443.          * @var ClientContextInterface
  444.          */
  445.         $pipeline = new $class($this);
  446.         if (isset($callable)) {
  447.             return $pipeline->execute($callable);
  448.         }
  449.         return $pipeline;
  450.     }
  451.     /**
  452.      * Creates a new transaction context and returns it, or returns the results
  453.      * of a transaction executed inside the optionally provided callable object.
  454.      *
  455.      * @param mixed ...$arguments Array of options, a callable for execution, or both.
  456.      *
  457.      * @return MultiExecTransaction|array
  458.      */
  459.     public function transaction(...$arguments)
  460.     {
  461.         return $this->sharedContextFactory('createTransaction'func_get_args());
  462.     }
  463.     /**
  464.      * Actual transaction context initializer method.
  465.      *
  466.      * @param array|null $options  Options for the context.
  467.      * @param mixed      $callable Optional callable used to execute the context.
  468.      *
  469.      * @return MultiExecTransaction|array
  470.      */
  471.     protected function createTransaction(?array $options null$callable null)
  472.     {
  473.         $transaction = new MultiExecTransaction($this$options);
  474.         if (isset($callable)) {
  475.             return $transaction->execute($callable);
  476.         }
  477.         return $transaction;
  478.     }
  479.     /**
  480.      * Creates a new publish/subscribe context and returns it, or starts its loop
  481.      * inside the optionally provided callable object.
  482.      *
  483.      * @param mixed ...$arguments Array of options, a callable for execution, or both.
  484.      *
  485.      * @return PubSubConsumer|null
  486.      */
  487.     public function pubSubLoop(...$arguments)
  488.     {
  489.         return $this->sharedContextFactory('createPubSub'func_get_args());
  490.     }
  491.     /**
  492.      * Creates new push notifications consumer.
  493.      *
  494.      * @param  callable|null $preLoopCallback Callback that should be called on client before enter a loop.
  495.      * @return PushConsumer
  496.      */
  497.     public function push(?callable $preLoopCallback null): PushConsumer
  498.     {
  499.         return new PushConsumer($this$preLoopCallback);
  500.     }
  501.     /**
  502.      * Actual publish/subscribe context initializer method.
  503.      *
  504.      * @param array|null $options  Options for the context.
  505.      * @param mixed      $callable Optional callable used to execute the context.
  506.      *
  507.      * @return PubSubConsumer|null
  508.      */
  509.     protected function createPubSub(?array $options null$callable null)
  510.     {
  511.         if ($this->connection instanceof RelayConnection) {
  512.             $pubsub = new RelayPubSubConsumer($this$options);
  513.         } else {
  514.             $pubsub = new PubSubConsumer($this$options);
  515.         }
  516.         if (!isset($callable)) {
  517.             return $pubsub;
  518.         }
  519.         foreach ($pubsub as $message) {
  520.             if (call_user_func($callable$pubsub$message) === false) {
  521.                 $pubsub->stop();
  522.             }
  523.         }
  524.         return null;
  525.     }
  526.     /**
  527.      * Creates a new monitor consumer and returns it.
  528.      *
  529.      * @return MonitorConsumer
  530.      */
  531.     public function monitor()
  532.     {
  533.         return new MonitorConsumer($this);
  534.     }
  535.     /**
  536.      * @return Traversable<string, static>
  537.      */
  538.     #[ReturnTypeWillChange]
  539.     public function getIterator()
  540.     {
  541.         $clients = [];
  542.         $connection $this->getConnection();
  543.         if (!$connection instanceof Traversable) {
  544.             return new ArrayIterator([
  545.                 (string) $connection => new static($connection$this->getOptions()),
  546.             ]);
  547.         }
  548.         foreach ($connection as $node) {
  549.             $clients[(string) $node] = new static($node$this->getOptions());
  550.         }
  551.         return new ArrayIterator($clients);
  552.     }
  553. }