8000 Fix bad merge · symfony/symfony@9b197fe · GitHub
[go: up one dir, main page]

Skip to content

Commit 9b197fe

Browse files
committed
Fix bad merge
1 parent 78f4a9f commit 9b197fe

File tree

1 file changed

+0
-175
lines changed

1 file changed

+0
-175
lines changed

src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php

Lines changed: 0 additions & 175 deletions
Original file line numberDiff line numberDiff line change
@@ -13,182 +13,7 @@
1313

1414
use Symfony\Component\Messenger\Bridge\Redis\Transport\Connection as BridgeConnection;
1515

16-
<<<<<<< HEAD
1716
trigger_deprecation('symfony/messenger', '5.1', 'The "%s" class is deprecated, use "%s" instead. The RedisExt transport has been moved to package "symfony/redis-messenger" and will not be included by default in 6.0. Run "composer require symfony/redis-messenger".', Connection::class, BridgeConnection::class);
18-
=======
19-
/**
20-
* A Redis connection.
21-
*
22-
* @author Alexander Schranz <alexander@sulu.io>
23-
* @author Antoine Bluchet <soyuka@gmail.com>
24-
* @author Robin Chalas <robin.chalas@gmail.com>
25-
*
26-
* @internal
27-
* @final
28-
*/
29-
class Connection
30-
{
31-
private const DEFAULT_OPTIONS = [
10000 32-
'stream' => 'messages',
33-
'group' => 'symfony',
34-
'consumer' => 'consumer',
35-
'auto_setup' => true,
36-
'stream_max_entries' => 0, // any value higher than 0 defines an approximate maximum number of stream entries
37-
'dbindex' => 0,
38-
];
39-
40-
private $connection;
41-
private $stream;
42-
private $queue;
43-
private $group;
44-
private $consumer;
45-
private $autoSetup;
46-
private $maxEntries;
47-
private $couldHavePendingMessages = true;
48-
49-
public function __construct(array $configuration, array $connectionCredentials = [], array $redisOptions = [], \Redis $redis = null)
50-
{
51-
if (version_compare(phpversion('redis'), '4.3.0', '<')) {
52-
throw new LogicException('The redis transport requires php-redis 4.3.0 or higher.');
53-
}
54-
55-
$this->connection = $redis ?: new \Redis();
56-
$this->connection->connect($connectionCredentials['host'] ?? '127.0.0.1', $connectionCredentials['port'] ?? 6379);
57-
$this->connection->setOption(\Redis::OPT_SERIALIZER, $redisOptions['serializer'] ?? \Redis::SERIALIZER_PHP);
58-
59-
$auth = $connectionCredentials['auth'] ?? null;
60-
if ('' === $auth) {
61-
$auth = null;
62-
}
63-
64-
if (null !== $auth && !$this->connection->auth($auth)) {
65-
throw new InvalidArgumentException('Redis connection failed: '.$redis->getLastError());
66-
}
67-
68-
if (($dbIndex = $configuration['dbindex'] ?? self::DEFAULT_OPTIONS['dbindex']) && !$this->connection->select($dbIndex)) {
69-
throw new InvalidArgumentException('Redis connection failed: '.$redis->getLastError());
70-
}
71-
72-
foreach (['stream', 'group', 'consumer'] as $key) {
73-
if (isset($configuration[$key]) && '' === $configuration[$key]) {
74-
throw new InvalidArgumentException(sprintf('"%s" should be configured, got an empty string.', $key));
75-
}
76-
}
77-
78-
$this->stream = $configuration['stream'] ?? self::DEFAULT_OPTIONS['stream'];
79-
$this->group = $configuration['group'] ?? self::DEFAULT_OPTIONS['group'];
80-
$this->consumer = $configuration['consumer'] ?? self::DEFAULT_OPTIONS['consumer'];
81-
$this->queue = $this->stream.'__queue';
82-
$this->autoSetup = $configuration['auto_setup'] ?? self::DEFAULT_OPTIONS['auto_setup'];
83-
$this->maxEntries = $configuration['stream_max_entries'] ?? self::DEFAULT_OPTIONS['stream_max_entries'];
84-
}
85-
86-
public static function fromDsn(string $dsn, array $redisOptions = [], \Redis $redis = null): self
87-
{
88-
if (false === $parsedUrl = parse_url($dsn)) {
89-
throw new InvalidArgumentException(sprintf('The given Redis DSN "%s" is invalid.', $dsn));
90-
}
91-
92-
$pathParts = explode('/', rtrim($parsedUrl['path'] ?? '', '/'));
93-
94-
$stream = $pathParts[1] ?? $redisOptions['stream'] ?? null;
95-
$group = $pathParts[2] ?? $redisOptions['group'] ?? null;
96-
$consumer = $pathParts[3] ?? $redisOptions['consumer'] ?? null;
97-
98-
$connectionCredentials = [
99-
'host' => $parsedUrl['host'] ?? '127.0.0.1',
100-
'port' => $parsedUrl['port'] ?? 6379,
101-
'auth' => $parsedUrl['pass'] ?? $parsedUrl['user'] ?? null,
102-
];
103-
104-
if (isset($parsedUrl['query'])) {
105-
parse_str($parsedUrl['query'], $redisOptions);
106-
}
107-
108-
$autoSetup = null;
109-
if (\array_key_exists('auto_setup', $redisOptions)) {
110-
$autoSetup = filter_var($redisOptions['auto_setup'], FILTER_VALIDATE_BOOLEAN);
111-
unset($redisOptions['auto_setup']);
112-
}
113-
114-
$maxEntries = null;
115-
if (\array_key_exists('stream_max_entries', $redisOptions)) {
116-
$maxEntries = filter_var($redisOptions['stream_max_entries'], FILTER_VALIDATE_INT);
117-
unset($redisOptions['stream_max_entries']);
118-
}
119-
120-
$dbIndex = null;
121-
if (\array_key_exists('dbindex', $redisOptions)) {
122-
$dbIndex = filter_var($redisOptions['dbindex'], FILTER_VALIDATE_INT);
123-
unset($redisOptions['dbindex']);
124-
}
125-
126-
return new self([
127-
'stream' => $stream,
128-
'group' => $group,
129-
'consumer' => $consumer,
130-
'auto_setup' => $autoSetup,
131-
'stream_max_entries' => $maxEntries,
132-
'dbindex' => $dbIndex,
133-
], $connectionCredentials, $redisOptions, $redis);
134-
}
135-
136-
public function get(): ?array
137-
{
138-
if ($this->autoSetup) {
139-
$this->setup();
140-
}
141-
142-
try {
143-
$queuedMessageCount = $this->connection->zcount($this->queue, 0, $this->getCurrentTimeInMilliseconds());
144-
} catch (\RedisException $e) {
145-
throw new TransportException($e->getMessage(), 0, $e);
146-
}
147-
148-
if ($queuedMessageCount) {
149-
for ($i = 0; $i < $queuedMessageCount; ++$i) {
150-
try {
151-
$queuedMessages = $this->connection->zpopmin($this->queue, 1);
152-
} catch (\RedisException $e) {
153-
throw new TransportException($e->getMessage(), 0, $e);
154-
}
155-
156-
foreach ($queuedMessages as $queuedMessage => $time) {
157-
$queuedMessage = json_decode($queuedMessage, true);
158-
// if a futured placed message is actually popped because of a race condition with
159-
// another running message consumer, the message is readded to the queue by add function
160-
// else its just added stream and will be available for all stream consumers
161-
$this->add(
162-
$queuedMessage['body'],
163-
$queuedMessage['headers'],
164-
$time - $this->getCurrentTimeInMilliseconds()
165-
);
166-
}
167-
}
168-
}
169-
170-
$messageId = '>'; // will receive new messages
171-
172-
if ($this->couldHavePendingMessages) {
173-
$messageId = '0'; // will receive consumers pending messages
174-
}
175-
176-
try {
177-
$messages = $this->connection->xreadgroup(
178-
$this->group,
179-
$this->consumer,
180-
[$this->stream => $messageId],
181-
1
182-
);
183-
} catch (\RedisException $e) {
184-
throw new TransportException($e->getMessage(), 0, $e);
185-
}
186-
187-
if (false === $messages) {
188-
if ($error = $this->connection->getLastError() ?: null) {
189-
$this->connection->clearLastError();
190-
}
191-
>>>>>>> 4.4
19217

19318
class_exists(BridgeConnection::class);
19419

0 commit comments

Comments
 (0)
0