8 changes: 7 additions & 1 deletion README.md
@@ -71,11 +71,16 @@ datacustomcode run ./payload/entrypoint.py
After modifying `entrypoint.py` as needed, using any dependencies you add to the `.venv` virtual environment, you can run this script in Data Cloud:
```zsh
datacustomcode scan ./payload/entrypoint.py
datacustomcode deploy --path ./payload --name my_custom_script
datacustomcode deploy --path ./payload --name my_custom_script --cpu-size CPU_L
```

> [!TIP]
> The `deploy` process can take several minutes. If you'd like more feedback on the underlying process, add `--debug` to the command, e.g. `datacustomcode --debug deploy --path ./payload --name my_custom_script`.

> [!NOTE]
> **Compute Types**: Choose the compute type that matches your workload via the `--cpu-size` option (see the example below):
> - **CPU_L / CPU_XL / CPU_2XL / CPU_4XL**: Large, X-Large, 2X-Large, and 4X-Large CPU instances for data processing
> - The default is `CPU_2XL`, which provides a good balance of performance and cost for most use cases
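For example, to run a heavier transform on the largest available instance, pass `--cpu-size` explicitly (the payload path and script name are the same illustrative ones used above):

```zsh
datacustomcode deploy --path ./payload --name my_custom_script --cpu-size CPU_4XL
```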

You can now use the Salesforce Data Cloud UI to find the created Data Transform and use the `Run Now` button to run it.
Once the Data Transform run is successful, check the DLO your script is writing to and verify the correct records were added.
@@ -139,6 +144,7 @@ Options:
- `--name TEXT`: Name of the transformation job [required]
- `--version TEXT`: Version of the transformation job (default: "0.0.1")
- `--description TEXT`: Description of the transformation job (default: "Custom Data Transform Code")
- `--cpu-size TEXT`: Compute size for the deployment (default: "CPU_2XL"). Available options: CPU_L (Large), CPU_XL (X-Large), CPU_2XL (2X-Large), CPU_4XL (4X-Large). See the example below.
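For reference, a full `deploy` invocation with every option spelled out might look like the following (the version and description values are illustrative):

```zsh
datacustomcode deploy \
  --path ./payload \
  --name my_custom_script \
  --version 0.0.2 \
  --description "Nightly enrichment transform" \
  --cpu-size CPU_L
```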

#### `datacustomcode init`
Initialize a new development environment with a template.
29 changes: 28 additions & 1 deletion src/datacustomcode/cli.py
@@ -83,16 +83,43 @@ def zip(path: str):
@click.option("--name", required=True)
@click.option("--version", default="0.0.1")
@click.option("--description", default="Custom Data Transform Code")
def deploy(path: str, name: str, version: str, description: str):
@click.option(
"--cpu-size",
default="CPU_2XL",
help="""CPU size for deployment. Available options:

\b
CPU_L - Large CPU instance
CPU_XL - X-Large CPU instance
CPU_2XL - 2X-Large CPU instance [DEFAULT]
CPU_4XL - 4X-Large CPU instance

Choose based on your workload requirements.""",
)
def deploy(path: str, name: str, version: str, description: str, cpu_size: str):
from datacustomcode.credentials import Credentials
from datacustomcode.deploy import TransformationJobMetadata, deploy_full

logger.debug("Deploying project")

# Validate compute type
from datacustomcode.deploy import COMPUTE_TYPES

if cpu_size not in COMPUTE_TYPES:
click.secho(
f"Error: Invalid CPU size '{cpu_size}'. "
f"Available options: {', '.join(COMPUTE_TYPES.keys())}",
fg="red",
)
raise click.Abort()

logger.debug(f"Deploying with CPU size: {cpu_size}")

metadata = TransformationJobMetadata(
name=name,
version=version,
description=description,
computeType=COMPUTE_TYPES[cpu_size],
)
try:
credentials = Credentials.from_available()
22 changes: 18 additions & 4 deletions src/datacustomcode/deploy.py
@@ -44,11 +44,25 @@
AUTH_PATH = "services/oauth2/token"
WAIT_FOR_DEPLOYMENT_TIMEOUT = 3000

# Available compute types for Data Cloud deployments.
# The COMPUTE_TYPES keys follow the nomenclature of the compute
# instances provisioned by Data Cloud; the values are the
# corresponding identifiers expected by the deployment API.
COMPUTE_TYPES = {
"CPU_L": "CPU_XS", # Large CPU instance
"CPU_XL": "CPU_S", # X-Large CPU instance
"CPU_2XL": "CPU_M", # 2X-Large CPU instance (default)
"CPU_4XL": "CPU_L", # 4X-Large CPU instance
}


class TransformationJobMetadata(BaseModel):
name: str
version: str
description: str
computeType: str


def _join_strip_url(*args: str) -> str:
@@ -123,7 +137,7 @@ def create_deployment(
"name": metadata.name,
"description": metadata.description,
"version": metadata.version,
"computeType": "CPU_M",
"computeType": metadata.computeType,
}
logger.debug(f"Creating deployment {metadata.name}...")
try:
@@ -372,9 +386,9 @@ def zip(
# Skip .DS_Store files when adding to zip
for file in files:
if file != ".DS_Store":
file_path = os.path.join(root, file)
zipf.write(file_path)

abs_path = os.path.join(root, file)
# Store entries relative to the zipped directory so the
# archive does not embed absolute filesystem paths
arcname = os.path.relpath(abs_path, directory)
zipf.write(abs_path, arcname)
logger.debug(f"Created zip file: {ZIP_FILE_NAME}")


4 changes: 0 additions & 4 deletions src/datacustomcode/scan.py
@@ -54,10 +54,6 @@ class DataAccessLayerCalls(pydantic.BaseModel):
def validate_access_layer(self) -> DataAccessLayerCalls:
if self.read_dlo and self.read_dmo:
raise ValueError("Cannot read from DLO and DMO in the same file.")
if len(self.write_to_dlo) > 1 or len(self.write_to_dmo) > 1:
raise ValueError(
"Cannot write to more than one DLO or DMO in the same file."
)
if not self.read_dlo and not self.read_dmo:
raise ValueError("Must read from at least one DLO or DMO.")
if self.read_dlo and self.write_to_dmo:
45 changes: 36 additions & 9 deletions tests/test_deploy.py
@@ -434,7 +434,10 @@ def test_create_deployment_success(self, mock_make_api_call):
access_token="test_token", instance_url="https://instance.example.com"
)
metadata = TransformationJobMetadata(
name="test_job", version="1.0.0", description="Test job"
name="test_job",
version="1.0.0",
description="Test job",
computeType="CPU_M",
)

mock_make_api_call.return_value = {
@@ -454,7 +457,10 @@ def test_create_deployment_conflict(self, mock_make_api_call):
access_token="test_token", instance_url="https://instance.example.com"
)
metadata = TransformationJobMetadata(
name="test_job", version="1.0.0", description="Test job"
name="test_job",
version="1.0.0",
description="Test job",
computeType="CPU_M",
)

# Mock HTTP error with 409 Conflict
@@ -571,7 +577,10 @@ def test_get_deployments(self, mock_make_api_call):
access_token="test_token", instance_url="https://instance.example.com"
)
metadata = TransformationJobMetadata(
name="test_job", version="1.0.0", description="Test job"
name="test_job",
version="1.0.0",
description="Test job",
computeType="CPU_M",
)

mock_make_api_call.return_value = {"deploymentStatus": "Deployed"}
@@ -595,7 +604,10 @@ def test_wait_for_deployment_success(
access_token="test_token", instance_url="https://instance.example.com"
)
metadata = TransformationJobMetadata(
name="test_job", version="1.0.0", description="Test job"
name="test_job",
version="1.0.0",
description="Test job",
computeType="CPU_M",
)
callback = MagicMock()

@@ -622,7 +634,10 @@ def test_wait_for_deployment_timeout(
access_token="test_token", instance_url="https://instance.example.com"
)
metadata = TransformationJobMetadata(
name="test_job", version="1.0.0", description="Test job"
name="test_job",
version="1.0.0",
description="Test job",
computeType="CPU_M",
)

# Mock time to simulate timeout
@@ -699,7 +714,10 @@ def test_create_data_transform(self, mock_make_api_call, mock_get_config):
access_token="test_token", instance_url="https://instance.example.com"
)
metadata = TransformationJobMetadata(
name="test_job", version="1.0.0", description="Test job"
name="test_job",
version="1.0.0",
description="Test job",
computeType="CPU_M",
)

mock_get_config.return_value = DataTransformConfig(
@@ -762,7 +780,10 @@ def test_deploy_full(
login_url="https://example.com",
)
metadata = TransformationJobMetadata(
name="test_job", version="1.0.0", description="Test job"
name="test_job",
version="1.0.0",
description="Test job",
computeType="CPU_M",
)
callback = MagicMock()

@@ -799,7 +820,10 @@ def test_run_data_transform(self, mock_make_api_call):
access_token="test_token", instance_url="https://instance.example.com"
)
metadata = TransformationJobMetadata(
name="test_job", version="1.0.0", description="Test job"
name="test_job",
version="1.0.0",
description="Test job",
computeType="CPU_M",
)

mock_make_api_call.return_value = {"status": "Running"}
@@ -839,7 +863,10 @@ def test_deploy_full_happy_path(
login_url="https://example.com",
)
metadata = TransformationJobMetadata(
name="test_job", version="1.0.0", description="Test job"
name="test_job",
version="1.0.0",
description="Test job",
computeType="CPU_M",
)
callback = MagicMock()

22 changes: 16 additions & 6 deletions tests/test_scan.py
@@ -247,7 +247,7 @@ def test_read_dmo_write_dlo_throws_error(self):
finally:
os.unlink(temp_path)

def test_invalid_multiple_writes(self):
def test_multiple_writes(self):
"""Test scanning a file with multiple write operations."""
content = textwrap.dedent(
"""
@@ -258,15 +258,25 @@ def test_invalid_multiple_writes(self):
# Read from DLO
df = client.read_dlo("input_dlo")

# Write to multiple DLOs - invalid
client.write_to_dlo("output_dlo_1", df, "overwrite")
client.write_to_dlo("output_dlo_2", df, "overwrite")
# Transform data for different outputs
df_filtered = df.filter(df.col > 10)
df_aggregated = df.groupBy("category").agg({"value": "sum"})

# Write to multiple DLOs
client.write_to_dlo("output_filtered", df_filtered, "overwrite")
client.write_to_dlo("output_aggregated", df_aggregated, "overwrite")
"""
)
temp_path = create_test_script(content)
try:
with pytest.raises(ValueError, match="Cannot write to more than one DLO"):
scan_file(temp_path)
result = scan_file(temp_path)
assert "input_dlo" in result.read_dlo
assert "output_filtered" in result.write_to_dlo
assert "output_aggregated" in result.write_to_dlo
assert len(result.read_dlo) == 1
assert len(result.write_to_dlo) == 2
assert len(result.read_dmo) == 0
assert len(result.write_to_dmo) == 0
finally:
os.unlink(temp_path)
