8000 feature #35347 [VarDumper] Add a RdKafka caster (romainneutron) · symfony/symfony@8e22719 · GitHub
[go: up one dir, main page]

Skip to content

Commit 8e22719

Browse files
feature #35347 [VarDumper] Add a RdKafka caster (romainneutron)
This PR was merged into the 5.1-dev branch. Discussion ---------- [VarDumper] Add a RdKafka caster | Q | A | ------------- | --- | Branch? | master | Bug fix? | no | New feature? | yes | Deprecations? | no | Tickets | N/A | License | MIT | Doc PR | N/A Hello, This is the very beginning of this new feature. First, I'd like to know if there's a no-go for this feature so I won't waste my time. Then, if you are a RdKafka expert, I'd take your comments with much pleasure sincer I'm a noob in this technology (and that's why I'm implementing this caster 🤓) Tests need to be added once the implementation will be a bit more complete. Commits ------- 6cd1235 Add a RdKafka caster to Var-Dumper
2 parents c6d0a2a + 6cd1235 commit 8e22719

File tree

5 files changed

+512
-0
lines changed

5 files changed

+512
-0
lines changed

.travis.yml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,20 @@ before_install:
6262
docker run -d -p 7000:7000 -p 7001:7001 -p 7002:7002 -p 7003:7003 -p 7004:7004 -p 7005:7005 -p 7006:7006 -p 7007:7007 -e "STANDALONE=true" --name redis-cluster grokzen/redis-cluster:5.0.4
6363
export REDIS_CLUSTER_HOSTS='localhost:7000 localhost:7001 localhost:7002 localhost:7003 localhost:7004 localhost:7005'
6464
65+
- |
66+
# Start Kafka and install an up-to-date librdkafka
67+
docker network create kafka_network
68+
docker pull wurstmeister/zookeeper:3.4.6
69+
docker run -d --network kafka_network --name zookeeper wurstmeister/zookeeper:3.4.6
70+
docker pull wurstmeister/kafka:2.12-2.3.1
71+
docker run -d -p 9092:9092 --network kafka_network -e "KAFKA_AUTO_CREATE_TOPICS_ENABLE=false" -e "KAFKA_CREATE_TOPICS=test-topic:1:1:compact" -e "KAFKA_ADVERTISED_HOST_NAME=kafka" -e "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181" -e "KAFKA_ADVERTISED_PORT=9092" --name kafka wurstmeister/kafka:2.12-2.3.1
72+
export KAFKA_BROKER=kafka:9092
73+
sudo sh -c 'echo "\n127.0.0.1 kafka\n" >> /etc/hosts'
74+
75+
mkdir /tmp/librdkafka
76+
curl https://codeload.github.com/edenhill/librdkafka/tar.gz/v0.11.6 | tar xzf - -C /tmp/librdkafka
77+
(cd /tmp/librdkafka/librdkafka-0.11.6 && ./configure && make && sudo make install)
78+
6579
- |
6680
# General configuration
6781
set -e
@@ -175,6 +189,7 @@ before_install:
175189
tfold ext.igbinary tpecl igbinary-3.1.2 igbinary.so $INI
176190
tfold ext.zookeeper tpecl zookeeper-0.7.1 zookeeper.so $INI
177191
tfold ext.amqp tpecl amqp-1.9.4 amqp.so $INI
192+
tfold ext.rdkafka tpecl rdkafka-4.0.2 rdkafka.so $INI
178193
tfold ext.redis tpecl redis-4.3.0 redis.so $INI "no"
179194
done
180195
- |

src/Symfony/Component/VarDumper/CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
CHANGELOG
22
=========
33

