Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

@PollScope - specialized @ThreadLocal beans to refresh each poll #1041

Draft
wants to merge 7 commits into
base: 5.5.x
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,9 @@ private void pollAndProcessRecords() {
Argument<?> lastArgument = info.method.getArguments()[info.method.getArguments().length - 1];
boundArguments.put(lastArgument, null);
}

kafkaConsumerProcessor.refreshPollScopeBeans();

processRecords(consumerRecords, currentOffsets);
if (failed) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,15 @@
import io.micronaut.configuration.kafka.exceptions.KafkaListenerException;
import io.micronaut.configuration.kafka.exceptions.KafkaListenerExceptionHandler;
import io.micronaut.configuration.kafka.retry.ConditionalRetryBehaviourHandler;
import io.micronaut.configuration.kafka.scope.PollScope;
import io.micronaut.configuration.kafka.seek.KafkaSeeker;
import io.micronaut.configuration.kafka.serde.SerdeRegistry;
import io.micronaut.context.BeanContext;
import io.micronaut.context.BeanRegistration;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.event.ApplicationEventPublisher;
import io.micronaut.context.processor.ExecutableMethodProcessor;
import io.micronaut.core.annotation.AnnotationMetadata;
import io.micronaut.core.annotation.AnnotationValue;
import io.micronaut.core.annotation.Internal;
import io.micronaut.core.annotation.NonNull;
Expand Down Expand Up @@ -378,6 +381,18 @@ BatchConsumerRecordsBinderRegistry getBatchBinderRegistry() {
return batchBinderRegistry;
}


/**
 * Refreshes every bean annotated with {@link PollScope}: each registered instance is
 * discarded via {@code beanContext.refreshBean}, so the next access through an
 * injection point creates a fresh instance. Called once per consumer poll cycle,
 * before the fetched records are processed.
 * <p>
 * NOTE(review): since {@code @PollScope} is backed by {@code @ThreadLocal}, this
 * presumably recreates only the instances bound to the calling consumer thread —
 * TODO confirm. Open questions to resolve before merging: does this behave correctly
 * when one listener is subscribed to multiple topics, and should each
 * {@code @Topic}-annotated method instead get (and refresh) its own bean instance?
 * If records for all fetched topics are processed sequentially before the next poll,
 * the current per-poll refresh is sufficient — verify against the processing loop.
 */
public void refreshPollScopeBeans() {
Collection&lt;BeanRegistration&lt;Object&gt;&gt; beans = beanContext.getBeanRegistrations(Object.class, Qualifiers.byAnnotation(AnnotationMetadata.EMPTY_METADATA, PollScope.class));
beans.forEach(beanContext::refreshBean);
}

