Skip to content

Commit

Permalink
Add allow_auto_create_topics to make automatic topic creation configu…
Browse files Browse the repository at this point in the history
…rable (#1556)

* Add allow_auto_create_topics param to make automatic topic creation configurable

* Call blocking functions using run_in_executor

* Increment version
  • Loading branch information
kumaranvpl authored Jun 25, 2024
1 parent 28f4783 commit 4a2341d
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 6 deletions.
2 changes: 1 addition & 1 deletion faststream/__about__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Simple and fast framework to create message brokers based microservices."""

__version__ = "0.5.13"
__version__ = "0.5.14"

SERVICE_NAME = f"faststream-{__version__}"

Expand Down
7 changes: 7 additions & 0 deletions faststream/confluent/broker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,12 @@ def __init__(
"""
),
] = SERVICE_NAME,
allow_auto_create_topics: Annotated[
bool,
Doc("""
Allow automatic topic creation on the broker when subscribing to or assigning non-existent topics.
"""),
] = True,
config: Annotated[
Optional[ConfluentConfig],
Doc("""
Expand Down Expand Up @@ -350,6 +356,7 @@ def __init__(
request_timeout_ms=request_timeout_ms,
retry_backoff_ms=retry_backoff_ms,
metadata_max_age_ms=metadata_max_age_ms,
allow_auto_create_topics=allow_auto_create_topics,
connections_max_idle_ms=connections_max_idle_ms,
loop=loop,
# publisher args
Expand Down
21 changes: 16 additions & 5 deletions faststream/confluent/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ def __init__(
enable_idempotence: bool = False,
transactional_id: Optional[Union[str, int]] = None,
transaction_timeout_ms: int = 60000,
allow_auto_create_topics: bool = True,
sasl_mechanism: Optional[str] = None,
sasl_plain_password: Optional[str] = None,
sasl_plain_username: Optional[str] = None,
Expand Down Expand Up @@ -138,6 +139,7 @@ def __init__(
"retry.backoff.ms": retry_backoff_ms,
"security.protocol": security_protocol.lower(),
"connections.max.idle.ms": connections_max_idle_ms,
"allow.auto.create.topics": allow_auto_create_topics,
}
self.config = {**self.config, **config_from_params}

Expand All @@ -152,8 +154,8 @@ def __init__(

self.producer = Producer(self.config, logger=self.logger)
# self.producer.init_transactions()
self.producer.list_topics()
self.loop = loop or asyncio.get_event_loop()
self.loop.run_in_executor(None, self.producer.list_topics)

async def stop(self) -> None:
"""Stop the Kafka producer and flush remaining messages."""
Expand Down Expand Up @@ -223,6 +225,7 @@ class TopicPartition(NamedTuple):
def create_topics(
topics: List[str],
config: Dict[str, Optional[Union[str, int, float, bool, Any]]],
logger: Union["LoggerProto", None, object] = logger,
) -> None:
"""Creates Kafka topics using the provided configuration."""
required_config_params = (
Expand Down Expand Up @@ -252,9 +255,9 @@ def create_topics(
f.result() # The result itself is None
except Exception as e: # noqa: PERF203
if "TOPIC_ALREADY_EXISTS" not in str(e):
logger.warning(f"Failed to create topic {topic}: {e}")
logger.warning(f"Failed to create topic {topic}: {e}") # type: ignore[union-attr]
else:
logger.info(f"Topic `{topic}` created.")
logger.info(f"Topic `{topic}` created.") # type: ignore[union-attr]


class AsyncConfluentConsumer:
Expand Down Expand Up @@ -285,6 +288,7 @@ def __init__(
security_protocol: str = "PLAINTEXT",
connections_max_idle_ms: int = 540000,
isolation_level: str = "read_uncommitted",
allow_auto_create_topics: bool = True,
sasl_mechanism: Optional[str] = None,
sasl_plain_password: Optional[str] = None,
sasl_plain_username: Optional[str] = None,
Expand Down Expand Up @@ -316,7 +320,7 @@ def __init__(
]
)
config_from_params = {
"allow.auto.create.topics": True,
"allow.auto.create.topics": allow_auto_create_topics,
# "topic.metadata.refresh.interval.ms": 1000,
"bootstrap.servers": bootstrap_servers,
"client.id": client_id,
Expand Down Expand Up @@ -354,7 +358,14 @@ def __init__(

self.loop = loop or asyncio.get_event_loop()

create_topics(topics=self.topics, config=self.config)
if allow_auto_create_topics:
self.loop.run_in_executor(
None, create_topics, self.topics, self.config, logger
)
else:
logger.warning( # type: ignore[union-attr]
"Auto create topics is disabled. Make sure the topics exist."
)
self.consumer = Consumer(self.config, logger=self.logger)

async def start(self) -> None:
Expand Down
1 change: 1 addition & 0 deletions faststream/confluent/schemas/params.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ class ConsumerConnectionParams(TypedDict, total=False):
"PLAINTEXT",
]
connections_max_idle_ms: int
allow_auto_create_topics: bool
sasl_mechanism: Literal[
"PLAIN",
"GSSAPI",
Expand Down

0 comments on commit 4a2341d

Please sign in to comment.