
Python: Adding Crew.AI as a plugin. #10474

Merged
merged 18 commits into from
Feb 13, 2025
Changes from 11 commits
4 changes: 3 additions & 1 deletion python/.env.example
@@ -34,4 +34,6 @@ BOOKING_SAMPLE_CLIENT_ID=""
BOOKING_SAMPLE_TENANT_ID=""
BOOKING_SAMPLE_CLIENT_SECRET=""
BOOKING_SAMPLE_BUSINESS_ID=""
BOOKING_SAMPLE_SERVICE_ID=""
CREW_AI_ENDPOINT=""
CREW_AI_TOKEN=""
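For reference, the two new variables feed the `CrewAISettings` class introduced in this PR. A minimal sketch of constructing the plugin object with or without them, assuming the settings fall back to `CREW_AI_ENDPOINT` / `CREW_AI_TOKEN` when the constructor arguments are omitted (the endpoint and token values below are placeholders):

```python
from semantic_kernel.core_plugins.crew_ai import CrewAIEnterprise

# Option 1: rely on CREW_AI_ENDPOINT / CREW_AI_TOKEN loaded from .env.
crew = CrewAIEnterprise()

# Option 2: pass the values explicitly; both are placeholders here.
crew = CrewAIEnterprise(
    endpoint="https://example.crewai.com/api/v1",  # placeholder endpoint
    auth_token="<bearer-token>",                   # placeholder token
)
```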
76 changes: 76 additions & 0 deletions python/samples/concepts/plugins/crew_ai_plugin.py
@@ -0,0 +1,76 @@
# Copyright (c) Microsoft. All rights reserved.

import asyncio
import logging

from semantic_kernel import Kernel
from semantic_kernel.core_plugins.crew_ai import CrewAIEnterprise
from semantic_kernel.core_plugins.crew_ai.crew_ai_models import InputMetadata
from semantic_kernel.functions import KernelArguments, KernelFunction

logging.basicConfig(level=logging.INFO)


async def using_crew_ai_enterprise():
# Create an instance of the CrewAI Enterprise Crew
crew = CrewAIEnterprise()

#####################################################################
# Using the CrewAI Enterprise Crew directly #
#####################################################################

# The required inputs for the Crew must be known in advance. This example is modeled after the
# Enterprise Content Marketing Crew Template and requires the following inputs:
inputs = {"company": "CrewAI", "topic": "Agentic products for consumers"}

# Invoke directly with our inputs
kickoff_id = await crew.kickoff(inputs)
print(f"CrewAI Enterprise Crew kicked off with ID: {kickoff_id}")

# Wait for completion
result = await crew.wait_for_crew_completion(kickoff_id)
print("CrewAI Enterprise Crew completed with the following result:")
print(result)

#####################################################################
# Using the CrewAI Enterprise as a Plugin #
#####################################################################

# Define the description of the Crew. This will be used as the semantic description of the plugin.
crew_description = (
"Conducts thorough research on the specified company and topic to identify emerging trends,"
"analyze competitor strategies, and gather data-driven insights."
)

# The required inputs for the Crew must be known in advance. This example is modeled after the
# Enterprise Content Marketing Crew Template and requires string inputs for the company and topic.
# We need to describe the type and purpose of each input to allow the LLM to invoke the crew as expected.
crew_plugin_definitions = [
InputMetadata(name="company", type="string", description="The name of the company that should be researched"),
InputMetadata(name="topic", type="string", description="The topic that should be researched"),
]

# Create the CrewAI Plugin. This builds a plugin that can be added to the Kernel and invoked like any other plugin.
# The plugin will contain the following functions:
# - kickoff: Starts the Crew with the specified inputs and returns the Id of the scheduled kickoff.
# - kickoff_and_wait: Starts the Crew with the specified inputs and waits for the Crew to complete before returning
# the result.
# - wait_for_completion: Waits for the specified Crew kickoff to complete and returns the result.
# - get_status: Gets the status of the specified Crew kickoff.
crew_plugin = crew.create_kernel_plugin(
name="EnterpriseContentMarketingCrew",
description=crew_description,
input_metadata=crew_plugin_definitions,
)

# Example of invoking the plugin directly
kickoff_and_wait_function: KernelFunction = crew_plugin["kickoff_and_wait"]
result = await kickoff_and_wait_function.invoke(
kernel=Kernel(), arguments=KernelArguments(company="CrewAI", topic="Consumer AI Products")
)

print(result)


if __name__ == "__main__":
asyncio.run(using_crew_ai_enterprise())
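Beyond direct invocation, the `KernelPlugin` produced by `create_kernel_plugin` can also be registered on a kernel so its functions sit alongside other plugins. A minimal sketch, assuming the usual `Kernel.add_plugin` / `kernel.invoke` path (`invoke_via_kernel` is a hypothetical helper; the plugin and argument names mirror the sample above):

```python
from semantic_kernel import Kernel
from semantic_kernel.functions import KernelArguments


async def invoke_via_kernel(crew_plugin) -> None:
    # Register the generated plugin on a kernel alongside any other plugins.
    kernel = Kernel()
    kernel.add_plugin(crew_plugin)

    # Invoke the registered function through the kernel rather than directly.
    result = await kernel.invoke(
        plugin_name="EnterpriseContentMarketingCrew",
        function_name="kickoff_and_wait",
        arguments=KernelArguments(company="CrewAI", topic="Consumer AI Products"),
    )
    print(result)
```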
11 changes: 11 additions & 0 deletions python/semantic_kernel/core_plugins/crew_ai/__init__.py
@@ -0,0 +1,11 @@
# Copyright (c) Microsoft. All rights reserved.

from semantic_kernel.core_plugins.crew_ai.crew_ai_enterprise import CrewAIEnterprise
from semantic_kernel.core_plugins.crew_ai.crew_ai_models import (
CrewAIStatusResponse,
)
from semantic_kernel.core_plugins.crew_ai.crew_ai_settings import (
CrewAISettings,
)

__all__ = ["CrewAIEnterprise", "CrewAISettings", "CrewAIStatusResponse"]
254 changes: 254 additions & 0 deletions python/semantic_kernel/core_plugins/crew_ai/crew_ai_enterprise.py
@@ -0,0 +1,254 @@
# Copyright (c) Microsoft. All rights reserved.

import asyncio
import logging
from typing import Any

import aiohttp
from pydantic import Field, ValidationError

from semantic_kernel.core_plugins.crew_ai.crew_ai_enterprise_client import CrewAIEnterpriseClient
from semantic_kernel.core_plugins.crew_ai.crew_ai_models import (
CrewAIEnterpriseKickoffState,
CrewAIStatusResponse,
InputMetadata,
)
from semantic_kernel.core_plugins.crew_ai.crew_ai_settings import CrewAISettings
from semantic_kernel.exceptions.function_exceptions import (
FunctionExecutionException,
FunctionResultError,
PluginInitializationError,
)
from semantic_kernel.functions import kernel_function
from semantic_kernel.functions.kernel_function_from_method import KernelFunctionFromMethod
from semantic_kernel.functions.kernel_parameter_metadata import KernelParameterMetadata
from semantic_kernel.functions.kernel_plugin import KernelPlugin
from semantic_kernel.kernel_pydantic import KernelBaseModel
from semantic_kernel.utils.experimental_decorator import experimental_class

logger: logging.Logger = logging.getLogger(__name__)


@experimental_class
class CrewAIEnterprise(KernelBaseModel):
"""Class to interface with Crew.AI Crews from Semantic Kernel.

This object can be used directly or as a plugin in the Kernel.
"""

client: CrewAIEnterpriseClient
polling_interval: float = Field(default=1.0)
polling_timeout: float = Field(default=30.0)

def __init__(
self,
endpoint: str | None = None,
auth_token: str | None = None,
polling_interval: float | None = 1.0,
polling_timeout: float | None = 30.0,
session: aiohttp.ClientSession | None = None,
):
"""Initialize a new instance of the class. This object can be used directly or as a plugin in the Kernel.

Args:
endpoint (str | None, optional): The API endpoint.
auth_token (str | None, optional): The authentication token.
polling_interval (float, optional): The polling interval in seconds. Defaults to 1.0.
polling_timeout (float, optional): The polling timeout in seconds. Defaults to 30.0.
session (aiohttp.ClientSession | None, optional): The HTTP client session. Defaults to None.
"""
try:
settings = CrewAISettings.create(
endpoint=endpoint,
auth_token=auth_token,
polling_interval=polling_interval,
polling_timeout=polling_timeout,
)
except ValidationError as ex:
raise PluginInitializationError("Failed to initialize CrewAI settings.") from ex

client = CrewAIEnterpriseClient(
endpoint=settings.endpoint, auth_token=settings.auth_token.get_secret_value(), session=session
)

super().__init__(
client=client,
polling_interval=settings.polling_interval,
polling_timeout=settings.polling_timeout,
)

async def kickoff(
self,
inputs: dict[str, Any] | None = None,
task_webhook_url: str | None = None,
step_webhook_url: str | None = None,
crew_webhook_url: str | None = None,
) -> str:
"""Kickoff a new Crew AI task.

Args:
inputs (dict[str, Any], optional): The inputs for the task. Defaults to None.
task_webhook_url (str | None, optional): The webhook URL for task updates. Defaults to None.
step_webhook_url (str | None, optional): The webhook URL for step updates. Defaults to None.
crew_webhook_url (str | None, optional): The webhook URL for crew updates. Defaults to None.

Returns:
str: The ID of the kickoff response.
"""
try:
kickoff_response = await self.client.kickoff(inputs, task_webhook_url, step_webhook_url, crew_webhook_url)
logger.info(f"CrewAI Crew kicked off with Id: {kickoff_response.kickoff_id}")
return kickoff_response.kickoff_id
except Exception as ex:
raise FunctionExecutionException("Failed to kickoff CrewAI Crew.") from ex

@kernel_function(description="Get the status of a Crew AI kickoff.")
async def get_crew_kickoff_status(self, kickoff_id: str) -> CrewAIStatusResponse:
"""Get the status of a Crew AI task.

Args:
kickoff_id (str): The ID of the kickoff response.

Returns:
CrewAIStatusResponse: The status response of the task.
"""
try:
status_response = await self.client.get_status(kickoff_id)
logger.info(f"CrewAI Crew status for kickoff Id: {kickoff_id} is {status_response.state}")
return status_response
except Exception as ex:
raise FunctionExecutionException(
f"Failed to get status of CrewAI Crew with kickoff Id: {kickoff_id}."
) from ex

@kernel_function(description="Wait for the completion of a Crew AI kickoff.")
async def wait_for_crew_completion(self, kickoff_id: str) -> str:
"""Wait for the completion of a Crew AI task.

Args:
kickoff_id (str): The ID of the kickoff response.

Returns:
str: The result of the task.

Raises:
FunctionExecutionException: If the task fails or an error occurs while waiting for completion.
"""
try:
status_response = None
status = CrewAIEnterpriseKickoffState.Pending

async def poll_status():
nonlocal status, status_response
while status not in [
CrewAIEnterpriseKickoffState.Failed,
CrewAIEnterpriseKickoffState.Failure,
CrewAIEnterpriseKickoffState.Success,
CrewAIEnterpriseKickoffState.Not_Found,
]:
logger.debug(
f"Waiting for CrewAI Crew with kickoff Id: {kickoff_id} to complete. Current state: {status}"
)

await asyncio.sleep(self.polling_interval)
status_response = await self.client.get_status(kickoff_id)
status = status_response.state

await asyncio.wait_for(poll_status(), timeout=self.polling_timeout)

logger.info(f"CrewAI Crew with kickoff Id: {kickoff_id} completed with status: {status_response.state}")
if status in ["Failed", "Failure"]:
raise FunctionResultError(f"CrewAI Crew failed with error: {status_response.result}")
return status_response.result or ""
except Exception as ex:
raise FunctionExecutionException(
f"Failed to wait for completion of CrewAI Crew with kickoff Id: {kickoff_id}."
) from ex

def create_kernel_plugin(
self,
name: str,
description: str,
input_metadata: list[InputMetadata] | None = None,
task_webhook_url: str | None = None,
step_webhook_url: str | None = None,
crew_webhook_url: str | None = None,
) -> KernelPlugin:
"""Creates a kernel plugin that can be used to invoke the CrewAI Crew.

Args:
name (str): The name of the kernel plugin.
description (str): The description of the kernel plugin.
input_metadata (Optional[List[InputMetadata]], optional): The definitions of the Crew's
required inputs. Defaults to None.
task_webhook_url (Optional[str], optional): The task level webhook URL. Defaults to None.
step_webhook_url (Optional[str], optional): The step level webhook URL. Defaults to None.
crew_webhook_url (Optional[str], optional): The crew level webhook URL. Defaults to None.

Returns:
KernelPlugin: The kernel plugin exposing the Crew's functions.
"""

def build_metadata(input_metadata: InputMetadata) -> KernelParameterMetadata:
return KernelParameterMetadata(
name=input_metadata.name,
description=input_metadata.description,
default_value=None,
type=input_metadata.type,
is_required=True,
)

parameters = [build_metadata(input) for input in input_metadata or []]

@kernel_function(description="Kickoff the CrewAI task.")
async def kickoff(**kwargs: Any) -> str:
args = self._build_arguments(input_metadata, kwargs)
return await self.kickoff(
inputs=args,
task_webhook_url=task_webhook_url,
step_webhook_url=step_webhook_url,
crew_webhook_url=crew_webhook_url,
)

@kernel_function(description="Kickoff the CrewAI task and wait for completion.")
async def kickoff_and_wait(**kwargs: Any) -> str:
args = self._build_arguments(input_metadata, kwargs)
kickoff_id = await self.kickoff(
inputs=args,
task_webhook_url=task_webhook_url,
step_webhook_url=step_webhook_url,
crew_webhook_url=crew_webhook_url,
)
return await self.wait_for_crew_completion(kickoff_id)

return KernelPlugin(
name,
description,
{
"kickoff": KernelFunctionFromMethod(kickoff, stream_method=None, parameters=parameters),
"kickoff_and_wait": KernelFunctionFromMethod(
kickoff_and_wait, stream_method=None, parameters=parameters
),
"get_status": self.get_crew_kickoff_status,
"wait_for_completion": self.wait_for_crew_completion,
},
)

def _build_arguments(self, input_metadata: list[InputMetadata] | None, arguments: dict[str, Any]) -> dict[str, Any]:
"""Builds the arguments for the CrewAI task from the provided metadata and arguments.

Args:
input_metadata (Optional[List[InputMetadata]]): The metadata for the inputs.
arguments (dict[str, Any]): The provided arguments.

Returns:
dict[str, Any]: The built arguments.
"""
args = {}
if input_metadata:
for input in input_metadata:
name = input.name
if name not in arguments:
raise PluginInitializationError(f"Missing required input '{name}' for CrewAI.")
args[name] = arguments[name]
return args
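Because `wait_for_crew_completion` polls `get_status` on a fixed interval inside `asyncio.wait_for`, long-running crews need the timeout raised when the object is constructed. A minimal sketch of tuning the two polling knobs (the values are illustrative only):

```python
from semantic_kernel.core_plugins.crew_ai import CrewAIEnterprise

# Illustrative values: poll every 5 seconds and allow up to 10 minutes before
# wait_for_crew_completion gives up and surfaces a FunctionExecutionException.
crew = CrewAIEnterprise(polling_interval=5.0, polling_timeout=600.0)
```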