@SuppressWarnings("rawtypes")
private AbstractKafkaConsumerConfiguration getConsumerConfigurationDefaults(String groupId) {
return findConfigurationBean(groupId)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
 * Copyright 2017-2020 original authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.micronaut.configuration.kafka.scope;

import io.micronaut.runtime.context.scope.ThreadLocal;

import java.lang.annotation.Documented;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;

/**
 * Extension of {@link io.micronaut.runtime.context.scope.ThreadLocal} that recreates
 * (refreshes) the bean on every consumer poll cycle — analogous to
 * {@link io.micronaut.runtime.http.scope.RequestScope}, but scoped to Kafka consumer
 * polls instead of HTTP requests.
 *
 * @author Haiden Rothwell
 * @since 5.5.0
 */
@Documented
// Runtime retention so the annotation is visible to reflective/runtime qualifier
// lookups (e.g. Qualifiers.byAnnotation), matching Micronaut's own scope annotations.
@Retention(RetentionPolicy.RUNTIME)
@ThreadLocal
public @interface PollScope {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
 * Copyright 2017-2020 original authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.micronaut.configuration.kafka.scope

import groovy.transform.EqualsAndHashCode
import groovy.transform.ToString
import io.micronaut.configuration.kafka.AbstractKafkaContainerSpec
import io.micronaut.configuration.kafka.annotation.KafkaClient
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.OffsetReset
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.context.annotation.Requires
import io.micronaut.serde.annotation.Serdeable
import jakarta.inject.Inject
import jakarta.inject.Singleton
import spock.lang.Retry

import static io.micronaut.configuration.kafka.config.AbstractKafkaConfiguration.EMBEDDED_TOPICS

/**
 * Verifies that {@link PollScope} beans are refreshed before each poll cycle:
 * every listener (and any singleton it delegates to on the same thread) should
 * observe a fresh bean instance per poll.
 */
@Retry
class PollScopeSpec extends AbstractKafkaContainerSpec {

    public static final String BOOKS_TOPIC = 'PollScopeSpec-books'

    protected Map<String, Object> getConfiguration() {
        super.configuration +
                [(EMBEDDED_TOPICS): [BOOKS_TOPIC]]
    }

    void "PollScope beans are injected per kafka consumer and updated before processing records"() {
        given:
        MyClient myClient = context.getBean(MyClient)
        MyListener myListener = context.getBean(MyListener)
        MyListener2 myListener2 = context.getBean(MyListener2)

        when:
        // Send exactly 10 records: each listener records 2 numbers per record,
        // so each listener is expected to accumulate 20 entries.
        // (A Groovy inclusive range `0..10` would send 11 — keep this exclusive of 11.)
        (1..10).forEach {
            myClient.sendBook(new Book(title: 'title'))
        }

        then:
        conditions.eventually {
            myListener.seenNumbers.size() == 20
            myListener2.seenNumbers.size() == 20
        }
        // Every recorded value should be distinct across both listeners,
        // proving each consumer got its own, per-poll-refreshed bean.
        (myListener.seenNumbers.unique() + myListener2.seenNumbers.unique()).size() == 20
    }

    // Poll-scoped bean: `random` is assigned once per instance, so a repeated
    // value would indicate the bean was NOT refreshed between polls.
    @Requires(property = 'spec.name', value = 'PollScopeSpec')
    @PollScope
    static class PollClass {
        Double random = Math.random()
    }

    @Requires(property = 'spec.name', value = 'PollScopeSpec')
    @KafkaListener(batch = false, offsetReset = OffsetReset.EARLIEST)
    static class MyListener {

        @Inject
        PollClass pollClass

        @Inject
        MyService myService

        List<Double> seenNumbers = []

        @Topic(PollScopeSpec.BOOKS_TOPIC)
        void receiveBook(Book book) {
            // Record the value seen directly and the value seen through a
            // singleton delegate — both resolve on this consumer thread.
            seenNumbers.add(pollClass.random)
            seenNumbers.add(myService.getPollClassValue())
        }
    }

    @Requires(property = 'spec.name', value = 'PollScopeSpec')
    @KafkaListener(batch = false, offsetReset = OffsetReset.EARLIEST)
    static class MyListener2 {

        @Inject
        PollClass pollClass

        @Inject
        MyService myService

        List<Double> seenNumbers = []

        @Topic(PollScopeSpec.BOOKS_TOPIC)
        void receiveBook(Book book) {
            seenNumbers.add(pollClass.random)
            seenNumbers.add(myService.getPollClassValue())
        }
    }

    // Singleton delegate: exercises resolution of the poll-scoped bean from a
    // non-listener bean invoked on the consumer thread.
    @Requires(property = 'spec.name', value = 'PollScopeSpec')
    @Singleton
    static class MyService {
        @Inject
        PollClass pollClass

        Double getPollClassValue() {
            return pollClass.random
        }
    }

    @Requires(property = 'spec.name', value = 'PollScopeSpec')
    @KafkaClient
    static interface MyClient {
        @Topic(PollScopeSpec.BOOKS_TOPIC)
        void sendBook(Book book)
    }

    @ToString(includePackage = false)
    @EqualsAndHashCode
    @Serdeable
    static class Book {
        String title
    }
}