@@ -141,32 +141,35 @@ public function get(): ?array
141
141
if ($ this ->autoSetup ) {
142
142
$ this ->setup ();
143
143
}
144
+ $ now = microtime ();
145
+ $ now = substr ($ now , 11 ).substr ($ now , 2 , 3 );
144
146
145
- try {
146
- $ queuedMessageCount = $ this ->connection ->zcount ($ this ->queue , 0 , $ this ->getCurrentTimeInMilliseconds ());
147
- } catch (\RedisException $ e ) {
148
- throw new TransportException ($ e ->getMessage (), 0 , $ e );
149
- }
147
+ if ($ queuedMessageCount = $ this ->rawCommand ('ZCOUNT ' , 0 , $ now )) {
148
+ $ queuedMessages = $ this ->rawCommand ('ZPOPMIN ' , $ queuedMessageCount );
150
149
10000
151
- if ($ queuedMessageCount ) {
152
- for ($ i = 0 ; $ i < $ queuedMessageCount ; ++$ i ) {
153
- try {
154
- $ queuedMessages = $ this ->connection ->zpopmin ($ this ->queue , 1 );
155
- } catch (\RedisException $ e ) {
156
- throw new TransportException ($ e ->getMessage (), 0 , $ e );
157
- }
150
+ $ i = \count ($ queuedMessages );
151
+
152
+ while (2 <= $ i ) {
153
+ $ expiry = $ queuedMessages [--$ i ];
154
+ $ queuedMessage = $ queuedMessages [--$ i ];
155
+
156
+ if (\strlen ($ expiry ) === \strlen ($ now ) ? $ expiry > $ now : \strlen ($ expiry ) < \strlen ($ now )) {
157
+ if (!$ this ->rawCommand ('ZADD ' , 'NX ' , $ expiry , $ queuedMessage )) {
158
+ throw new TransportException ('Could not add a message to the redis stream. ' );
159
+ }
158
160
159
- foreach ($ queuedMessages as $ queuedMessage => $ time ) {
160
- $ queuedMessage = json_decode ($ queuedMessage , true );
161
- // if a futured placed message is actually popped because of a race condition with
162
- // another running message consumer, the message is readded to the queue by add function
163
- // else its just added stream and will be available for all stream consumers
164
- $ this ->add (
165
- $ queuedMessage ['body ' ],
166
- $ queuedMessage ['headers ' ],
167
- $ time - $ this ->getCurrentTimeInMilliseconds ()
168
- );
161
+ continue ;
169
162
}
163
+
164
+ $ queuedMessage = json_decode ($ queuedMessage , true );
165
+ // if a futured placed message is actually popped because of a race condition with
166
+ // another running message consumer, the message is readded to the queue by add function
167
+ // else its just added stream and will be available for all stream consumers
168
+ $ this ->add (
169
+ $ queuedMessage ['body ' ],
170
+ $ queuedMessage ['headers ' ],
171
+ 0
172
+ );
170
173
}
171
174
}
172
175
@@ -255,7 +258,7 @@ public function add(string $body, array $headers, int $delayInMs = 0): void
255
258
}
256
259
257
260
try {
258
- if ($ delayInMs > 0 ) { // the delay could be smaller 0 in a queued message
261
+ if ($ delayInMs > 0 ) { // the delay is <= 0 for queued messages
259
262
$ message = json_encode ([
260
263
'body ' => $ body ,
261
264
'headers ' => $ headers ,
@@ -267,8 +270,18 @@ public function add(string $body, array $headers, int $delayInMs = 0): void
267
270
throw new TransportException (json_last_error_msg ());
268
271
}
269
272
270
- $ score = $ this ->getCurrentTimeInMilliseconds () + $ delayInMs ;
271
- $ added = $ this ->connection ->zadd ($ this ->queue , ['NX ' ], $ score , $ message );
273
+ $ now = explode (' ' , microtime (), 2 );
274
+ $ now [0 ] = str_pad ($ delayInMs + substr ($ now [0 ], 2 , 3 ), 3 , '0 ' , \STR_PAD_LEFT );
275
+ if (3 < \strlen ($ now [0 ])) {
276
+ $ now [1 ] += substr ($ now [0 ], 0 , -3 );
277
+ $ now [0 ] = substr ($ now [0 ], -3 );
278
+
279
+ if (\is_float ($ now [1 ])) {
280
+ throw new TransportException ("Message delay is too big: {$ delayInMs }ms. " );
281
+ }
282
+ }
283
+
284
+ $ added = $ this ->rawCommand ('ZADD ' , 'NX ' , $ now [1 ].$ now [0 ], $ message );
272
285
} else {
273
286
$ message = json_encode ([
274
287
'body ' => $ body ,
@@ -316,14 +329,30 @@ public function setup(): void
316
329
$ this ->autoSetup = false ;
317
330
}
318
331
319
- private function getCurrentTimeInMilliseconds (): int
320
- {
321
- return (int ) (microtime (true ) * 1000 );
322
- }
323
-
324
332
public function cleanup (): void
325
333
{
326
334
$ this ->connection ->del ($ this ->stream );
327
335
$ this ->connection ->del ($ this ->queue );
328
336
}
337
+
338
+ /**
339
+ * @return mixed
340
+ */
341
+ private function rawCommand (string $ command , ...$ arguments )
342
+ {
343
+ try {
344
+ $ result = $ this ->connection ->rawCommand ($ command , $ this ->queue , ...$ arguments );
345
+ } catch (\RedisException $ e ) {
346
+ throw new TransportException ($ e ->getMessage (), 0 , $ e );
347
+ }
348
+
349
+ if (false === $ result ) {
350
+ if ($ error = $ this ->connection ->getLastError () ?: null ) {
351
+ $ this ->connection ->clearLastError ();
352
+ }
353
+ throw new TransportException ($ error ?? sprintf ('Could not run "%s" on Redis queue. ' , $ command ));
354
+ }
355
+
356
+ return $ result ;
357
+ }
329
358
}
0 commit comments