27
27
*/
28
28
class Connection
29
29
{
30
+ private const DEFAULT_OPTIONS = [
31
+ 'stream ' => 'messages ' ,
32
+ 'group ' => 'symfony ' ,
33
+ 'consumer ' => 'consumer ' ,
34
+ 'auto_setup ' => true ,
35
+ ];
36
+
30
37
private $ connection ;
31
38
private $ stream ;
32
39
private $ group ;
33
40
private $ consumer ;
41
+ private $ autoSetup ;
34
42
private $ couldHavePendingMessages = true ;
35
43
36
44
public function __construct (array $ configuration , array $ connectionCredentials = [], array $ redisOptions = [], \Redis $ redis = null )
37
45
{
38
46
$ this ->connection = $ redis ?: new \Redis ();
39
47
$ this ->connection ->connect ($ connectionCredentials ['host ' ] ?? '127.0.0.1 ' , $ connectionCredentials ['port ' ] ?? 6379 );
40
48
$ this ->connection ->setOption (\Redis::OPT_SERIALIZER , $ redisOptions ['serializer ' ] ?? \Redis::SERIALIZER_PHP );
41
- $ this ->stream = $ configuration ['stream ' ] ?? '' ?: 'messages ' ;
42
- $ this ->group = $ configuration ['group ' ] ?? '' ?: 'symfony ' ;
43
- $ this ->consumer = $ configuration ['consumer ' ] ?? '' ?: 'consumer ' ;
49
+ $ this ->stream = $ configuration ['stream ' ] ?? self ::DEFAULT_OPTIONS ['stream ' ];
50
+ $ this ->group = $ configuration ['group ' ] ?? self ::DEFAULT_OPTIONS ['group ' ];
51
+ $ this ->consumer = $ configuration ['consumer ' ] ?? self ::DEFAULT_OPTIONS ['consumer ' ];
52
+ $ this ->autoSetup = $ configuration ['auto_setup ' ] ?? self ::DEFAULT_OPTIONS ['auto_setup ' ];
44
53
}
45
54
46
55
public static function fromDsn (string $ dsn , array $ redisOptions = [], \Redis $ redis = null ): self
@@ -51,9 +60,9 @@ public static function fromDsn(string $dsn, array $redisOptions = [], \Redis $re
51
60
52
61
$ pathParts = explode ('/ ' , $ parsedUrl ['path ' ] ?? '' );
53
62
54
- $ stream = $ pathParts [1 ] ?? '' ;
55
- $ group = $ pathParts [2 ] ?? '' ;
56
- $ consumer = $ pathParts [3 ] ?? '' ;
63
+ $ stream = $ pathParts [1 ] ?? null ;
64
+ $ group = $ pathParts [2 ] ?? null ;
65
+ $ consumer = $ pathParts [3 ] ?? null ;
57
66
58
67
$ connectionCredentials = [
59
68
'host ' => $ parsedUrl ['host ' ] ?? '127.0.0.1 ' ,
@@ -64,11 +73,21 @@ public static function fromDsn(string $dsn, array $redisOptions = [], \Redis $re
64
73
parse_str ($ parsedUrl ['query ' ], $ redisOptions );
65
74
}
66
75
67
- return new self (['stream ' => $ stream , 'group ' => $ group , 'consumer ' => $ consumer ], $ connectionCredentials , $ redisOptions , $ redis );
76
+ $ autoSetup = null ;
77
+ if (\array_key_exists ('auto_setup ' , $ redisOptions )) {
78
+ $ autoSetup = filter_var ($ redisOptions ['auto_setup ' ], FILTER_VALIDATE_BOOLEAN );
79
+ unset($ redisOptions ['auto_setup ' ]);
80
+ }
81
+
82
+ return new self (['stream ' => $ stream , 'group ' => $ group , 'consumer ' => $ consumer , 'auto_setup ' => $ autoSetup ], $ connectionCredentials , $ redisOptions , $ redis );
68
83
}
69
84
70
85
public function get (): ?array
71
86
{
87
+ if ($ this ->autoSetup ) {
88
+ $ this ->setup ();
89
+ }
90
+
72
91
$ messageId = '> ' ; // will receive new messages
73
92
74
93
if ($ this ->couldHavePendingMessages ) {
@@ -141,6 +160,10 @@ public function reject(string $id): void
141
160
142
161
public function add (string $ body , array $ headers ): void
143
162
{
163
+ if ($ this ->autoSetup ) {
164
+ $ this ->setup ();
165
+ }
166
+
144
167
$ e = null ;
145
168
try {
146
169
$ added = $ this ->connection ->xadd ($ this ->stream , '* ' , ['message ' => json_encode (
@@ -161,5 +184,7 @@ public function setup(): void
161
184
} catch (\RedisException $ e ) {
162
185
throw new TransportException ($ e ->getMessage (), 0 , $ e );
163
186
}
187
+
188
+ $ this ->autoSetup = false ;
164
189
}
165
190
}
0 commit comments