@@ -64,6 +64,26 @@ async def test_read_and_commit_with_ack(self, driver, topic_with_messages, topic
64
64
65
65
assert message != batch .messages [0 ]
66
66
67
+ async def test_commit_offset_works (self , driver , topic_with_messages , topic_consumer ):
68
+ for out in ["123" , "456" , "789" , "0" ]:
69
+ async with driver .topic_client .reader (topic_with_messages , topic_consumer ) as reader :
70
+ message = await reader .receive_message ()
71
+ assert message .data .decode () == out
72
+
73
+ await driver .topic_client .commit_offset (
74
+ topic_with_messages , topic_consumer , message .partition_id , message .offset + 1
75
+ )
76
+
77
+ async def test_reader_reconnect_after_commit_offset (self , driver , topic_with_messages , topic_consumer ):
78
+ async with driver .topic_client .reader (topic_with_messages , topic_consumer ) as reader :
79
+ for out in ["123" , "456" , "789" , "0" ]:
80
+ message = await reader .receive_message ()
81
+ assert message .data .decode () == out
82
+
83
+ await driver .topic_client .commit_offset (
84
+ topic_with_messages , topic_consumer , message .partition_id , message .offset + 1
85
+ )
86
+
67
87
async def test_read_compressed_messages (self , driver , topic_path , topic_consumer ):
68
88
async with driver .topic_client .writer (topic_path , codec = ydb .TopicCodec .GZIP ) as writer :
69
89
await writer .write ("123" )
@@ -183,6 +203,26 @@ def test_read_and_commit_with_ack(self, driver_sync, topic_with_messages, topic_
183
203
184
204
assert message != batch .messages [0 ]
185
205
206
+ def test_commit_offset_works (self , driver_sync , topic_with_messages , topic_consumer ):
207
+ for out in ["123" , "456" , "789" , "0" ]:
208
+ with driver_sync .topic_client .reader (topic_with_messages , topic_consumer ) as reader :
209
+ message = reader .receive_message ()
210
+ assert message .data .decode () == out
211
+
212
+ driver_sync .topic_client .commit_offset (
213
+ topic_with_messages , topic_consumer , message .partition_id , message .offset + 1
214
+ )
215
+
216
+ def test_reader_reconnect_after_commit_offset (self , driver_sync , topic_with_messages , topic_consumer ):
217
+ with driver_sync .topic_client .reader (topic_with_messages , topic_consumer ) as reader :
218
+ for out in ["123" , "456" , "789" , "0" ]:
219
+ message = reader .receive_message ()
220
+ assert message .data .decode () == out
221
+
222
+ driver_sync .topic_client .commit_offset (
223
+ topic_with_messages , topic_consumer , message .partition_id , message .offset + 1
224
+ )
225
+
186
226
def test_read_compressed_messages (self , driver_sync , topic_path , topic_consumer ):
187
227
with driver_sync .topic_client .writer (topic_path , codec = ydb .TopicCodec .GZIP ) as writer :
188
228
writer .write ("123" )
0 commit comments