@@ -141,32 +141,31 @@ public function get(): ?array
141141 if ($ this ->autoSetup ) {
142142 $ this ->setup ();
143143 }
144+ $ now = microtime ();
145+ $ now = substr ($ now , 11 ).substr ($ now , 2 , 3 );
144146
145- try {
146- $ queuedMessageCount = $ this ->connection ->zcount ($ this ->queue , 0 , $ this ->getCurrentTimeInMilliseconds ());
147- } catch (\RedisException $ e ) {
148- throw new TransportException ($ e ->getMessage (), 0 , $ e );
149- }
147+ if ($ queuedMessageCount = $ this ->rawCommand ('ZCOUNT ' , 0 , $ now )) {
148+ $ queuedMessages = $ this ->rawCommand ('ZPOPMIN ' , $ queuedMessageCount );
149+ $ queuedMessageCount = \count ($ queuedMessages );
150+ $ i = 0 ;
150151
151- if ($ queuedMessageCount ) {
152- for ($ i = 0 ; $ i < $ queuedMessageCount ; ++$ i ) {
153- try {
154- $ queuedMessages = $ this ->connection ->zpopmin ($ this ->queue , 1 );
155- } catch (\RedisException $ e ) {
156- throw new TransportException ($ e ->getMessage (), 0 , $ e );
157- }
152+ while ($ i < $ queuedMessageCount ) {
153+ $ queuedMessage = $ queuedMessages [$ i ++];
154+ $ expiry = $ queuedMessages [$ i ++];
155+
156+ if (\strlen ($ expiry ) === \strlen ($ now ) ? $ expiry > $ now : \strlen ($ expiry ) < \strlen ($ now )) {
157+ if (!$ this ->rawCommand ('ZADD ' , 'NX ' , $ expiry , $ queuedMessage )) {
158+ throw new TransportException ('Could not add a message to the redis stream. ' );
159+ }
158160
159- foreach ($ queuedMessages as $ queuedMessage => $ time ) {
160- $ queuedMessage = json_decode ($ queuedMessage , true );
161- // if a futured placed message is actually popped because of a race condition with
162- // another running message consumer, the message is readded to the queue by add function
163- // else its just added stream and will be available for all stream consumers
164- $ this ->add (
165- $ queuedMessage ['body ' ],
166- $ queuedMessage ['headers ' ],
167- $ time - $ this ->getCurrentTimeInMilliseconds ()
168- );
161+ continue ;
169162 }
163+
164+ $ queuedMessage = json_decode ($ queuedMessage , true );
165+ // if a futured placed message is actually popped because of a race condition with
166+ // another running message consumer, the message is readded to the queue by add function
167+ // else its just added stream and will be available for all stream consumers
168+ $ this ->add ($ queuedMessage ['body ' ], $ queuedMessage ['headers ' ], 0 );
170169 }
171170 }
172171
@@ -255,7 +254,7 @@ public function add(string $body, array $headers, int $delayInMs = 0): void
255254 }
256255
257256 try {
258- if ($ delayInMs > 0 ) { // the delay could be smaller 0 in a queued message
257+ if ($ delayInMs > 0 ) { // the delay is <= 0 for queued messages
259258 $ message = json_encode ([
260259 'body ' => $ body ,
261260 'headers ' => $ headers ,
@@ -267,8 +266,18 @@ public function add(string $body, array $headers, int $delayInMs = 0): void
267266 throw new TransportException (json_last_error_msg ());
268267 }
269268
270- $ score = $ this ->getCurrentTimeInMilliseconds () + $ delayInMs ;
271- $ added = $ this ->connection ->zadd ($ this ->queue , ['NX ' ], $ score , $ message );
269+ $ now = explode (' ' , microtime (), 2 );
270+ $ now [0 ] = str_pad ($ delayInMs + substr ($ now [0 ], 2 , 3 ), 3 , '0 ' , \STR_PAD_LEFT );
271+ if (3 < \strlen ($ now [0 ])) {
272+ $ now [1 ] += substr ($ now [0 ], 0 , -3 );
273+ $ now [0 ] = substr ($ now [0 ], -3 );
274+
275+ if (\is_float ($ now [1 ])) {
276+ throw new TransportException ("Message delay is too big: {$ delayInMs }ms. " );
277+ }
278+ }
279+
280+ $ added = $ this ->rawCommand ('ZADD ' , 'NX ' , $ now [1 ].$ now [0 ], $ message );
272281 } else {
273282 $ message = json_encode ([
274283 'body ' => $ body ,
@@ -316,14 +325,30 @@ public function setup(): void
316325 $ this ->autoSetup = false ;
317326 }
318327
319- private function getCurrentTimeInMilliseconds (): int
320- {
321- return (int ) (microtime (true ) * 1000 );
322- }
323-
324328 public function cleanup (): void
325329 {
326330 $ this ->connection ->del ($ this ->stream );
327331 $ this ->connection ->del ($ this ->queue );
328332 }
333+
334+ /**
335+ * @return mixed
336+ */
337+ private function rawCommand (string $ command , ...$ arguments )
338+ {
339+ try {
340+ $ result = $ this ->connection ->rawCommand ($ command , $ this ->queue , ...$ arguments );
341+ } catch (\RedisException $ e ) {
342+ throw new TransportException ($ e ->getMessage (), 0 , $ e );
343+ }
344+
345+ if (false === $ result ) {
346+ if ($ error = $ this ->connection ->getLastError () ?: null ) {
347+ $ this ->connection ->clearLastError ();
348+ }
349+ throw new TransportException ($ error ?? sprintf ('Could not run "%s" on Redis queue. ' , $ command ));
350+ }
351+
352+ return $ result ;
353+ }
329354}
0 commit comments