4+
5.1.0
5+
-----
6+
7+
* added `RdKafka` support
8+
49
4.4.0
510
-----
611

Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\VarDumper\Caster;
13+
14+
use RdKafka;
15+
use RdKafka\Conf;
16+
use RdKafka\Exception as RdKafkaException;
17+
use RdKafka\KafkaConsumer;
18+
use RdKafka\Message;
19+
use RdKafka\Metadata\Broker as BrokerMetadata;
20+
use RdKafka\Metadata\Collection as CollectionMetadata;
21+
use RdKafka\Metadata\Partition as PartitionMetadata;
22+
use RdKafka\Metadata\Topic as TopicMetadata;
23+
use RdKafka\Topic;
24+
use RdKafka\TopicConf;
25+
use RdKafka\TopicPartition;
26+
use Symfony\Component\VarDumper\Cloner\Stub;
27+
28+
/**
29+
* Casts RdKafka related classes to array representation.
30+
*
31+
* @author Romain Neutron <imprec@gmail.com>
32+
*/
33+
class RdKafkaCaster
34+
{
35+
public static function castKafkaConsumer(KafkaConsumer $c, array $a, Stub $stub, $isNested)
36+
{
37+
$prefix = Caster::PREFIX_VIRTUAL;
38+
39+
try {
40+
$assignment = $c->getAssignment();
41+
} catch (RdKafkaException $e) {
42+
$assignment = [];
43+
}
44+
45+
$a += [
46+
$prefix.'subscription' => $c->getSubscription(),
47+
$prefix.'assignment' => $assignment,
48+
];
49+
50+
$a += self::extractMetadata($c);
51+
52+
return $a;
53+
}
54+
55+
public static function castTopic(Topic $c, array $a, Stub $stub, $isNested)
56+
{
57+
$prefix = Caster::PREFIX_VIRTUAL;
58+
59+
$a += [
60+
$prefix.'name' => $c->getName(),
61+
];
62+
63+
return $a;
64+
}
65+
66+
public static function castTopicPartition(TopicPartition $c, array $a)
67+
{
68+
$prefix = Caster::PREFIX_VIRTUAL;
69+
70+
$a += [
71+
$prefix.'offset' => $c->getOffset(),
72+
$prefix.'partition' => $c->getPartition(),
73+
$prefix.'topic' => $c->getTopic(),
74+
];
75+
76+
return $a;
77+
}
78+
79+
public static function castMessage(Message $c, array $a, Stub $stub, $isNested)
80+
{
81+
$prefix = Caster::PREFIX_VIRTUAL;
82+
83+
$a += [
84+
$prefix.'errstr' => $c->errstr(),
85+
];
86+
87+
return $a;
88+
}
89+
90+
public static function castConf(Conf $c, array $a, Stub $stub, $isNested)
91+
{
92+
$prefix = Caster::PREFIX_VIRTUAL;
93+
94+
foreach ($c->dump() as $key => $value) {
95+
$a[$prefix.$key] = $value;
96+
}
97+
98+
return $a;
99+
}
100+
101+
public static function castTopicConf(TopicConf $c, array $a, Stub $stub, $isNested)
102+
{
103+
$prefix = Caster::PREFIX_VIRTUAL;
104+
105+
foreach ($c->dump() as $key => $value) {
106+
$a[$prefix.$key] = $value;
107+
}
108+
109+
return $a;
110+
}
111+
112+
public static function castRdKafka(\RdKafka $c, array $a, Stub $stub, $isNested)
113+
{
114+
$prefix = Caster::PREFIX_VIRTUAL;
115+
116+
$a += [
117+
$prefix.'out_q_len' => $c->getOutQLen(),
118+
];
119+
120+
$a += self::extractMetadata($c);
121+
122+
return $a;
123+
}
124+
125+
public static function castCollectionMetadata(CollectionMetadata $c, array $a, Stub $stub, $isNested)
126+
{
127+
$a += iterator_to_array($c);
128+
129+
return $a;
130+
}
131+
132+
public static function castTopicMetadata(TopicMetadata $c, array $a, Stub $stub, $isNested)
133+
{
134+
$prefix = Caster::PREFIX_VIRTUAL;
135+
136+
$a += [
137+
$prefix.'name' => $c->getTopic(),
138+
$prefix.'partitions' => $c->getPartitions(),
139+
];
140+
141+
return $a;
142+
}
143+
144+
public static function castPartitionMetadata(PartitionMetadata $c, array $a, Stub $stub, $isNested)
145+
{
146+
$prefix = Caster::PREFIX_VIRTUAL;
147+
148+
$a += [
149+
$prefix.'id' => $c->getId(),
150+
$prefix.'err' => $c->getErr(),
151+
$prefix.'leader' => $c->getLeader(),
152+
];
153+
154+
return $a;
155+
}
156+
157+
public static function castBrokerMetadata(BrokerMetadata $c, array $a, Stub $stub, $isNested)
158+
{
159+
$prefix = Caster::PREFIX_VIRTUAL;
160+
161+
$a += [
162+
$prefix.'id' => $c->getId(),
163+
$prefix.'host' => $c->getHost(),
164+
$prefix.'port' => $c->getPort(),
165+
];
166+
167+
return $a;
168+
}
169+
170+
private static function extractMetadata($c)
171+
{
172+
$prefix = Caster::PREFIX_VIRTUAL;
173+
174+
try {
175+
$m = $c->getMetadata(true, null, 500);
176+
} catch (RdKafkaException $e) {
177+
return [];
178+
}
179+
180+
return [
181+
$prefix.'orig_broker_id' => $m->getOrigBrokerId(),
182+
$prefix.'orig_broker_name' => $m->getOrigBrokerName(),
183+
$prefix.'brokers' => $m->getBrokers(),
184+
$prefix.'topics' => $m->getTopics(),
185+
];
186+
}
187+
}

