8000 [kafka][doc][skip ci] Add docs on how to change offset. · enumag/enqueue-dev@3c1fd49 · GitHub
[go: up one dir, main page]

Skip to content

Commit 3c1fd49

Browse files
committed
[kafka][doc][skip ci] Add docs on how to change offset.
1 parent eded88e commit 3c1fd49

File tree

1 file changed

+21
-0
lines changed

1 file changed

+21
-0
lines changed

docs/transport/kafka.md

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ The transport uses [Kafka](https://kafka.apache.org/) streaming platform as a MQ
88
* [Send message to queue](#send-message-to-queue)
99
* [Consume message](#consume-message)
1010
* [Serialize message](#serialize-message)
11+
* [Chnage offset](#change-offset)
1112

1213
## Installation
1314

@@ -84,6 +85,9 @@ $fooQueue = $psrContext->createQueue('foo');
8485

8586
$consumer = $psrContext->createConsumer($fooQueue);
8687

88+
// Enable async commit to gain better performance.
89+
//$consumer->setCommitAsync(true);
90+
8791
$message = $consumer->receive();
8892

8993
// process a message
@@ -115,4 +119,21 @@ class FooSerializer implements Serializer
115119
$psrContext->setSerializer(new FooSerializer());
116120
```
117121

122+
## Change offset
123+
124+
By default consumers starts from the beginning of the topic and updates the offset while you are processing messages.
125+
There is an ability to change the current offset.
126+
127+
```php
128+
<?php
129+
/** @var \Enqueue\RdKafka\RdKafkaContext $psrContext */
130+
131+
$fooQueue = $psrContext->createQueue('foo');
132+
133+
$consumer = $psrContext->createConsumer($fooQueue);
134+
$consumer->setOffset(123);
135+
136+
$message = $consumer->receive(2000);
137+
```
138+
118139
[back to index](index.md)

0 commit comments

Comments
 (0)
0