vendor/predis/predis/src/Connection/StreamConnection.php line 86

Open in your IDE?
  1. <?php
  2. /*
  3.  * This file is part of the Predis package.
  4.  *
  5.  * (c) 2009-2020 Daniele Alessandri
  6.  * (c) 2021-2025 Till Krüss
  7.  *
  8.  * For the full copyright and license information, please view the LICENSE
  9.  * file that was distributed with this source code.
  10.  */
  11. namespace Predis\Connection;
  12. use Predis\Command\CommandInterface;
  13. use Predis\Command\RawCommand;
  14. use Predis\CommunicationException;
  15. use Predis\Connection\Resource\Exception\StreamInitException;
  16. use Predis\Connection\Resource\StreamFactory;
  17. use Predis\Connection\Resource\StreamFactoryInterface;
  18. use Predis\Consumer\Push\PushNotificationException;
  19. use Predis\Consumer\Push\PushResponse;
  20. use Predis\Protocol\Parser\Strategy\Resp2Strategy;
  21. use Predis\Protocol\Parser\Strategy\Resp3Strategy;
  22. use Predis\Protocol\Parser\UnexpectedTypeException;
  23. use Predis\Response\Error;
  24. use Predis\Response\ErrorInterface as ErrorResponseInterface;
  25. use Psr\Http\Message\StreamInterface;
  26. use RuntimeException;
  27. /**
  28.  * Standard connection to Redis servers implemented on top of PHP's streams.
  29.  * The connection parameters supported by this class are:.
  30.  *
  31.  *  - scheme: it can be either 'redis', 'tcp', 'rediss', 'tls' or 'unix'.
  32.  *  - host: hostname or IP address of the server.
  33.  *  - port: TCP port of the server.
  34.  *  - path: path of a UNIX domain socket when scheme is 'unix'.
  35.  *  - timeout: timeout to perform the connection (default is 5 seconds).
  36.  *  - read_write_timeout: timeout of read / write operations.
  37.  *  - async_connect: performs the connection asynchronously.
  38.  *  - tcp_nodelay: enables or disables Nagle's algorithm for coalescing.
  39.  *  - persistent: the connection is left intact after a GC collection.
  40.  *  - ssl: context options array (see http://php.net/manual/en/context.ssl.php)
  41.  *
  42.  * @method StreamInterface getResource()
  43.  */
  44. class StreamConnection extends AbstractConnection
  45. {
  46.     /**
  47.      * @var StreamFactoryInterface
  48.      */
  49.     protected $streamFactory;
  50.     /**
  51.      * @param ParametersInterface         $parameters
  52.      * @param StreamFactoryInterface|null $factory
  53.      */
  54.     public function __construct(ParametersInterface $parameters, ?StreamFactoryInterface $factory null)
  55.     {
  56.         parent::__construct($parameters);
  57.         $this->parameters->conn_uid spl_object_hash($this);
  58.         $this->streamFactory $factory ?? new StreamFactory();
  59.     }
  60.     /**
  61.      * Disconnects from the server and destroys the underlying resource when the
  62.      * garbage collector kicks in only if the connection has not been marked as
  63.      * persistent.
  64.      */
  65.     public function __destruct()
  66.     {
  67.         if (isset($this->parameters->persistent) && $this->parameters->persistent) {
  68.             return;
  69.         }
  70.         $this->disconnect();
  71.     }
  72.     /**
  73.      * {@inheritdoc}
  74.      */
  75.     protected function createResource(): StreamInterface
  76.     {
  77.         return $this->streamFactory->createStream($this->parameters);
  78.     }
  79.     /**
  80.      * {@inheritdoc}
  81.      */
  82.     public function connect()
  83.     {
  84.         if (parent::connect() && $this->initCommands) {
  85.             foreach ($this->initCommands as $command) {
  86.                 $response $this->executeCommand($command);
  87.                 $this->handleOnConnectResponse($response$command);
  88.             }
  89.         }
  90.     }
  91.     /**
  92.      * {@inheritdoc}
  93.      */
  94.     public function disconnect()
  95.     {
  96.         if ($this->isConnected()) {
  97.             $this->getResource()->close();
  98.             parent::disconnect();
  99.         }
  100.     }
  101.     /**
  102.      * {@inheritDoc}
  103.      * @throws CommunicationException
  104.      */
  105.     public function write(string $buffer): void
  106.     {
  107.         $stream $this->getResource();
  108.         while (($length strlen($buffer)) > 0) {
  109.             try {
  110.                 $written $stream->write($buffer);
  111.             } catch (RuntimeException $e) {
  112.                 $this->onStreamError($e'Error while writing bytes to the server.');
  113.             }
  114.             if ($length === $written) { // @phpstan-ignore-line
  115.                 return;
  116.             }
  117.             $buffer substr($buffer$written); // @phpstan-ignore-line
  118.         }
  119.     }
  120.     /**
  121.      * {@inheritdoc}
  122.      * @throws PushNotificationException
  123.      * @throws StreamInitException|CommunicationException
  124.      */
  125.     public function read()
  126.     {
  127.         $stream $this->getResource();
  128.         if ($stream->eof()) {
  129.             $this->onStreamError(new RuntimeException('Stream is already at the end'), '');
  130.         }
  131.         try {
  132.             $chunk $stream->read(-1);
  133.         } catch (RuntimeException $e) {
  134.             $this->onStreamError($e'Error while reading line from the server.');
  135.         }
  136.         try {
  137.             $parsedData $this->parserStrategy->parseData($chunk); // @phpstan-ignore-line
  138.         } catch (UnexpectedTypeException $e) {
  139.             $this->onProtocolError("Unknown response prefix: '{$e->getType()}'.");
  140.             return;
  141.         }
  142.         if (!is_array($parsedData)) {
  143.             return $parsedData;
  144.         }
  145.         switch ($parsedData['type']) {
  146.             case Resp3Strategy::TYPE_PUSH:
  147.                 $data = [];
  148.                 for ($i 0$i $parsedData['value']; ++$i) {
  149.                     $data[$i] = $this->read();
  150.                 }
  151.                 return new PushResponse($data);
  152.             case Resp2Strategy::TYPE_ARRAY:
  153.                 $data = [];
  154.                 for ($i 0$i $parsedData['value']; ++$i) {
  155.                     $data[$i] = $this->read();
  156.                 }
  157.                 return $data;
  158.             case Resp2Strategy::TYPE_BULK_STRING:
  159.                 $bulkData $this->readByChunks($stream$parsedData['value']);
  160.                 return substr($bulkData0, -2);
  161.             case Resp3Strategy::TYPE_VERBATIM_STRING:
  162.                 $bulkData $this->readByChunks($stream$parsedData['value']);
  163.                 return substr($bulkData$parsedData['offset'], -2);
  164.             case Resp3Strategy::TYPE_BLOB_ERROR:
  165.                 $errorMessage $this->readByChunks($stream$parsedData['value']);
  166.                 return new Error(substr($errorMessage0, -2));
  167.             case Resp3Strategy::TYPE_MAP:
  168.                 $data = [];
  169.                 for ($i 0$i $parsedData['value']; ++$i) {
  170.                     $key $this->read();
  171.                     $data[$key] = $this->read();
  172.                 }
  173.                 return $data;
  174.             case Resp3Strategy::TYPE_SET:
  175.                 $data = [];
  176.                 for ($i 0$i $parsedData['value']; ++$i) {
  177.                     $element $this->read();
  178.                     if (!in_array($element$datatrue)) {
  179.                         $data[] = $element;
  180.                     }
  181.                 }
  182.                 return $data;
  183.         }
  184.         return $parsedData;
  185.     }
  186.     /**
  187.      * {@inheritdoc}
  188.      */
  189.     public function writeRequest(CommandInterface $command)
  190.     {
  191.         $buffer $command->serializeCommand();
  192.         $this->write($buffer);
  193.     }
  194.     /**
  195.      * {@inheritDoc}
  196.      */
  197.     public function hasDataToRead(): bool
  198.     {
  199.         return !$this->getResource()->eof();
  200.     }
  201.     /**
  202.      * Reads given resource split on chunks with given size.
  203.      *
  204.      * @param  StreamInterface        $stream
  205.      * @param  int                    $chunkSize
  206.      * @return string
  207.      * @throws CommunicationException
  208.      */
  209.     private function readByChunks(StreamInterface $streamint $chunkSize): string
  210.     {
  211.         $string '';
  212.         $bytesLeft = ($chunkSize += 2);
  213.         do {
  214.             try {
  215.                 $chunk $stream->read(min($bytesLeft4096));
  216.             } catch (RuntimeException $e) {
  217.                 $this->onStreamError($e'Error while reading bytes from the server.');
  218.             }
  219.             $string .= $chunk// @phpstan-ignore-line
  220.             $bytesLeft $chunkSize strlen($string);
  221.         } while ($bytesLeft 0);
  222.         return $string;
  223.     }
  224.     /**
  225.      * Handle response from on-connect command.
  226.      *
  227.      * @param                         $response
  228.      * @param  CommandInterface       $command
  229.      * @return void
  230.      * @throws CommunicationException
  231.      */
  232.     private function handleOnConnectResponse($responseCommandInterface $command): void
  233.     {
  234.         if ($response instanceof ErrorResponseInterface) {
  235.             $this->handleError($response$command);
  236.         }
  237.         if ($command->getId() === 'HELLO' && is_array($response)) {
  238.             // Searching for the CLIENT ID in RESP2 connection tricky because no dictionaries.
  239.             if (
  240.                 $this->getParameters()->protocol == 2
  241.                 && false !== $key array_search('id'$responsetrue)
  242.             ) {
  243.                 $this->clientId $response[$key 1];
  244.             } elseif ($this->getParameters()->protocol == 3) {
  245.                 $this->clientId $response['id'];
  246.             }
  247.         }
  248.     }
  249.     /**
  250.      * Handle server errors.
  251.      *
  252.      * @param  ErrorResponseInterface $error
  253.      * @param  CommandInterface       $failedCommand
  254.      * @return void
  255.      * @throws CommunicationException
  256.      */
  257.     private function handleError(ErrorResponseInterface $errorCommandInterface $failedCommand): void
  258.     {
  259.         if ($failedCommand->getId() === 'CLIENT') {
  260.             // Do nothing on CLIENT SETINFO command failure
  261.             return;
  262.         }
  263.         if ($failedCommand->getId() === 'HELLO') {
  264.             if (in_array('AUTH'$failedCommand->getArguments(), true)) {
  265.                 $parameters $this->getParameters();
  266.                 // If Redis <= 6.0
  267.                 $auth = new RawCommand('AUTH', [$parameters->password]);
  268.                 $response $this->executeCommand($auth);
  269.                 if ($response instanceof ErrorResponseInterface) {
  270.                     $this->onConnectionError("Failed: {$response->getMessage()}");
  271.                 }
  272.             }
  273.             $setName = new RawCommand('CLIENT', ['SETNAME''predis']);
  274.             $response $this->executeCommand($setName);
  275.             $this->handleOnConnectResponse($response$setName);
  276.             return;
  277.         }
  278.         $this->onConnectionError("Failed: {$error->getMessage()}");
  279.     }
  280.     /**
  281.      * Handles stream-related exceptions.
  282.      *
  283.      * @param  RuntimeException                        $e
  284.      * @param  string|null                             $message
  285.      * @throws RuntimeException|CommunicationException
  286.      */
  287.     protected function onStreamError(RuntimeException $e, ?string $message null)
  288.     {
  289.         // Code = 1 represents issues related to read/write operation.
  290.         if ($e->getCode() === 1) {
  291.             $this->onConnectionError($message);
  292.         }
  293.         throw $e;
  294.     }
  295. }