Este projeto apresenta uma implementação de demonstração sobre como poderíamos utilizar mecanismos de retry e DLQ numa arquitetura event driven, utilizando-se do Apache Kafka, Vertx e RxJava.
Temos um sistema de catálogo de filmes, composto basicamente por três aggregates: Person, Catalog e Recommendations. Onde Person é responsável pelos usuários do sistema, Catalog pelos filmes disponíveis e Recommendations por gerir a recomendação de filmes ao usuário de acordo com seus critérios diversos. Cada qual com seu próprio serviço instanciado, com um diferencial em Recommendations, onde teremos dois serviços compondo o aggregate: recommendations-service, o serviço principal onde o ator do nosso caso de uso interage com o sistema, e temos também o recommendations-processor-verticle que seria uma instância dedicada para processamento de recomendações, ele que aplicará a lógica que definirá qual filme será recomendado ao usuário. Esta divisão foi feita a fim de simular algo mais próximo do mundo real, além de agregar mais conteúdo à obra.
Quando um novo usuário é registrado em Person, um evento comunicando sobre o fato que ocorreu no sistema é publicado (PersonRegisteredEvent), e o serviço recommendations-service reage a esse evento, começando a traçar o perfil do usuário para recomendar filmes
Esta implementação utiliza a estratégia descrita como "Reprocessamento em filas separadas" no artigo em que esse projeto se baseou.
Nessa estratégia, quando uma mensagem falha, assume-se que a mesma foi consumida com sucesso, realizando o commit do offset do Kafka Consumer, dessa forma liberando a fila de processamento que irá para a próxima mensagem. A mensagem problemática é republicada em tópicos de retry, onde tem-se um tópico específico para cada tentativa de processamento, então as mensagens irão circular de um tópico para o outro até que sejam processadas corretamente ou a condição de parada for atingida e a mesma seja direcionada para DLQ.
O projeto está organizado da seguinte forma:
commons
- classes utilitárias, conversores de datas, etc.;jdbc
- modulo facilitador para execuções de queries e gerenciamento de transações via vertx-sql-client;vertx-base-server
- abstrações que facilitam o uso do Vertx como servidor http;event
- abstração de eventos e comandos;kafka
- abstração de producers e consumers juntamente com seu mecanismo de retry;person-event
- dados sobre os eventos e tópicos do domínio de Person;recommendations-event
- dados sobre os eventos e tópicos do domínio de Recommendations;recommendations-processor
- abstração para processamento de recomendações de filmes;modules
- multi-module maven project, agregando todos os artefatos citados até então, em um único projeto, a fim de facilitar a containerização do projeto;catalog
- serviço responsável por gerenciar os filmes do sistema;person
- serviço responsável por gerenciar as pessoas do sistema;recommendations
- serviço responsável por gerenciar as recomendações de filmes;recommendations-processor-verticle
- nó de um cluster Vertx onde um serviço de processamento de recomendações está endereçado;
Na raiz do projeto encontra-se o arquivo docker-compose.yml
, com os serviços que entrarão em execução, juntamente com o Apache Kafka. Garanta que seu environment possua o Docker e Docker Compose instalados, e simplemente execute o projeto:
docker-compose up
Os tópicos criados automaticamente, são:
- person-registered-topic
- recs-per-person-registered-topic-RETRY-1
- recs-per-person-registered-topic-RETRY-2
- recs-per-person-registered-topic-RETRY-3
- recs-per-person-registered-topic-DLQ
- recommendations-generated-topic
- recs-gen-recommendations-generated-topic-RETRY-1
- recs-gen-recommendations-generated-topic-RETRY-2
- recs-gen-recommendations-generated-topic-RETRY-3
- recs-gen-recommendations-generated-topic-DLQ
Note que alguns prefixos sao utilizados nos tópicos, cada um representa um consumer group com sua própria finalidade:
recs-per
- sigla pararecommendations-person
, consumer group do serviço derecommendations
que irá consumir o eventoPersonRegisteredEvent
, iniciando o processamento das recomendações;recs-gen
- sigla pararecommendations-generated
, consumer group do serviço derecommendations
que irá consumir o eventoRecommendationsGeneratedEvent
e realizar a persistencia das recomendações criadas em um banco de dados para leitura (CQRS);
Para inciar o fluxo, basta cadastrar uma nova pessoa, para tal, basta realizar uma requisição POST para http://localhost:8080/person/v1
com um JSON que segue a seguinte estrutura:
{
"name": "Victor",
"birth_date": "21/06/1994",
"email": "[email protected]",
"preferences": ["thriller"]
}
Como resposta podemos esperar os dados da nova pessoa cadastrada acrescido do seu GUID gerado:
{
"id": "5e3a4d60-1c8e-4857-8a33-d688afde3855",
"name": "Victor - 8",
"email": "[email protected]",
"birth_date": "21/06/1994",
"preferences": [
"thriller"
]
}
A partir deste ponto, já é possível acompanhar nos logs do sistema o processo obtenção de filmes no serviço de catalog, a execução do algoritmo de recomendações em recommendations-processor-verticle e por fim, a publicação do evento RecommendationsGeneratedEvent e seu registro em um banco de dados de leitura, no serviço de recommendations.
Caso deseje verificar as recomendaçõe feitas para a pessoa criada, basta realizar uma requisição GET para http://localhost:8082/recommendations/v1/person/:person_id
. Como resposta eh esperada a seguinte estrutura:
{
"id": "32b8031a-ec3e-4657-8ec8-d138be0ae501",
"person_id": "5e3a4d60-1c8e-4857-8a33-d688afde3855",
"movies": [
"9717da82-1fd1-44d3-898c-2524a360a827",
"55598532-dc16-4ad4-b497-965bc8ad9c02",
"85034615-d30a-49fa-a8a7-5db81097123a",
"4c4369df-2f45-4023-8f8d-a339fd0c6869"
]
}
Por fim, para obter detalhes de algum filme, bata realizar uma requisição GET para http://localhost:8083/catalog/v1/details/:movie_id
. Como resposta é esperada a seguinte estrutura:
{
"id": "9717da82-1fd1-44d3-898c-2524a360a827",
"title": "Get Out",
"tags": [
"horror",
"mistery",
"thriller"
],
"release_year": 2017
}
Foi adicionado um bug proposital no catalog-service ao se cadastrar uma pessoa que não possua preferência de filmes. Nessa situação, o serviço irá falhar e portanto, o evento PersonRegisteredEvent não poderá ser processado com sucesso, iniciando-se assim o fluxo de retry e DLQ.
A execução do fluxo de retry pode ser acompanhanda a partir da simples leitura dos logs, especialmente de recommendations
. Pode-se claro, adicionar seus proprios meios de observação, como um kafka-console-consumer
por exemplo, para acompanhar por lá o percurso das mensagens.
Exemplo de requisição que ocasiona o bug:
{
"name": "Victor",
"birth_date": "21/06/1994",
"email": "[email protected]"
}
E pelos logs:
[RxNewThreadScheduler-1] INFO ...RetryableKafkaConsumer - Aplicando delay de '0' milisegundo(s) ao evento 'dd759fae-5bd4-4c34-a9e6-bed95d22bbd9', na tentativa '0'
[RxNewThreadScheduler-1] INFO ...KafkaListenerRetryProxyProcessor - Iniciando processamento do evento dd759fae-5bd4-4c34-a9e6-bed95d22bbd9
[RxNewThreadScheduler-1] ERROR ...KafkaListenerRetryProxyProcessor - Falha no processamento do evento dd759fae-5bd4-4c34-a9e6-bed95d22bbd9, excessao: java.lang.RuntimeException: Falha no catalog aggregate
[RxNewThreadScheduler-1] INFO ...KafkaListenerRetryProxyProcessor - Republicando evento no topico: recs-per-person-registered-topic-RETRY-1
[RxNewThreadScheduler-1] INFO ...KafkaListenerRetryProxyProcessor - Evento dd759fae-5bd4-4c34-a9e6-bed95d22bbd9 republicado com sucesso no topico recs-per-person-registered-topic-RETRY-1
[RxNewThreadScheduler-2] INFO ...RetryableKafkaConsumer - Aplicando delay de '5000' milisegundo(s) ao evento 'dd759fae-5bd4-4c34-a9e6-bed95d22bbd9', na tentativa '1'
[RxNewThreadScheduler-2] INFO ...KafkaListenerRetryProxyProcessor - Iniciando processamento do evento dd759fae-5bd4-4c34-a9e6-bed95d22bbd9
[RxNewThreadScheduler-2] ERROR ...KafkaListenerRetryProxyProcessor - Falha no processamento do evento dd759fae-5bd4-4c34-a9e6-bed95d22bbd9, excessao: java.lang.RuntimeException: Falha no catalog aggregate
[RxNewThreadScheduler-2] INFO ...KafkaListenerRetryProxyProcessor - Republicando evento no topico: recs-per-person-registered-topic-RETRY-2
[RxNewThreadScheduler-2] INFO ...KafkaListenerRetryProxyProcessor - Evento dd759fae-5bd4-4c34-a9e6-bed95d22bbd9 republicado com sucesso no topico recs-per-person-registered-topic-RETRY-2
[RxNewThreadScheduler-3] INFO ...RetryableKafkaConsumer - Aplicando delay de '10000' milisegundo(s) ao evento 'dd759fae-5bd4-4c34-a9e6-bed95d22bbd9', na tentativa '2'
[RxNewThreadScheduler-3] INFO ...KafkaListenerRetryProxyProcessor - Iniciando processamento do evento dd759fae-5bd4-4c34-a9e6-bed95d22bbd9
[RxNewThreadScheduler-3] ERROR ...KafkaListenerRetryProxyProcessor - Falha no processamento do evento dd759fae-5bd4-4c34-a9e6-bed95d22bbd9, excessao: java.lang.RuntimeException: Falha no catalog aggregate
[RxNewThreadScheduler-3] INFO ...KafkaListenerRetryProxyProcessor - Republicando evento no topico: recs-per-person-registered-topic-RETRY-3
[RxNewThreadScheduler-3] INFO ...KafkaListenerRetryProxyProcessor - Evento dd759fae-5bd4-4c34-a9e6-bed95d22bbd9 republicado com sucesso no topico recs-per-person-registered-topic-RETRY-3
[RxNewThreadScheduler-4] INFO ...RetryableKafkaConsumer - Aplicando delay de '15000' milisegundo(s) ao evento 'dd759fae-5bd4-4c34-a9e6-bed95d22bbd9', na tentativa '3'
[RxNewThreadScheduler-4] INFO ...KafkaListenerRetryProxyProcessor - Iniciando processamento do evento dd759fae-5bd4-4c34-a9e6-bed95d22bbd9
[RxNewThreadScheduler-4] ERROR ...KafkaListenerRetryProxyProcessor - Falha no processamento do evento dd759fae-5bd4-4c34-a9e6-bed95d22bbd9, excessao: java.lang.RuntimeException: Falha no catalog aggregate
[RxNewThreadScheduler-4] WARN ...KafkaListenerRetryProxyProcessor - Evento alcancou o maximo de tentativas de processamento (3), enviando para DLQ...
[RxNewThreadScheduler-4] INFO ...KafkaListenerRetryProxyProcessor - Republicando evento no topico: recs-per-person-registered-topic-DLQ
[RxNewThreadScheduler-4] INFO ...KafkaListenerRetryProxyProcessor - Evento dd759fae-5bd4-4c34-a9e6-bed95d22bbd9 republicado com sucesso no topico recs-per-person-registered-topic-DLQ