vendor/predis/predis/src/Connection/Resource/StreamFactory.php line 26

Open in your IDE?
  1. <?php
  2. /*
  3.  * This file is part of the Predis package.
  4.  *
  5.  * (c) 2009-2020 Daniele Alessandri
  6.  * (c) 2021-2025 Till Krüss
  7.  *
  8.  * For the full copyright and license information, please view the LICENSE
  9.  * file that was distributed with this source code.
  10.  */
  11. namespace Predis\Connection\Resource;
  12. use InvalidArgumentException;
  13. use Predis\Connection\ParametersInterface;
  14. use Predis\Connection\Resource\Exception\StreamInitException;
  15. use Psr\Http\Message\StreamInterface;
  16. class StreamFactory implements StreamFactoryInterface
  17. {
  18.     /**
  19.      * {@inheritDoc}
  20.      * @throws StreamInitException
  21.      */
  22.     public function createStream(ParametersInterface $parameters): StreamInterface
  23.     {
  24.         $parameters $this->assertParameters($parameters);
  25.         switch ($parameters->scheme) {
  26.             case 'tcp':
  27.             case 'redis':
  28.                 $stream $this->tcpStreamInitializer($parameters);
  29.                 break;
  30.             case 'unix':
  31.                 $stream $this->unixStreamInitializer($parameters);
  32.                 break;
  33.             case 'tls':
  34.             case 'rediss':
  35.                 $stream $this->tlsStreamInitializer($parameters);
  36.                 break;
  37.             default:
  38.                 throw new InvalidArgumentException("Invalid scheme: '{$parameters->scheme}'.");
  39.         }
  40.         return new Stream($stream);
  41.     }
  42.     /**
  43.      * Checks some parameters used to initialize the connection.
  44.      *
  45.      * @param ParametersInterface $parameters Initialization parameters for the connection.
  46.      *
  47.      * @return ParametersInterface
  48.      * @throws InvalidArgumentException
  49.      */
  50.     protected function assertParameters(ParametersInterface $parameters): ParametersInterface
  51.     {
  52.         switch ($parameters->scheme) {
  53.             case 'tcp':
  54.             case 'redis':
  55.             case 'unix':
  56.             case 'tls':
  57.             case 'rediss':
  58.                 break;
  59.             default:
  60.                 throw new InvalidArgumentException("Invalid scheme: '$parameters->scheme'.");
  61.         }
  62.         return $parameters;
  63.     }
  64.     /**
  65.      * Initializes a TCP stream resource.
  66.      *
  67.      * @param ParametersInterface $parameters Initialization parameters for the connection.
  68.      *
  69.      * @return resource
  70.      * @throws StreamInitException
  71.      */
  72.     protected function tcpStreamInitializer(ParametersInterface $parameters)
  73.     {
  74.         if (!filter_var($parameters->hostFILTER_VALIDATE_IPFILTER_FLAG_IPV6)) {
  75.             $address "tcp://$parameters->host:$parameters->port";
  76.         } else {
  77.             $address "tcp://[$parameters->host]:$parameters->port";
  78.         }
  79.         $flags STREAM_CLIENT_CONNECT;
  80.         if (isset($parameters->async_connect) && $parameters->async_connect) {
  81.             $flags |= STREAM_CLIENT_ASYNC_CONNECT;
  82.         }
  83.         if (isset($parameters->persistent)) {
  84.             if (false !== $persistent filter_var($parameters->persistentFILTER_VALIDATE_BOOLEANFILTER_NULL_ON_FAILURE)) {
  85.                 $flags |= STREAM_CLIENT_PERSISTENT;
  86.                 if ($persistent === null) {
  87.                     $address "{$address}/{$parameters->persistent}";
  88.                 }
  89.             }
  90.         }
  91.         return $this->createStreamSocket($parameters$address$flags);
  92.     }
  93.     /**
  94.      * Initializes a UNIX stream resource.
  95.      *
  96.      * @param ParametersInterface $parameters Initialization parameters for the connection.
  97.      *
  98.      * @return resource
  99.      * @throws StreamInitException
  100.      */
  101.     protected function unixStreamInitializer(ParametersInterface $parameters)
  102.     {
  103.         if (!isset($parameters->path)) {
  104.             throw new InvalidArgumentException('Missing UNIX domain socket path.');
  105.         }
  106.         $flags STREAM_CLIENT_CONNECT;
  107.         if (isset($parameters->persistent)) {
  108.             if (false !== $persistent filter_var($parameters->persistentFILTER_VALIDATE_BOOLEANFILTER_NULL_ON_FAILURE)) {
  109.                 $flags |= STREAM_CLIENT_PERSISTENT;
  110.                 if ($persistent === null) {
  111.                     throw new InvalidArgumentException(
  112.                         'Persistent connection IDs are not supported when using UNIX domain sockets.'
  113.                     );
  114.                 }
  115.             }
  116.         }
  117.         return $this->createStreamSocket($parameters"unix://{$parameters->path}"$flags);
  118.     }
  119.     /**
  120.      * Initializes a SSL-encrypted TCP stream resource.
  121.      *
  122.      * @param ParametersInterface $parameters Initialization parameters for the connection.
  123.      *
  124.      * @return resource
  125.      * @throws StreamInitException
  126.      */
  127.     protected function tlsStreamInitializer(ParametersInterface $parameters)
  128.     {
  129.         $resource $this->tcpStreamInitializer($parameters);
  130.         $metadata stream_get_meta_data($resource);
  131.         // Detect if crypto mode is already enabled for this stream (PHP >= 7.0.0).
  132.         if (isset($metadata['crypto'])) {
  133.             return $resource;
  134.         }
  135.         if (isset($parameters->ssl) && is_array($parameters->ssl)) {
  136.             $options $parameters->ssl;
  137.         } else {
  138.             $options = [];
  139.         }
  140.         if (!isset($options['crypto_type'])) {
  141.             $options['crypto_type'] = STREAM_CRYPTO_METHOD_TLS_CLIENT;
  142.         }
  143.         if (!stream_context_set_option($resource, ['ssl' => $options])) {
  144.             $this->onInitializationError($resource$parameters'Error while setting SSL context options');
  145.         }
  146.         if (!stream_socket_enable_crypto($resourcetrue$options['crypto_type'])) {
  147.             $this->onInitializationError($resource$parameters'Error while switching to encrypted communication');
  148.         }
  149.         return $resource;
  150.     }
  151.     /**
  152.      * Creates a connected stream socket resource.
  153.      *
  154.      * @param ParametersInterface $parameters Connection parameters.
  155.      * @param string              $address    Address for stream_socket_client().
  156.      * @param int                 $flags      Flags for stream_socket_client().
  157.      *
  158.      * @return resource
  159.      * @throws StreamInitException
  160.      */
  161.     protected function createStreamSocket(ParametersInterface $parameters$address$flags)
  162.     {
  163.         $timeout = (isset($parameters->timeout) ? (float) $parameters->timeout 5.0);
  164.         $context stream_context_create(['socket' => ['tcp_nodelay' => (bool) $parameters->tcp_nodelay]]);
  165.         if (
  166.             (isset($parameters->persistent) && $parameters->persistent)
  167.             && (isset($parameters->conn_uid) && $parameters->conn_uid)
  168.         ) {
  169.             $conn_uid '/' $parameters->conn_uid;
  170.         } else {
  171.             $conn_uid '';
  172.         }
  173.         // Needs to create multiple persistent connections to the same resource
  174.         $address $address $conn_uid;
  175.         if (!$resource = @stream_socket_client($address$errno$errstr$timeout$flags$context)) {
  176.             $this->onInitializationError($resource$parameterstrim($errstr), $errno);
  177.         }
  178.         if (isset($parameters->read_write_timeout)) {
  179.             $rwtimeout = (float) $parameters->read_write_timeout;
  180.             $rwtimeout $rwtimeout $rwtimeout : -1;
  181.             $timeoutSeconds floor($rwtimeout);
  182.             $timeoutUSeconds = ($rwtimeout $timeoutSeconds) * 1000000;
  183.             stream_set_timeout($resource$timeoutSeconds$timeoutUSeconds);
  184.         }
  185.         return $resource;
  186.     }
  187.     /**
  188.      * Helper method to handle connection errors.
  189.      *
  190.      * @param  string              $message Error message.
  191.      * @param  int                 $code    Error code.
  192.      * @throws StreamInitException
  193.      */
  194.     protected function onInitializationError($streamParametersInterface $parametersstring $messageint $code 0): void
  195.     {
  196.         if (is_resource($stream)) {
  197.             fclose($stream);
  198.         }
  199.         throw new StreamInitException("$message [{$parameters}]"$code);
  200.     }
  201. }