src/Symfony/Component/VarDumper/Cloner/AbstractCloner.php

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,18 @@ abstract class AbstractCloner implements ClonerInterface
160160
':persistent stream' => ['Symfony\Component\VarDumper\Caster\ResourceCaster', 'castStream'],
161161
':stream-context' => ['Symfony\Component\VarDumper\Caster\ResourceCaster', 'castStreamContext'],
162162
':xml' => ['Symfony\Component\VarDumper\Caster\XmlResourceCaster', 'castXml'],
163+
164+
'RdKafka' => ['Symfony\Component\VarDumper\Caster\RdKafkaCaster', 'castRdKafka'],
165+
'RdKafka\Conf' => ['Symfony\Component\VarDumper\Caster\RdKafkaCaster', 'castConf'],
166+
'RdKafka\KafkaConsumer' => ['Symfony\Component\VarDumper\Caster\RdKafkaCaster', 'castKafkaConsumer'],
167+
'RdKafka\Metadata\Broker' => ['Symfony\Component\VarDumper\Caster\RdKafkaCaster', 'castBrokerMetadata'],
168+
'RdKafka\Metadata\Collection' => ['Symfony\Component\VarDumper\Caster\RdKafkaCaster', 'castCollectionMetadata'],
169+
'RdKafka\Metadata\Partition' => ['Symfony\Component\VarDumper\Caster\RdKafkaCaster', 'castPartitionMetadata'],
170+
'RdKafka\Metadata\Topic' => ['Symfony\Component\VarDumper\Caster\RdKafkaCaster', 'castTopicMetadata'],
171+
'RdKafka\Message' => ['Symfony\Component\VarDumper\Caster\RdKafkaCaster', 'castMessage'],
172+
'RdKafka\Topic' => ['Symfony\Component\VarDumper\Caster\RdKafkaCaster', 'castTopic'],
173+
'RdKafka\TopicPartition' => ['Symfony\Component\VarDumper\Caster\RdKafkaCaster', 'castTopicPartition'],
174+
'RdKafka\TopicConf' => ['Symfony\Component\VarDumper\Caster\RdKafkaCaster', 'castTopicConf'],
163175
];
164176

165177
protected $maxItems = 2500;

0 commit comments

Comments
 (0)
0