@@ -39,6 +39,7 @@ class Connection
39
39
'tls ' => false ,
40
40
'redeliver_timeout ' => 3600 , // Timeout before redeliver messages still in pending state (seconds)
41
41
'claim_interval ' => 60000 , // Interval by which pending/abandoned messages should be checked
42
+ 'lazy ' => false ,
42
43
];
43
44
44
45
private $ connection ;
@@ -61,23 +62,42 @@ public function __construct(array $configuration, array $connectionCredentials =
61
62
throw new LogicException ('The redis transport requires php-redis 4.3.0 or higher. ' );
62
63
}
63
64
64
- $ this -> connection = $ redis ?: new \ Redis () ;
65
- $ this -> connectio
10000
n -> connect ( $ connectionCredentials [ ' host ' ] ?? ' 127.0.0.1 ' , $ connectionCredentials ['port ' ] ?? 6379 ) ;
66
- $ this -> connection -> setOption (\Redis:: OPT_SERIALIZER , $ redisOptions ['serializer ' ] ?? \Redis::SERIALIZER_PHP ) ;
67
-
65
+ $ host = $ connectionCredentials [ ' host ' ] ?? ' 127.0.0.1 ' ;
66
+ $ port = $ connectionCredentials ['port ' ] ?? 6379 ;
67
+ $ serializer = $ redisOptions ['serializer ' ] ?? \Redis::SERIALIZER_PHP ;
68
+ $ dbIndex = $ configuration [ ' dbindex ' ] ?? self :: DEFAULT_OPTIONS [ ' dbindex ' ];
68
69
$ auth = $ connectionCredentials ['auth ' ] ?? null ;
69
70
if ('' === $ auth ) {
70
71
$ auth = null ;
71
72
}
72
73
73
- if (null !== $ auth && !$ this ->connection ->auth ($ auth )) {
74
- throw new InvalidArgumentException ('Redis connection failed: ' .$ this ->connection ->getLastError ());
74
+ $ initializer = static function ($ redis ) use ($ host , $ port , $ auth , $ serializer , $ dbIndex ) {
75
+ $ redis ->connect ($ host , $ port );
76
+ $ redis ->setOption (\Redis::OPT_SERIALIZER , $ serializer );
77
+
78
+ if (null !== $ auth && !$ redis ->auth ($ auth )) {
79
+ throw new InvalidArgumentException ('Redis connection failed: ' .$ redis ->getLastError ());
80
+ }
81
+
82
+ if ($ dbIndex && !$ redis ->select ($ dbIndex )) {
83
+ throw new InvalidArgumentException ('Redis connection failed: ' .$ redis ->getLastError ());
84
+ }
85
+
86
+ return true ;
87
+ };
88
+
89
+ if (null === $ redis ) {
90
+ $ redis = new \Redis ();
75
91
}
76
92
77
- if (($ dbIndex = $ configuration ['dbindex ' ] ?? self ::DEFAULT_OPTIONS ['dbindex ' ]) && !$ this ->connection ->select ($ dbIndex )) {
78
- throw new InvalidArgumentException ('Redis connection failed: ' .$ this ->connection ->getLastError ());
93
+ if ($ configuration ['lazy ' ] ?? self ::DEFAULT_OPTIONS ['lazy ' ]) {
94
+ $ redis = new RedisProxy ($ redis , $ initializer );
95
+ } else {
96
+ $ initializer ($ redis );
79
97
}
80
98
99
+ $ this ->connection = $ redis ;
100
+
81
101
foreach (['stream ' , 'group ' , 'consumer ' ] as $ key ) {
82
102
if (isset ($ configuration [$ key ]) && '' === $ configuration [$ key ]) {
83
103
throw new InvalidArgumentException (sprintf ('"%s" should be configured, got an empty string. ' , $ key ));
@@ -165,6 +185,7 @@ public static function fromDsn(string $dsn, array $redisOptions = [], \Redis $re
165
185
'stream ' => $ redisOptions ['stream ' ] ?? null ,
166
186
'group ' => $ redisOptions ['group ' ] ?? null ,
167
187
'consumer ' => $ redisOptions ['consumer ' ] ?? null ,
188
+ 'lazy ' => $ redisOptions ['lazy ' ] ?? self ::DEFAULT_OPTIONS ['lazy ' ],
168
189
'auto_setup ' => $ autoSetup ,
169
190
'stream_max_entries ' => $ maxEntries ,
170
191
'delete_after_ack ' => $ deleteAfterAck ,
0 commit comments