Skip to content

simPod/PhpKafkaBundle

Folders and files

NameName
Last commit message
Last commit date
Jul 3, 2020
May 8, 2025
May 8, 2025
May 8, 2025
Mar 23, 2024
Mar 23, 2024
Oct 3, 2018
Mar 23, 2024
May 8, 2025
Mar 13, 2023
Jan 25, 2021
Mar 24, 2022
Nov 23, 2021
Sep 4, 2023
Mar 9, 2022

Repository files navigation

PHP Kafka Symfony bundle for php-rdkafka

GitHub Actions Code Coverage Downloads Packagist Infection MSI

Installation

Add as Composer dependency:

composer require simpod/kafka-bundle

Then add KafkaBundle to Symfony's bundles.php:

use SimPod\KafkaBundle\SimPodKafkaBundle;

return [
    ...
    new SimPodKafkaBundle()
    ...
];

Usage

This package simply makes it easier to integrate https://github.com/arnaud-lb/php-rdkafka with Symfony. For more details how to work with Kafka in PHP, refer to its documentation.

Available console commands:

  • bin/console debug:kafka:consumers to list all available consumer groups
  • bin/console kafka:consumer:run <consumer name> to run consumer instance

Config:

You can create eg. kafka.yaml file in your config directory with following content:

kafka:
    authentication: '%env(KAFKA_AUTHENTICATION)%'
    bootstrap_servers: '%env(KAFKA_BOOTSTRAP_SERVERS)%'
    client:
        id: 'your-application-name'
  • authentication reads env var KAFKA_AUTHENTICATIOn that contains authentication uri (sasl-plain://user:password, or it might be just empty indicating no authentication).
  • bootstrap_servers reads env var KAFKA_BOOTSTRAP_SERVERS that contains comma-separated list of bootstrap servers (broker-1.kafka:9092,broker-2.kafka:9092).

If bootstrap_servers isn't set, it defaults to 127.0.0.1:9092

Services

Following services are registered in container and can be DI injected.

Configuration

class: \SimPod\KafkaBundle\Kafka\Configuration

Configuration service allows easy access to all the configuration properties.

$config->set(ConsumerConfig::CLIENT_ID_CONFIG, $this->configuration->getIdWithHostname());

Consuming

There's interface NamedConsumer available. When your consumer implements it, this bundle autoregisters it.

This is example of simple consumer, it can be then run via bin/console kafka:consumer:run consumer1

<?php

declare(strict_types=1);

namespace Your\AppNamespace;

use SimPod\Kafka\Clients\Consumer\ConsumerConfig;
use SimPod\Kafka\Clients\Consumer\KafkaConsumer;
use SimPod\KafkaBundle\Kafka\Configuration;
use SimPod\KafkaBundle\Kafka\Clients\Consumer\NamedConsumer;

final class ExampleKafkaConsumer implements NamedConsumer
{
    private Configuration $configuration;

    public function __construct(Configuration $configuration)
    {
        $this->configuration = $configuration;
    }

    public function run(): void
    {
        $kafkaConsumer = new KafkaConsumer($this->getConfig());

        $kafkaConsumer->subscribe(['topic1']);

        while (true) {
            ...
        }
    }
    
    public function getName(): string {
        return 'consumer1';    
    }

    private function getConfig(): ConsumerConfig
    {
        $config = new ConsumerConfig();

        $config->set(ConsumerConfig::BOOTSTRAP_SERVERS_CONFIG, $this->configuration->getBootstrapServers());
        $config->set(ConsumerConfig::ENABLE_AUTO_COMMIT_CONFIG, false);
        $config->set(ConsumerConfig::CLIENT_ID_CONFIG, $this->configuration->getClientIdWithHostname());
        $config->set(ConsumerConfig::AUTO_OFFSET_RESET_CONFIG, 'earliest');
        $config->set(ConsumerConfig::GROUP_ID_CONFIG, 'consumer_group');

        return $config;
    }
}

Development

There is kwn/php-rdkafka-stubs listed as a dev dependency so it properly integrates php-rdkafka extension with IDE.