@@ -141,32 +141,31 @@ public function get(): ?array
141
141
if ($ this ->autoSetup ) {
142
142
$ this ->setup ();
143
143
}
144
+ $ now = microtime ();
145
+ $ now = substr ($ now , 11 ).substr ($ now , 2 , 3 );
144
146
145
- try {
146
- $ queuedMessageCount = $ this ->connection ->zcount ($ this ->queue , 0 , $ this ->getCurrentTimeInMilliseconds ());
147
- } catch (\RedisException $ e ) {
148
- throw new TransportException ($ e ->getMessage (), 0 , $ e );
149
- }
147
+ if ($ queuedMessageCount = $ this ->rawCommand ('ZCOUNT ' , 0 , $ now )) {
148
+ $ queuedMessages = $ this ->rawCommand ('ZPOPMIN ' , $ queuedMessageCount );
149
+ $ queuedMessageCount = \count ($ queuedMessages );
150
+ $ i = 0 ;
150
151
151
- if ($ queuedMessageCount ) {
152
- for ($ i = 0 ; $ i < $ queuedMessageCount ; ++$ i ) {
153
- try {
154
- $ queuedMessages = $ this ->connection ->zpopmin ($ this ->queue , 1 );
155
- } catch (\RedisException $ e ) {
156
- throw new TransportException ($ e ->getMessage (), 0 , $ e );
157
- }
152
+ while ($ i < $ queuedMessageCount ) {
153
+ $ queuedMessage = $ queuedMessages [$ i ++];
154
+ $ expiry = $ queuedMessages [$ i ++];
155
+
156
+ if (\strlen ($ expiry ) === \strlen ($ now ) ? $ expiry > $ now : \strlen ($ expiry ) < \strlen ($ now )) {
157
+ if (!$ this ->rawCommand ('ZADD ' , 'NX ' , $ expiry , $ queuedMessage )) {
158
+ throw new TransportException ('Could not add a message to the redis stream. ' );
159
+ }
158
160
159
- foreach ($ queuedMessages as $ queuedMessage => $ time ) {
160
- $ queuedMessage = json_decode ($ queuedMessage , true );
161
- // if a futured placed message is actually popped because of a race condition with
162
- // another running message consumer, the message is readded to the queue by add function
163
- // else its just added stream and will be available for all stream consumers
164
- $ this ->add (
165
- $ queuedMessage ['body ' ],
166
- $ queuedMessage ['headers ' ],
167
- $ time - $ this ->getCurrentTimeInMilliseconds ()
168
- );
161
+ continue ;
169
162
}
163
+
164
+ $ queuedMessage = json_decode ($ queuedMessage , true );
165
+ // if a futured placed message is actually popped because of a race condition with
166
+ // another running message consumer, the message is readded to the queue by add function
167
+ // else its just added stream and will be available for all stream consumers
168
+ $ this ->add ($ queuedMessage ['body ' ], $ queuedMessage ['headers ' ], 0 );
170
169
}
171
170
}
172
171
@@ -255,7 +254,7 @@ public function add(string $body, array $headers, int $delayInMs = 0): void
255
254
}
256
255
257
256
try {
258
- if ($ delayInMs > 0 ) { // the delay could be smaller 0 in a queued message
257
+ if ($ delayInMs > 0 ) { // the delay is <= 0 for queued messages
259
258
$ message = json_encode ([
260
259
'body ' => $ body ,
261
260
'headers ' => $ headers ,
@@ -267,8 +266,18 @@ public function add(string $body, array $headers, int $delayInMs = 0): void
267
266
throw new TransportException (json_last_error_msg ());
268
267
}
269
268
270
- $ score = $ this ->getCurrentTimeInMilliseconds () + $ delayInMs ;
271
- $ added = $ this ->connection ->zadd ($ this ->queue , ['NX ' ], $ score , $ message );
269
+ $ now = explode (' ' , microtime (), 2 );
270
+ $ now [0 ] = str_pad ($ delayInMs + substr ($ now [0 ], 2 , 3 ), 3 , '0 ' , \STR_PAD_LEFT );
271
+ if (3 < \strlen ($ now [0 ])) {
272
+ $ now [1 ] += substr ($ now [0 ], 0 , -3 );
273
+ $ now [0 ] = substr ($ now [0 ], -3 );
274
+
275
+ if (\is_float ($ now [1 ])) {
276
+ throw new TransportException ("Message delay is too big: {$ delayInMs }ms. " );
277
+ }
278
+ }
279
+
280
+ $ added = $ this ->rawCommand ('ZADD ' , 'NX ' , $ now [1 ].$ now [0 ], $ message );
272
281
} else {
273
282
$ message = json_encode ([
274
283
'body ' => $ body ,
@@ -316,14 +325,30 @@ public function setup(): void
316
325
$ this ->autoSetup = false ;
317
326
}
318
327
319
- private function getCurrentTimeInMilliseconds (): int
320
- {
321
- return (int ) (microtime (true ) * 1000 );
322
- }
323
-
324
328
public function cleanup (): void
325
329
{
326
330
$ this ->connection ->del ($ this ->stream );
327
331
$ this ->connection ->del ($ this ->queue );
328
332
}
333
+
334
+ /**
335
+ * @return mixed
336
+ */
337
+ private function rawCommand (string $ command , ...$ arguments )
338
+ {
339
+ try {
340
+ $ result = $ this ->connection ->rawCommand ($ command , $ this ->queue , ...$ arguments );
341
+ } catch (\RedisException $ e ) {
342
+ throw new TransportException ($ e ->getMessage (), 0 , $ e );
343
+ }
344
+
345
+ if (false === $ result ) {
346
+ if ($ error = $ this ->connection ->getLastError () ?: null ) {
347
+ $ this ->connection ->clearLastError ();
348
+ }
349
+ throw new TransportException ($ error ?? sprintf ('Could not run "%s" on Redis queue. ' , $ command ));
350
+ }
351
+
352
+ return $ result ;
353
+ }
329
354
}
0 commit comments