SyncWriter and flush problem throwing Exception #475

Open
ekuvardin opened this issue May 5, 2025 · 0 comments

When writing to a topic, the code may produce a RuntimeException wrapped in a CompletionException.

For example, this happens when writing with an unsupported codec value such as 10000 or 7.


WriterSettings settings = WriterSettings.newBuilder()
        .setTopicPath(topicName)
        .setCodec(7)
        .build();
SyncWriter writer = client.createSyncWriter(settings);
writer.initAndWait();

for (byte[] testMessage : testMessages) {
    writer.send(Message.newBuilder().setData(testMessage).build());
}

writer.flush();
writer.shutdown(1, TimeUnit.MINUTES);

The writer.flush() call is where the problem shows up.
Expected behavior: flush() completes without an exception, because it waits for all pending jobs to finish and return their results.

But if your machine is fast enough, you get a CompletionException:

Spoiler warning

java.util.concurrent.CompletionException: java.lang.RuntimeException: Unsupported codec: 7
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) ~[?:1.8.0_44]
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) ~[?:1.8.0_44]
    at java.util.concurrent.CompletableFuture$AsyncRun.run$$$capture(CompletableFuture.java:1621) ~[?:1.8.0_44]
    at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java) ~[?:1.8.0_44]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[?:1.8.0_44]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[?:1.8.0_44]
    at java.lang.Thread.run(Thread.java:745) ~[?:1.8.0_44]
Caused by: java.lang.RuntimeException: Unsupported codec: 7
    at tech.ydb.topic.utils.Encoder.getCodec(Encoder.java:96) ~[classes/:?]
    at tech.ydb.topic.utils.Encoder.makeOutputStream(Encoder.java:80) ~[classes/:?]
    at tech.ydb.topic.utils.Encoder.encode(Encoder.java:41) ~[classes/:?]
    at tech.ydb.topic.write.impl.WriterImpl.encode(WriterImpl.java:195) ~[classes/:?]
    at tech.ydb.topic.write.impl.WriterImpl.lambda$acceptMessageIntoSendingQueue$0(WriterImpl.java:176) ~[classes/:?]
    at tech.ydb.topic.write.impl.WriterImpl$$Lambda$189/2108708444.run(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture$AsyncRun.run$$$capture(CompletableFuture.java:1618) ~[?:1.8.0_44]
    ... 4 more
11:15:44.949 [pool-4-thread-2] ERROR tech.ydb.topic.write.impl.WriterImpl - [yNNtF0] Exception while encoding message: java.util.concurrent.CompletionException: java.lang.RuntimeException: Unsupported codec: 7
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) ~[?:1.8.0_44]
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) ~[?:1.8.0_44]
    at java.util.concurrent.CompletableFuture$AsyncRun.run$$$capture(CompletableFuture.java:1621) ~[?:1.8.0_44]
    at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java) ~[?:1.8.0_44]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[?:1.8.0_44]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[?:1.8.0_44]
    at java.lang.Thread.run(Thread.java:745) ~[?:1.8.0_44]
Caused by: java.lang.RuntimeException: Unsupported codec: 7
    at tech.ydb.topic.utils.Encoder.getCodec(Encoder.java:96) ~[classes/:?]
    at tech.ydb.topic.utils.Encoder.makeOutputStream(Encoder.java:80) ~[classes/:?]
    at tech.ydb.topic.utils.Encoder.encode(Encoder.java:41) ~[classes/:?]
    at tech.ydb.topic.write.impl.WriterImpl.encode(WriterImpl.java:195) ~[classes/:?]
    at tech.ydb.topic.write.impl.WriterImpl.lambda$acceptMessageIntoSendingQueue$0(WriterImpl.java:176) ~[classes/:?]
    at tech.ydb.topic.write.impl.WriterImpl$$Lambda$189/2108708444.run(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture$AsyncRun.run$$$capture(CompletableFuture.java:1618) ~[?:1.8.0_44]
    ... 4 more
11:15:44.950 [pool-4-thread-5] ERROR tech.ydb.topic.write.impl.WriterImpl - [yNNtF0] Exception while encoding message: java.util.concurrent.CompletionException: java.lang.RuntimeException: Unsupported codec: 7
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) ~[?:1.8.0_44]
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) ~[?:1.8.0_44]
    at java.util.concurrent.CompletableFuture$AsyncRun.run$$$capture(CompletableFuture.java:1621) ~[?:1.8.0_44]
    at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java) ~[?:1.8.0_44]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[?:1.8.0_44]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[?:1.8.0_44]
    at java.lang.Thread.run(Thread.java:745) ~[?:1.8.0_44]
Caused by: java.lang.RuntimeException: Unsupported codec: 7
    at tech.ydb.topic.utils.Encoder.getCodec(Encoder.java:96) ~[classes/:?]
    at tech.ydb.topic.utils.Encoder.makeOutputStream(Encoder.java:80) ~[classes/:?]
    at tech.ydb.topic.utils.Encoder.encode(Encoder.java:41) ~[classes/:?]
    at tech.ydb.topic.write.impl.WriterImpl.encode(WriterImpl.java:195) ~[classes/:?]
    at tech.ydb.topic.write.impl.WriterImpl.lambda$acceptMessageIntoSendingQueue$0(WriterImpl.java:176) ~[classes/:?]
    at tech.ydb.topic.write.impl.WriterImpl$$Lambda$189/2108708444.run(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture$AsyncRun.run$$$capture(CompletableFuture.java:1618) ~[?:1.8.0_44]
    ... 4 more
11:15:44.950 [pool-4-thread-3] ERROR tech.ydb.topic.write.impl.WriterImpl - [yNNtF0] Exception while encoding message: java.util.concurrent.CompletionException: java.lang.RuntimeException: Unsupported codec: 7
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) ~[?:1.8.0_44]
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) ~[?:1.8.0_44]
    at java.util.concurrent.CompletableFuture$AsyncRun.run$$$capture(CompletableFuture.java:1621) ~[?:1.8.0_44]
    at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java) ~[?:1.8.0_44]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[?:1.8.0_44]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[?:1.8.0_44]
    at java.lang.Thread.run(Thread.java:745) ~[?:1.8.0_44]
Caused by: java.lang.RuntimeException: Unsupported codec: 7
    at tech.ydb.topic.utils.Encoder.getCodec(Encoder.java:96) ~[classes/:?]
    at tech.ydb.topic.utils.Encoder.makeOutputStream(Encoder.java:80) ~[classes/:?]
    at tech.ydb.topic.utils.Encoder.encode(Encoder.java:41) ~[classes/:?]
    at tech.ydb.topic.write.impl.WriterImpl.encode(WriterImpl.java:195) ~[classes/:?]
    at tech.ydb.topic.write.impl.WriterImpl.lambda$acceptMessageIntoSendingQueue$0(WriterImpl.java:176) ~[classes/:?]
    at tech.ydb.topic.write.impl.WriterImpl$$Lambda$189/2108708444.run(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture$AsyncRun.run$$$capture(CompletableFuture.java:1618) ~[?:1.8.0_44]
    ... 4 more
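
The AsyncRun frames in the trace match standard JDK behavior: a RuntimeException thrown inside a task started with CompletableFuture.runAsync is stored wrapped in a CompletionException, and anyone who later joins that future sees the wrapper. The sketch below reproduces only that wrapping. The class name and the encode stand-in are hypothetical and are not taken from WriterImpl or Encoder.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class CompletionExceptionDemo {
    // Hypothetical stand-in for the encoding step; it always rejects the codec.
    static void encode(int codec) {
        throw new RuntimeException("Unsupported codec: " + codec);
    }

    // Runs encode asynchronously, joins the future, and returns the message
    // of the underlying cause (or null if no exception was thrown).
    static String causeMessageOnJoin(int codec) {
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> encode(codec));
        try {
            future.join();
            return null;
        } catch (CompletionException e) {
            // join() rethrows the failure wrapped in CompletionException.
            return e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(causeMessageOnJoin(7)); // prints "Unsupported codec: 7"
    }
}
```

Whether a given caller observes the wrapper depends on which future it ends up joining, which may be why the failure looks timing-dependent here.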

After some investigation, I was able to make the test run more stably without throwing the exception (though I may just have been lucky). The following is only a guess and should be verified:

  1. WriterImpl: make lastAcceptedMessageFuture volatile (it seems to be modified from different threads).
  2. WriterImpl: acceptMessageIntoSendingQueue should return a CompletableFuture.
  3. WriterImpl: tryToEnqueue should return acceptMessageIntoSendingQueue(message).
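
The three suggestions above could look roughly like the following. This is a simplified sketch, not the actual WriterImpl: the class WriterSketch, the method parameters, and the encode stub (which, purely for illustration, accepts only codec value 1) are all assumptions. Only the names lastAcceptedMessageFuture, acceptMessageIntoSendingQueue, and tryToEnqueue come from the issue.

```java
import java.util.concurrent.CompletableFuture;

public class WriterSketch {
    // Suggestion 1: volatile, so a reference written by one thread
    // is visible to other threads reading the field.
    private volatile CompletableFuture<Void> lastAcceptedMessageFuture =
            CompletableFuture.completedFuture(null);

    // Hypothetical encoder stub: treats every codec except 1 as unsupported.
    private static void encode(byte[] data, int codec) {
        if (codec != 1) {
            throw new RuntimeException("Unsupported codec: " + codec);
        }
    }

    // Suggestion 2: return the future instead of dropping it,
    // so an encoding failure is not lost inside the async task.
    CompletableFuture<Void> acceptMessageIntoSendingQueue(byte[] data, int codec) {
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> encode(data, codec));
        lastAcceptedMessageFuture = future;
        return future;
    }

    // Suggestion 3: propagate that future to the caller.
    CompletableFuture<Void> tryToEnqueue(byte[] data, int codec) {
        return acceptMessageIntoSendingQueue(data, codec);
    }

    // Accessor used only to inspect the field in this sketch.
    CompletableFuture<Void> lastAccepted() {
        return lastAcceptedMessageFuture;
    }
}
```

With this shape, the caller of tryToEnqueue holds the same future that the encoding task completes, so the outcome of each message is deterministic for that caller regardless of thread timing.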