@@ -114,12 +114,15 @@ public function get(): ?array
114
114
1
115
115
);
116
116
} catch (\RedisException $ e ) {
117
+ throw new TransportException ($ e ->getMessage (), 0 , $ e );
117
118
}
118
119
119
- if ($ e || false === $ messages ) {
120
- throw new TransportException (
121
- ($ e ? $ e ->getMessage () : $ this ->connection ->getLastError ()) ?? 'Could not read messages from the redis stream. '
122
- );
120
+ if (false === $ messages ) {
121
+ if ($ error = $ this ->connection ->getLastError () ?: null ) {
122
+ $ this ->connection ->clearLastError ();
123
+ }
124
+
125
+ throw new TransportException ($ error ?? 'Could not read messages from the redis stream. ' );
123
126
}
124
127
125
128
if ($ this ->couldHavePendingMessages && empty ($ messages [$ this ->stream ])) {
@@ -144,28 +147,34 @@ public function get(): ?array
144
147
145
148
public function ack (string $ id ): void
146
149
{
147
- $ e = null ;
148
150
try {
149
151
$ acknowledged = $ this ->connection ->xack ($ this ->stream , $ this ->group , [$ id ]);
150
152
} catch (\RedisException $ e ) {
153
+ throw new TransportException ($ e ->getMessage (), 0 , $ e );
151
154
}
152
155
153
- if ($ e || !$ acknowledged ) {
154
- throw new TransportException (($ e ? $ e ->getMessage () : $ this ->connection ->getLastError ()) ?? sprintf ('Could not acknowledge redis message "%s". ' , $ id ), 0 , $ e );
156
+ if (!$ acknowledged ) {
157
+ if ($ error = $ this ->connection ->getLastError () ?: null ) {
158
+ $ this ->connection ->clearLastError ();
159
+ }
160
+ throw new TransportException ($ error ?? sprintf ('Could not acknowledge redis message "%s". ' , $ id ));
155
161
}
156
162
}
157
163
158
164
public function reject (string $ id ): void
159
165
{
160
- $ e = null ;
161
166
try {
162
167
$ deleted = $ this ->connection ->xack ($ this ->stream , $ this ->group , [$ id ]);
163
168
$ deleted = $ this ->connection ->xdel ($ this ->stream , [$ id ]) && $ deleted ;
164
169
} catch (\RedisException $ e ) {
170
+ throw new TransportException ($ e ->getMessage (), 0 , $ e );
165
171
}
166
172
167
- if ($ e || !$ deleted ) {
168
- throw new TransportException (($ e ? $ e ->getMessage () : $ this ->connection ->getLastError ()) ?? sprintf ('Could not delete message "%s" from the redis stream. ' , $ id ), 0 , $ e );
173
+ if (!$ deleted ) {
174
+ if ($ error = $ this ->connection ->getLastError () ?: null ) {
175
+ $ this ->connection ->clearLastError ();
176
+ }
177
+ throw new TransportException ($ error ?? sprintf ('Could not delete message "%s" from the redis stream. ' , $ id ));
169
178
}
170
179
}
171
180
@@ -175,16 +184,19 @@ public function add(string $body, array $headers): void
175
184
$ this ->setup ();
176
185
}
177
186
178
- $ e = null ;
179
187
try {
180
188
$ added = $ this ->connection ->xadd ($ this ->stream , '* ' , ['message ' => json_encode (
181
189
['body ' => $ body , 'headers ' => $ headers ]
182
190
)]);
183
191
} catch (\RedisException $ e ) {
192
+ throw new TransportException ($ e ->getMessage (), 0 , $ e );
184
193
}
185
194
186
- if ($ e || !$ added ) {
187
- throw new TransportException (($ e ? $ e ->getMessage () : $ this ->connection ->getLastError ()) ?? 'Could not add a message to the redis stream. ' , 0 , $ e );
195
+ if (!$ added ) {
196
+ if ($ error = $ this ->connection ->getLastError () ?: null ) {
197
+ $ this ->connection ->clearLastError ();
198
+ }
199
+ throw new TransportException ($ error ?? 'Could not add a message to the redis stream. ' );
188
200
}
189
201
}
190
202
@@ -196,6 +208,11 @@ public function setup(): void
196
208
throw new TransportException ($ e ->getMessage (), 0 , $ e );
197
209
}
198
210
211
+ // group might already exist, ignore
212
+ if ($ this ->connection ->getLastError ()) {
213
+ $ this ->connection ->clearLastError ();
214
+ }
215
+
199
216
$ this ->autoSetup = false ;
200
217
}
201
218
}
0 commit comments