From 858007acdf8397b3d0bda41a87d0c9ab2423693c Mon Sep 17 00:00:00 2001 From: anish-work Date: Mon, 6 Jan 2025 00:04:44 +0530 Subject: [PATCH 1/4] add zoho transation mapping script --- daras_ai_v2/settings.py | 1 + scripts/zoho_crm.py | 355 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 356 insertions(+) create mode 100644 scripts/zoho_crm.py diff --git a/daras_ai_v2/settings.py b/daras_ai_v2/settings.py index 0819f8024..2ff69a70c 100644 --- a/daras_ai_v2/settings.py +++ b/daras_ai_v2/settings.py @@ -474,3 +474,4 @@ SCRAPING_PROXY_USERNAME = config("SCRAPING_PROXY_USERNAME", "") SCRAPING_PROXY_PASSWORD = config("SCRAPING_PROXY_PASSWORD", "") SCRAPING_PROXY_CERT_URL = config("SCRAPING_PROXY_CERT_URL", "") +ZOHO_AUTH_CODE = config("ZOHO_AUTH_CODE", "") diff --git a/scripts/zoho_crm.py b/scripts/zoho_crm.py new file mode 100644 index 000000000..9a70894ba --- /dev/null +++ b/scripts/zoho_crm.py @@ -0,0 +1,355 @@ +import os +import django +import sys +import logging +import json +from typing import List, Dict, Optional, Any, Callable +from datetime import datetime, timedelta +import requests +import json + +from django.db.models import Model + +project_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +sys.path.append(project_path) + +# Set up Django environment +os.environ.setdefault("DJANGO_SETTINGS_MODULE", "daras_ai_v2.settings") +django.setup() + +from app_users.models import AppUserTransaction, TransactionReason +from daras_ai_v2 import settings + +ZOHO_CONTACT_API = "https://www.zohoapis.com/crm/v2/Contacts" +ZOHO_DEAL_API = "https://www.zohoapis.com/crm/v7/Deals" +ZOHO_HEADERS = {"Authorization": f"Bearer {settings.ZOHO_AUTH_CODE}"} +ZOHO_BULK_FILE_UPLOAD_API = "https://www.zohoapis.com/crm/v2/upload" + + +class ConfigurableFieldMapper: + def __init__(self): + """ + :param mapping_config_path: Path to JSON mapping configuration + """ + self.logger = logging.getLogger(self.__class__.__name__) + self.mapping_config = self._load_mapping_config() + + def _load_mapping_config(self) -> Dict: + """ + :param config_path: Path to configuration file + :return: Mapping configuration dictionary + """ + default_config = { + "contact_mapping": { + "uid": {"zoho_field": "Gooey User ID"}, + "django_appUser_url": {"zoho_field": "Gooey Admin Link"}, + "display_name": {"zoho_field": "Contact Name"}, + "display_name": {"zoho_field": "Last_Name"}, + "email": {"zoho_field": "Email"}, + "phone_number": { + "zoho_field": "Phone", + "transformer": lambda phone: phone.as_international, + }, + "is_anonymous": {"zoho_field": "Not synced"}, + "is_disabled": {"zoho_field": "Not synced"}, + "photo_url": {"zoho_field": "Contact Image"}, + "workspace.balance": {"zoho_field": "Not synced"}, + "created_at": { + "zoho_field": "Gooey Created Date", + "transformer": lambda date: date.strftime("%Y-%m-%d"), + }, + "handle.name": {"zoho_field": "Gooey Handle"}, + "upgraded_from_anonymous_at": { + "zoho_field": "Registered date", + "transformer": lambda date: date.strftime("%Y-%m-%d"), + }, + "banner_url": {"zoho_field": "Not synced"}, + "bio": {"zoho_field": "Description"}, + "company": {"zoho_field": "Company"}, + "github_username": {"zoho_field": "Not synced"}, + "website_url": {"zoho_field": "Personal URL"}, + "disable_rate_limits": {"zoho_field": "Not synced"}, + }, + "transaction_mapping": { + "workspace.id": {"zoho_field": "Account foreign key"}, + "workspace.name": {"zoho_field": "Account Name"}, + "user.name": {"zoho_field": "Contact foreign key"}, + "invoice_id": {"zoho_field": "invoice_id"}, + "amount": {"zoho_field": "Amount"}, + "end_balance": {"zoho_field": "end_balance"}, + "payment_provider": { + "zoho_field": "Payment Provider", + "transformer": lambda provider: provider.name, + }, + "reason": {"zoho_field": "reason"}, + "created_at": { + "zoho_field": "Payment_date", + "transformer": lambda date: date.strftime("%Y-%m-%d"), + }, + "charged_amount": {"zoho_field": "Amount"}, + "plan": {"zoho_field": "plan"}, + "payment_provider_url": { + "zoho_field": "Link to Payment", + "transformer": lambda url: url(), + }, + }, + } + + return default_config + + def _deep_merge(self, base: Dict, update: Dict): + """ + :param base: Default configuration + :param update: User-provided configuration + """ + for key, value in update.items(): + if isinstance(value, dict): + base[key] = self._deep_merge(base.get(key, {}), value) + else: + base[key] = value + return base + + def map_model_to_zoho( + self, model_instance: Model, mapping_type: str + ) -> dict[str, Any]: + """ + :param model_instance: Django model instance + :param mapping_type: Type of mapping (contact or transaction) + :return: Mapped ZOHO field dictionary + """ + mapping_config = self.mapping_config.get(mapping_type, {}) + zoho_fields = {} + + for model_field, field_config in mapping_config.items(): + try: + # Check if the field is callable (ends with '()') + if model_field.endswith("()"): + method_name = model_field.rstrip("()") + value = getattr(model_instance, method_name, None) + if callable(value): + value = value() # Call the method + else: + raise AttributeError(f"{method_name} is not callable") + else: + # Get value from model + value = getattr(model_instance, model_field, None) + + # Apply transformation if specified + zoho_field = field_config.get("zoho_field") + transformer = field_config.get("transformer") + + if value is not None and zoho_field: + transformered_value = transformer(value) if transformer else value + zoho_fields[zoho_field] = transformered_value + + except Exception as e: + self.logger.warning(f"Mapping error for {model_field}: {e}") + + return zoho_fields + + +class ZOHOSyncManager: + def __init__( + self, + field_mapper: ConfigurableFieldMapper, + batch_size: int = 100, + max_retries: int = 3, + ): + """ + :param field_mapper: Configurable field mapping instance + :param batch_size: Number of records to process in a single batch + :param max_retries: Maximum retry attempts for failed operations + """ + self.logger = logging.getLogger(self.__class__.__name__) + self.field_mapper = field_mapper + self.batch_size = batch_size + self.max_retries = max_retries + + # Track sync progress and state + self.sync_state = { + "start_time": datetime.now(), + "total_users_processed": 0, + "total_transactions_synced": 0, + "failed_operations": [], + "retry_count": 0, + } + + # self._init_zoho_client() + + # def _init_zoho_client(self): + # Initialise ZOHO here + + def bulk_sync_transactions( + self, + start_date: Optional[datetime] = None, + end_date: Optional[datetime] = None, + positive_only: bool = True, + dry_run: bool = False, + test: bool = False, + ): + """ + :param start_date: Optional start date for transaction filtering + :param end_date: Optional end date for transaction filtering + :param positive_only: Sync only positive transactions + :param dry_run: Preview sync without actual API calls + + :return: Sync statistics + """ + # Build transaction query with optional date filtering + transaction_query = AppUserTransaction.objects.all() + if start_date: + transaction_query = transaction_query.filter(created_at__gte=start_date) + + if end_date: + transaction_query = transaction_query.filter(created_at__lte=end_date) + + # Filter for positive transactions if specified + if positive_only: + transaction_query = transaction_query.filter(amount__gt=0) + + # Process transactions in batches + for batch_start in range(0, transaction_query.count(), self.batch_size): + batch_transactions = transaction_query[ + batch_start : batch_start + self.batch_size + ] + + for transaction in batch_transactions: + try: + # Fetch or create user contact in ZOHO + user = transaction.user + + # Map user data to ZOHO contact + contact_data = self.field_mapper.map_model_to_zoho( + user, mapping_type="contact_mapping" + ) + + # Find or create contact in ZOHO + try: + contact = self._find_or_create_contact(contact_data) + except Exception as contact_error: + self.logger.error( + f"Contact creation failed for user {user.id}: {contact_error}" + ) + self.sync_state["failed_operations"].append( + { + "transaction_id": transaction.id, + "user_id": user.id, + "error": "Contact creation failed", + } + ) + continue + + # Map transaction to ZOHO Deal + try: + deal_data = self.field_mapper.map_model_to_zoho( + transaction, mapping_type="transaction_mapping" + ) + + # Add mandotary fields + deal_data["Deal_Name"] = ( + f"{transaction.workspace} {transaction.reason_note()}" + ) + deal_data["Stage"] = "Organic Closed Won" + deal_data["Vertical"] = "Organic" + deal_data["Pipeline"] = "Organic Deals" + deal_data["Primary Workflow"] = "Unknown" + + # Create Deal in ZOHO + response = requests.post( + ZOHO_DEAL_API, + headers=ZOHO_HEADERS, + json={"data": [deal_data]}, + ) + if response.status_code == 400: + raise Exception(response.text) + + # Update sync statistics + self.sync_state["total_transactions_synced"] += 1 + if dry_run: + print(f"Dry Run - Transaction {transaction.id}") + print(f"Transaction Data: {deal_data}") + print(f"User Contact Data: {contact_data}") + continue + except Exception as deal_error: + self.logger.error( + f"Deal creation failed for transaction {transaction.id}: {deal_error}" + ) + self.sync_state["failed_operations"].append( + { + "transaction_id": transaction.id, + "user_id": user.id, + "error": "Deal creation failed", + } + ) + + except Exception as general_error: + self.logger.error( + f"Sync failed for transaction {transaction.id}: {general_error}" + ) + self.sync_state["failed_operations"].append( + { + "transaction_id": transaction.id, + "error": "General sync error", + } + ) + + return self.sync_state + + def _find_or_create_contact(self, contact_data: Dict): + """ + Find existing contact or create new in ZOHO + + :param contact_data: Mapped contact information + :return: ZOHO contact record + """ + email = contact_data.get("Email") + if not email: + raise ValueError("Email is required for contact creation") + + search_criteria = f"Email:equals:{email}" + search_response = requests.get( + f"{ZOHO_CONTACT_API}/search?criteria={search_criteria}", + headers=ZOHO_HEADERS, + ) + if search_response.status_code == 400: + raise Exception(search_response.text) + + if search_response.status_code == 204: + response = requests.post( + ZOHO_CONTACT_API, + headers={**ZOHO_HEADERS, "Content-Type": "application/json"}, + json={"data": [contact_data]}, + ) + return response + else: + search_response + + +def run_advanced_zoho_sync(positive_only: bool = True, dry_run: bool = False): + """ + :param config_path: Path to custom mapping configuration + :param positive_only: Sync only positive transactions + :param dry_run: Preview sync without API calls + """ + logging.basicConfig(level=logging.INFO) + + field_mapper = ConfigurableFieldMapper() + sync_manager = ZOHOSyncManager(field_mapper) + + results = sync_manager.bulk_sync_transactions( + positive_only=positive_only, dry_run=dry_run + ) + + print("ZOHO Sync Complete:") + print(f"Users Processed: {results['total_users_processed']}") + print(f"Transactions Synced: {results['total_transactions_synced']}") + print(f"Failed Operations: {len(results['failed_operations'])}") + + if results["failed_operations"]: + print("Detailed Failures:") + for failure in results["failed_operations"]: + print(json.dumps(failure, indent=2)) + + +if __name__ == "__main__": + run_advanced_zoho_sync() From b48747d318f95bee8b1bf39501107a6cdfbecc9f Mon Sep 17 00:00:00 2001 From: anish-work Date: Mon, 6 Jan 2025 20:13:37 +0530 Subject: [PATCH 2/4] add bulk upload --- scripts/zoho_crm.py | 308 +++++++++++++++++++++++--------------------- 1 file changed, 161 insertions(+), 147 deletions(-) diff --git a/scripts/zoho_crm.py b/scripts/zoho_crm.py index 9a70894ba..463066390 100644 --- a/scripts/zoho_crm.py +++ b/scripts/zoho_crm.py @@ -7,6 +7,10 @@ from datetime import datetime, timedelta import requests import json +import csv +from collections import defaultdict +from typing import List, Dict, Optional, Any, Tuple +import tempfile from django.db.models import Model @@ -23,7 +27,7 @@ ZOHO_CONTACT_API = "https://www.zohoapis.com/crm/v2/Contacts" ZOHO_DEAL_API = "https://www.zohoapis.com/crm/v7/Deals" ZOHO_HEADERS = {"Authorization": f"Bearer {settings.ZOHO_AUTH_CODE}"} -ZOHO_BULK_FILE_UPLOAD_API = "https://www.zohoapis.com/crm/v2/upload" +ZOHO_BULK_FILE_UPLOAD_API = "https://content.zohoapis.com/crm/v7/upload" class ConfigurableFieldMapper: @@ -44,7 +48,7 @@ def _load_mapping_config(self) -> Dict: "uid": {"zoho_field": "Gooey User ID"}, "django_appUser_url": {"zoho_field": "Gooey Admin Link"}, "display_name": {"zoho_field": "Contact Name"}, - "display_name": {"zoho_field": "Last_Name"}, + # "display_name": {"zoho_field": "Last_Name"}, "email": {"zoho_field": "Email"}, "phone_number": { "zoho_field": "Phone", @@ -73,7 +77,7 @@ def _load_mapping_config(self) -> Dict: "transaction_mapping": { "workspace.id": {"zoho_field": "Account foreign key"}, "workspace.name": {"zoho_field": "Account Name"}, - "user.name": {"zoho_field": "Contact foreign key"}, + "user.display_name": {"zoho_field": "Contact foreign key"}, "invoice_id": {"zoho_field": "invoice_id"}, "amount": {"zoho_field": "Amount"}, "end_balance": {"zoho_field": "end_balance"}, @@ -93,6 +97,15 @@ def _load_mapping_config(self) -> Dict: "transformer": lambda url: url(), }, }, + "workspace_mapping": { + "id": {"zoho_field": "Gooey Workspace ID"}, + "name": {"zoho_field": "Account Name"}, + "balance": {"zoho_field": "Balance"}, + "created_at": { + "zoho_field": "Created Date", + "transformer": lambda date: date.strftime("%Y-%m-%d"), + }, + }, } return default_config @@ -148,10 +161,98 @@ def map_model_to_zoho( return zoho_fields -class ZOHOSyncManager: +class ZohoBulkUploader: + def __init__(self, field_mapper: "ConfigurableFieldMapper", batch_size: int = 100): + self.field_mapper = field_mapper + self.batch_size = batch_size + self.logger = logging.getLogger(self.__class__.__name__) + + def prepare_bulk_data( + self, transactions: List[AppUserTransaction] + ) -> Tuple[List[Dict], List[Dict], List[Dict]]: + """Prepares data for bulk upload, organizing contacts, accounts, and deals""" + contacts = defaultdict(dict) # Use email as key + accounts = defaultdict(dict) # Use workspace ID as key + deals = [] + + for transaction in transactions: + # Prepare contact data + contact_data = self.field_mapper.map_model_to_zoho( + transaction.user, "contact_mapping" + ) + contacts[contact_data["Email"]] = contact_data + + # Prepare account (workspace) data + workspace_data = self.field_mapper.map_model_to_zoho( + transaction.workspace, "workspace_mapping" + ) + + if workspace_data and transaction.workspace.is_personal: + workspace_data["Account Name"] = ( + f"{transaction.user.display_name} Personal Workspace" + ) + + accounts[transaction.workspace.id] = workspace_data + + # Prepare deal data + deal_data = self.field_mapper.map_model_to_zoho( + transaction, "transaction_mapping" + ) + deal_data.update( + { + "Deal_Name": f"{transaction.workspace} {transaction.reason_note()}", + "Stage": "Organic Closed Won", + "Vertical": "Organic", + "Pipeline": "Organic Deals", + "Primary Workflow": "Unknown", + "Contact_Name": contact_data.get("Full_Name", ""), + "Email": contact_data.get("Email", ""), + } + ) + deals.append(deal_data) + + return list(contacts.values()), list(accounts.values()), deals + + def create_bulk_upload_file(self, records: List[Dict], module: str) -> str: + """Creates CSV file for bulk upload""" + if not records: + return None + + temp_dir = tempfile.gettempdir() + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + csv_path = os.path.join(temp_dir, f"{module}_{timestamp}.csv") + + fieldnames = list(records[0].keys()) + + with open(csv_path, "w", newline="") as csvfile: + writer = csv.DictWriter(csvfile, fieldnames=fieldnames) + writer.writeheader() + writer.writerows(records) + + return csv_path + + def upload_bulk_file(self, file_path: str, module: str) -> Dict: + """Uploads CSV file to ZOHO CRM""" + with open(file_path, "rb") as file: + files = {"file": (os.path.basename(file_path), file)} + data = {"module": module, "operation": "insert"} + + response = requests.post( + ZOHO_BULK_FILE_UPLOAD_API, + headers={**ZOHO_HEADERS, "feature": "bulk-write"}, + files=files, + data=data, + ) + + if response.status_code != 200: + raise Exception(f"Bulk upload failed: {response.text}") + + return response.json() + + +class ZOHOSync: def __init__( self, - field_mapper: ConfigurableFieldMapper, batch_size: int = 100, max_retries: int = 3, ): @@ -161,23 +262,9 @@ def __init__( :param max_retries: Maximum retry attempts for failed operations """ self.logger = logging.getLogger(self.__class__.__name__) - self.field_mapper = field_mapper + self.field_mapper = ConfigurableFieldMapper() + self.bulk_uploader = ZohoBulkUploader(self.field_mapper) self.batch_size = batch_size - self.max_retries = max_retries - - # Track sync progress and state - self.sync_state = { - "start_time": datetime.now(), - "total_users_processed": 0, - "total_transactions_synced": 0, - "failed_operations": [], - "retry_count": 0, - } - - # self._init_zoho_client() - - # def _init_zoho_client(self): - # Initialise ZOHO here def bulk_sync_transactions( self, @@ -195,6 +282,7 @@ def bulk_sync_transactions( :return: Sync statistics """ + stats = {"processed": 0, "successful": 0, "failed": 0, "errors": []} # Build transaction query with optional date filtering transaction_query = AppUserTransaction.objects.all() if start_date: @@ -207,149 +295,75 @@ def bulk_sync_transactions( if positive_only: transaction_query = transaction_query.filter(amount__gt=0) - # Process transactions in batches for batch_start in range(0, transaction_query.count(), self.batch_size): batch_transactions = transaction_query[ batch_start : batch_start + self.batch_size ] - for transaction in batch_transactions: - try: - # Fetch or create user contact in ZOHO - user = transaction.user - - # Map user data to ZOHO contact - contact_data = self.field_mapper.map_model_to_zoho( - user, mapping_type="contact_mapping" + try: + contacts, accounts, deals = self.bulk_uploader.prepare_bulk_data( + batch_transactions + ) + + # Upload contacts + if contacts: + contact_file = self.bulk_uploader.create_bulk_upload_file( + contacts, "Contacts" ) + if dry_run: + print(f"Contacts: {contacts}") + else: + self.bulk_uploader.upload_bulk_file(contact_file, "Contacts") - # Find or create contact in ZOHO - try: - contact = self._find_or_create_contact(contact_data) - except Exception as contact_error: - self.logger.error( - f"Contact creation failed for user {user.id}: {contact_error}" - ) - self.sync_state["failed_operations"].append( - { - "transaction_id": transaction.id, - "user_id": user.id, - "error": "Contact creation failed", - } - ) - continue - - # Map transaction to ZOHO Deal - try: - deal_data = self.field_mapper.map_model_to_zoho( - transaction, mapping_type="transaction_mapping" - ) - - # Add mandotary fields - deal_data["Deal_Name"] = ( - f"{transaction.workspace} {transaction.reason_note()}" - ) - deal_data["Stage"] = "Organic Closed Won" - deal_data["Vertical"] = "Organic" - deal_data["Pipeline"] = "Organic Deals" - deal_data["Primary Workflow"] = "Unknown" - - # Create Deal in ZOHO - response = requests.post( - ZOHO_DEAL_API, - headers=ZOHO_HEADERS, - json={"data": [deal_data]}, - ) - if response.status_code == 400: - raise Exception(response.text) - - # Update sync statistics - self.sync_state["total_transactions_synced"] += 1 - if dry_run: - print(f"Dry Run - Transaction {transaction.id}") - print(f"Transaction Data: {deal_data}") - print(f"User Contact Data: {contact_data}") - continue - except Exception as deal_error: - self.logger.error( - f"Deal creation failed for transaction {transaction.id}: {deal_error}" - ) - self.sync_state["failed_operations"].append( - { - "transaction_id": transaction.id, - "user_id": user.id, - "error": "Deal creation failed", - } - ) - - except Exception as general_error: - self.logger.error( - f"Sync failed for transaction {transaction.id}: {general_error}" + # Upload accounts + if accounts: + account_file = self.bulk_uploader.create_bulk_upload_file( + accounts, "Accounts" ) - self.sync_state["failed_operations"].append( - { - "transaction_id": transaction.id, - "error": "General sync error", - } + if dry_run: + print(f"Accounts: {accounts}") + else: + self.bulk_uploader.upload_bulk_file(account_file, "Accounts") + + # Upload deals + if deals: + deal_file = self.bulk_uploader.create_bulk_upload_file( + deals, "Deals" ) + if dry_run: + print(f"Deals: {deals}") + else: + self.bulk_uploader.upload_bulk_file(deal_file, "Deals") - return self.sync_state + stats["successful"] += len(batch_transactions) - def _find_or_create_contact(self, contact_data: Dict): - """ - Find existing contact or create new in ZOHO - - :param contact_data: Mapped contact information - :return: ZOHO contact record - """ - email = contact_data.get("Email") - if not email: - raise ValueError("Email is required for contact creation") - - search_criteria = f"Email:equals:{email}" - search_response = requests.get( - f"{ZOHO_CONTACT_API}/search?criteria={search_criteria}", - headers=ZOHO_HEADERS, - ) - if search_response.status_code == 400: - raise Exception(search_response.text) - - if search_response.status_code == 204: - response = requests.post( - ZOHO_CONTACT_API, - headers={**ZOHO_HEADERS, "Content-Type": "application/json"}, - json={"data": [contact_data]}, - ) - return response - else: - search_response + except Exception as e: + self.logger.error(f"Batch sync failed: {str(e)}") + stats["failed"] += len(batch_transactions) + stats["errors"].append({"batch_start": batch_start, "error": str(e)}) + stats["processed"] += len(batch_transactions) -def run_advanced_zoho_sync(positive_only: bool = True, dry_run: bool = False): - """ - :param config_path: Path to custom mapping configuration - :param positive_only: Sync only positive transactions - :param dry_run: Preview sync without API calls - """ - logging.basicConfig(level=logging.INFO) + return stats - field_mapper = ConfigurableFieldMapper() - sync_manager = ZOHOSyncManager(field_mapper) - results = sync_manager.bulk_sync_transactions( - positive_only=positive_only, dry_run=dry_run - ) +def run_optimized_sync( + start_date: Optional[datetime] = None, end_date: Optional[datetime] = None +): + """Convenience function to run the sync""" + sync_manager = ZOHOSync() + results = sync_manager.bulk_sync_transactions(start_date, end_date) - print("ZOHO Sync Complete:") - print(f"Users Processed: {results['total_users_processed']}") - print(f"Transactions Synced: {results['total_transactions_synced']}") - print(f"Failed Operations: {len(results['failed_operations'])}") + print(f"Sync completed:") + print(f"Processed: {results['processed']}") + print(f"Successful: {results['successful']}") + print(f"Failed: {results['failed']}") - if results["failed_operations"]: - print("Detailed Failures:") - for failure in results["failed_operations"]: - print(json.dumps(failure, indent=2)) + if results["errors"]: + print("\nErrors encountered:") + for error in results["errors"]: + print(f"Batch starting at {error['batch_start']}: {error['error']}") if __name__ == "__main__": - run_advanced_zoho_sync() + run_optimized_sync() From 8a98a21caac1e430e2ddbe622396d733e423f97c Mon Sep 17 00:00:00 2001 From: anish-work Date: Wed, 15 Jan 2025 19:26:33 +0530 Subject: [PATCH 3/4] add field mappings for bulk API --- daras_ai_v2/settings.py | 1 + scripts/zoho_crm.py | 442 +++++++++++++++++++++++++++++----------- 2 files changed, 322 insertions(+), 121 deletions(-) diff --git a/daras_ai_v2/settings.py b/daras_ai_v2/settings.py index 2ff69a70c..23a09899f 100644 --- a/daras_ai_v2/settings.py +++ b/daras_ai_v2/settings.py @@ -475,3 +475,4 @@ SCRAPING_PROXY_PASSWORD = config("SCRAPING_PROXY_PASSWORD", "") SCRAPING_PROXY_CERT_URL = config("SCRAPING_PROXY_CERT_URL", "") ZOHO_AUTH_CODE = config("ZOHO_AUTH_CODE", "") +ZOHO_ORG_ID = config("ZOHO_ORG_ID", "") diff --git a/scripts/zoho_crm.py b/scripts/zoho_crm.py index 463066390..b48e5adcd 100644 --- a/scripts/zoho_crm.py +++ b/scripts/zoho_crm.py @@ -10,6 +10,7 @@ import csv from collections import defaultdict from typing import List, Dict, Optional, Any, Tuple +import zipfile import tempfile from django.db.models import Model @@ -21,13 +22,95 @@ os.environ.setdefault("DJANGO_SETTINGS_MODULE", "daras_ai_v2.settings") django.setup() -from app_users.models import AppUserTransaction, TransactionReason +from app_users.models import AppUserTransaction, PaymentProvider, TransactionReason from daras_ai_v2 import settings ZOHO_CONTACT_API = "https://www.zohoapis.com/crm/v2/Contacts" ZOHO_DEAL_API = "https://www.zohoapis.com/crm/v7/Deals" ZOHO_HEADERS = {"Authorization": f"Bearer {settings.ZOHO_AUTH_CODE}"} ZOHO_BULK_FILE_UPLOAD_API = "https://content.zohoapis.com/crm/v7/upload" +ZOHO_BULK_CREATE_JOB = "https://www.zohoapis.com/crm/bulk/v7/write" +ZOHO_ORG_ID = settings.ZOHO_ORG_ID + + +def get_field_mappings(module: str) -> List[Dict]: + field_mappings = { + "Deals": [ + {"api_name": "Layout", "default_value": {"value": "6093802000000498176"}}, + {"api_name": "id", "index": 0}, + {"api_name": "Invoice_ID", "index": 1}, + {"api_name": "Account_Lookup", "index": 2, "find_by": "id"}, + {"api_name": "Contact_Lookup", "index": 3, "find_by": "id"}, + {"api_name": "Account_Title", "index": 4}, + {"api_name": "Contact_Email", "index": 5}, + {"api_name": "Amount", "index": 6}, + {"api_name": "End_Balance", "index": 7}, + {"api_name": "Payment_Provider", "index": 8}, + {"api_name": "Reason", "index": 9}, + {"api_name": "Closing_Date", "index": 10, "format": "yyyy-MM-dd"}, + {"api_name": "Link_to_Payment", "index": 11}, + {"api_name": "Currency_Type", "index": 12}, + {"api_name": "Deal_Name", "index": 13}, + {"api_name": "Stage", "index": 14}, + {"api_name": "Vertical", "index": 15}, + {"api_name": "Pipeline", "index": 16}, + {"api_name": "Type", "index": 17}, + {"api_name": "Primary_Workflow", "index": 18}, + ], + "Contacts": [ + {"api_name": "id", "index": 0}, + {"api_name": "Gooey_User_ID", "index": 1}, + # {"api_name": "Gooey_Admin_Link", "index": 2}, + {"api_name": "Contact_Name", "index": 3}, + {"api_name": "Last_Name", "index": 4}, + {"api_name": "Email", "index": 5}, + {"api_name": "Phone", "index": 6}, + {"api_name": "Not_Synced", "index": 7}, + {"api_name": "Contact_Image", "index": 8}, + {"api_name": "Gooey_Created_Date", "index": 9, "format": "yyyy-MM-dd"}, + {"api_name": "Gooey_Handle", "index": 10}, + {"api_name": "Registered_Date", "index": 11, "format": "yyyy-MM-dd"}, + {"api_name": "Description", "index": 12}, + {"api_name": "Company", "index": 13}, + {"api_name": "Personal_Url", "index": 14}, + ], + "Accounts": [ + {"api_name": "id", "index": 0}, + {"api_name": "Account_Name", "index": 1}, + {"api_name": "Balance", "index": 2}, + {"api_name": "Is_Paying", "index": 3}, + {"api_name": "Gooey_Admin_Link", "index": 4}, + {"api_name": "Created_Date", "index": 5, "format": "yyyy-MM-dd"}, + {"api_name": "Updated_At", "index": 5, "format": "yyyy-MM-dd"}, + ], + } + return field_mappings.get(module, []) + + +def get_zoho_module_name(module: str) -> str: + """ + :param module: Zoho CRM module name + :return: Zoho CRM module API name + """ + module_names = { + "Contacts": "Contacts", + "Accounts": "Accounts", + "Deals": "Deals", + } + return module_names.get(module, "Contacts") + + +def get_unique_field(module: str) -> str: + """ + :param module: Zoho CRM module name + :return: Unique field name for the specified module + """ + unique_fields = { + "Contacts": "id", + "Accounts": "id", + "Deals": "id", + } + return unique_fields.get(module, "ID") class ConfigurableFieldMapper: @@ -40,69 +123,93 @@ def __init__(self): def _load_mapping_config(self) -> Dict: """ - :param config_path: Path to configuration file :return: Mapping configuration dictionary """ default_config = { "contact_mapping": { - "uid": {"zoho_field": "Gooey User ID"}, - "django_appUser_url": {"zoho_field": "Gooey Admin Link"}, - "display_name": {"zoho_field": "Contact Name"}, - # "display_name": {"zoho_field": "Last_Name"}, - "email": {"zoho_field": "Email"}, - "phone_number": { - "zoho_field": "Phone", + "id": {"db_key": "id"}, + "Gooey_User_ID": {"db_key": "uid"}, + "Gooey_Admin_Link": { + "db_key": "django_appUser_url", + "transformer": lambda url: url(), + }, + "Contact_Name": {"db_key": "display_name"}, + "Last_Name": { + "db_key": "display_name", + "transformer": lambda name: name.split(" ")[-1], + }, + "Email": {"db_key": "email"}, + "Phone": { + "db_key": "phone_number", "transformer": lambda phone: phone.as_international, }, - "is_anonymous": {"zoho_field": "Not synced"}, - "is_disabled": {"zoho_field": "Not synced"}, - "photo_url": {"zoho_field": "Contact Image"}, - "workspace.balance": {"zoho_field": "Not synced"}, - "created_at": { - "zoho_field": "Gooey Created Date", + "Not_Synced": {"db_key": "is_anonymous"}, + "Contact_Image": {"db_key": "photo_url"}, + "Gooey_Created_Date": { + "db_key": "created_at", "transformer": lambda date: date.strftime("%Y-%m-%d"), }, - "handle.name": {"zoho_field": "Gooey Handle"}, - "upgraded_from_anonymous_at": { - "zoho_field": "Registered date", + "Gooey_Handle": {"db_key": "handle.name"}, + "Registered_Date": { + "db_key": "upgraded_from_anonymous_at", "transformer": lambda date: date.strftime("%Y-%m-%d"), }, - "banner_url": {"zoho_field": "Not synced"}, - "bio": {"zoho_field": "Description"}, - "company": {"zoho_field": "Company"}, - "github_username": {"zoho_field": "Not synced"}, - "website_url": {"zoho_field": "Personal URL"}, - "disable_rate_limits": {"zoho_field": "Not synced"}, + "Description": {"db_key": "bio"}, + "Company": {"db_key": "company"}, + "Personal_URL": {"db_key": "website_url"}, }, "transaction_mapping": { - "workspace.id": {"zoho_field": "Account foreign key"}, - "workspace.name": {"zoho_field": "Account Name"}, - "user.display_name": {"zoho_field": "Contact foreign key"}, - "invoice_id": {"zoho_field": "invoice_id"}, - "amount": {"zoho_field": "Amount"}, - "end_balance": {"zoho_field": "end_balance"}, - "payment_provider": { - "zoho_field": "Payment Provider", - "transformer": lambda provider: provider.name, + "id": {"db_key": "id"}, + "Invoice_ID": {"db_key": "invoice_id"}, + "Account_Lookup": { + "db_key": "workspace", + "transformer": lambda workspace: workspace.id, + }, + "Contact_Lookup": { + "db_key": "user", + "transformer": lambda user: user.id, }, - "reason": {"zoho_field": "reason"}, - "created_at": { - "zoho_field": "Payment_date", + "Account_Name": {"db_key": "workspace"}, + "Contact_Email": { + "db_key": "user", + "transformer": lambda user: user.email, + }, + "Amount": {"db_key": "amount"}, + "End_Balance": {"db_key": "end_balance"}, + "Payment_Provider": { + "db_key": "payment_provider", + "transformer": lambda provider: PaymentProvider(provider).name, + }, + "Reason": { + "db_key": "reason", + "transformer": lambda reason: TransactionReason(reason).name, + }, + "Closing_Date": { + "db_key": "created_at", "transformer": lambda date: date.strftime("%Y-%m-%d"), }, - "charged_amount": {"zoho_field": "Amount"}, - "plan": {"zoho_field": "plan"}, - "payment_provider_url": { - "zoho_field": "Link to Payment", + "Link_to_Payment": { + "db_key": "payment_provider_url", "transformer": lambda url: url(), }, + "Currency": {"db_key": "currency", "default": "USD"}, }, "workspace_mapping": { - "id": {"zoho_field": "Gooey Workspace ID"}, - "name": {"zoho_field": "Account Name"}, - "balance": {"zoho_field": "Balance"}, - "created_at": { - "zoho_field": "Created Date", + "id": {"db_key": "id"}, + "Account_Name": {"db_key": "name"}, + "Account_Image": {"db_key": "photo_url"}, + "Balance": {"db_key": "balance"}, + "Is_Paying": {"db_key": "is_paying"}, + "Gooey_Admin_Link": { + "db_key": "django_workspace_url", + "transformer": lambda url: url(), + }, + "Created_Date": { + "db_key": "created_at", + "transformer": lambda date: date.strftime("%Y-%m-%d"), + }, + "Updated_At": { + "db_key": "updated_at", "transformer": lambda date: date.strftime("%Y-%m-%d"), }, }, @@ -133,30 +240,23 @@ def map_model_to_zoho( mapping_config = self.mapping_config.get(mapping_type, {}) zoho_fields = {} - for model_field, field_config in mapping_config.items(): + for zoho_field, field_config in mapping_config.items(): try: - # Check if the field is callable (ends with '()') - if model_field.endswith("()"): - method_name = model_field.rstrip("()") - value = getattr(model_instance, method_name, None) - if callable(value): - value = value() # Call the method - else: - raise AttributeError(f"{method_name} is not callable") - else: - # Get value from model - value = getattr(model_instance, model_field, None) - - # Apply transformation if specified - zoho_field = field_config.get("zoho_field") + db_key = field_config.get("db_key") transformer = field_config.get("transformer") - if value is not None and zoho_field: - transformered_value = transformer(value) if transformer else value - zoho_fields[zoho_field] = transformered_value + value = getattr(model_instance, db_key, None) + + # Apply transformation if specified + if value is not None: + zoho_fields[zoho_field] = ( + transformer(value) if transformer else value + ) + else: + zoho_fields[zoho_field] = field_config.get("default") or "None" except Exception as e: - self.logger.warning(f"Mapping error for {model_field}: {e}") + self.logger.warning(f"Mapping error for {zoho_field}: {e}") return zoho_fields @@ -180,6 +280,7 @@ def prepare_bulk_data( contact_data = self.field_mapper.map_model_to_zoho( transaction.user, "contact_mapping" ) + contact_data["Account_Lookup"] = transaction.workspace.id contacts[contact_data["Email"]] = contact_data # Prepare account (workspace) data @@ -200,27 +301,27 @@ def prepare_bulk_data( ) deal_data.update( { - "Deal_Name": f"{transaction.workspace} {transaction.reason_note()}", + "Deal_Name": f"${transaction.amount} {transaction.workspace} {transaction.reason_note()}", "Stage": "Organic Closed Won", "Vertical": "Organic", "Pipeline": "Organic Deals", - "Primary Workflow": "Unknown", - "Contact_Name": contact_data.get("Full_Name", ""), - "Email": contact_data.get("Email", ""), + "Type": "Organic - Other", + "Primary_Workflow": "Unknown", } ) deals.append(deal_data) return list(contacts.values()), list(accounts.values()), deals - def create_bulk_upload_file(self, records: List[Dict], module: str) -> str: + def create_bulk_upload_file( + self, records: List[Dict], module: str, filename: str + ) -> str: """Creates CSV file for bulk upload""" if not records: return None temp_dir = tempfile.gettempdir() - timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") - csv_path = os.path.join(temp_dir, f"{module}_{timestamp}.csv") + csv_path = os.path.join(temp_dir, f"{module}_{filename}.csv") fieldnames = list(records[0].keys()) @@ -231,36 +332,122 @@ def create_bulk_upload_file(self, records: List[Dict], module: str) -> str: return csv_path - def upload_bulk_file(self, file_path: str, module: str) -> Dict: - """Uploads CSV file to ZOHO CRM""" - with open(file_path, "rb") as file: - files = {"file": (os.path.basename(file_path), file)} - data = {"module": module, "operation": "insert"} + def upload_bulk_file(self, file_path: str) -> Dict: + zip_path = f"{file_path}.zip" + with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zipf: + zipf.write(file_path, arcname=os.path.basename(file_path)) + + with open(zip_path, "rb") as file: + files = {"file": (os.path.basename(zip_path), file)} response = requests.post( ZOHO_BULK_FILE_UPLOAD_API, - headers={**ZOHO_HEADERS, "feature": "bulk-write"}, + headers={ + **ZOHO_HEADERS, + "feature": "bulk-write", + "X-CRM-ORG": ZOHO_ORG_ID, + }, files=files, - data=data, ) if response.status_code != 200: - raise Exception(f"Bulk upload failed: {response.text}") + raise Exception(f"Bulk upload failed: {response}") + + response_data = response.json() + if ( + "details" not in response_data + or "file_id" not in response_data["details"] + ): + raise Exception( + f"Failed to retrieve file_id from upload response: {response_data}" + ) + + return response_data["details"] + + def create_bulk_upload_job( + self, file_ids: List[str], operation: str = "upsert" + ) -> Dict: + deal_file_id, account_file_id, contact_file_id = file_ids + modules = ["Accounts", "Contacts", "Deals"] + file_id_map = { + "Accounts": account_file_id, + "Contacts": contact_file_id, + "Deals": deal_file_id, + } + + results = [] + + for module in modules: + data = { + "operation": operation, + "resource": [ + { + "type": "data", + "module": {"api_name": module}, + "file_id": file_id_map[module], + "find_by": get_unique_field(module), + "field_mappings": get_field_mappings(module), + } + ], + } + + try: + response = requests.post( + ZOHO_BULK_CREATE_JOB, + headers={**ZOHO_HEADERS}, + json=data, + ) + + if response.status_code != 201: + raise Exception( + f"Bulk upload job creation failed for {module}: {response.text}" + ) + + results.append({"module": module, "response": response.json()}) + + except Exception as e: + print(f"Error creating bulk upload job for {module}: {str(e)}") + raise - return response.json() + return results + + def process_bulk_upload(self, files: []) -> Dict: + if not files: + raise Exception("No files provided for bulk upload.") + + deal_file, account_file, contact_file = files + account_file_id = self.upload_bulk_file(account_file).get("file_id") + deal_file_id = self.upload_bulk_file(deal_file).get("file_id") + contact_file_id = self.upload_bulk_file(contact_file).get("file_id") + + print(f"Account File ID: {account_file_id}") + print(f"Deal File ID: {deal_file_id}") + print(f"Contact File ID: {contact_file_id}") + + for file_id in account_file_id, deal_file_id, contact_file_id: + if not file_id: + raise Exception("File upload did not return a valid file_id.") + + self.logger.info(f"File uploaded successfully. File ID: {file_id}") + + self.logger.info(f"Creating bulk upload job") + job_response = self.create_bulk_upload_job( + [deal_file_id, account_file_id, contact_file_id], + operation="upsert", + ) + + self.logger.info( + f"Bulk upload job created successfully. Job Details: {job_response}" + ) + return job_response class ZOHOSync: + def __init__( self, - batch_size: int = 100, - max_retries: int = 3, + batch_size: int = 50, ): - """ - :param field_mapper: Configurable field mapping instance - :param batch_size: Number of records to process in a single batch - :param max_retries: Maximum retry attempts for failed operations - """ self.logger = logging.getLogger(self.__class__.__name__) self.field_mapper = ConfigurableFieldMapper() self.bulk_uploader = ZohoBulkUploader(self.field_mapper) @@ -271,20 +458,14 @@ def bulk_sync_transactions( start_date: Optional[datetime] = None, end_date: Optional[datetime] = None, positive_only: bool = True, - dry_run: bool = False, - test: bool = False, + limit: int | None = None, ): - """ - :param start_date: Optional start date for transaction filtering - :param end_date: Optional end date for transaction filtering - :param positive_only: Sync only positive transactions - :param dry_run: Preview sync without actual API calls - - :return: Sync statistics - """ stats = {"processed": 0, "successful": 0, "failed": 0, "errors": []} # Build transaction query with optional date filtering - transaction_query = AppUserTransaction.objects.all() + transaction_query = AppUserTransaction.objects.all().order_by("created_at") + + # @TODO filter from wrt batch size from created_at like pagination ( paginate_queryset ) + if start_date: transaction_query = transaction_query.filter(created_at__gte=start_date) @@ -299,44 +480,45 @@ def bulk_sync_transactions( batch_transactions = transaction_query[ batch_start : batch_start + self.batch_size ] + # stop if above limit + if limit and batch_start >= limit: + print("Limit reached. Stopping sync.") + break + + batch_label = f"{batch_start}-{batch_start + self.batch_size}" try: contacts, accounts, deals = self.bulk_uploader.prepare_bulk_data( batch_transactions ) - # Upload contacts - if contacts: - contact_file = self.bulk_uploader.create_bulk_upload_file( - contacts, "Contacts" - ) - if dry_run: - print(f"Contacts: {contacts}") - else: - self.bulk_uploader.upload_bulk_file(contact_file, "Contacts") - - # Upload accounts if accounts: account_file = self.bulk_uploader.create_bulk_upload_file( - accounts, "Accounts" + accounts, "Accounts", batch_label + ) + + if contacts: + contact_file = self.bulk_uploader.create_bulk_upload_file( + contacts, "Contacts", batch_label ) - if dry_run: - print(f"Accounts: {accounts}") - else: - self.bulk_uploader.upload_bulk_file(account_file, "Accounts") - # Upload deals if deals: deal_file = self.bulk_uploader.create_bulk_upload_file( - deals, "Deals" + deals, "Deals", batch_label ) - if dry_run: - print(f"Deals: {deals}") - else: - self.bulk_uploader.upload_bulk_file(deal_file, "Deals") stats["successful"] += len(batch_transactions) + print(f"{batch_label} Deals CSV: {deal_file}") + print(f"{batch_label} Contacts CSV: {contact_file}") + print(f"{batch_label} Accounts CSV: {account_file}") + + if deal_file: + upload_response = self.bulk_uploader.process_bulk_upload( + [deal_file, account_file, contact_file] + ) + print(f"Deals upload response: {upload_response}") + except Exception as e: self.logger.error(f"Batch sync failed: {str(e)}") stats["failed"] += len(batch_transactions) @@ -346,13 +528,27 @@ def bulk_sync_transactions( return stats + def _get_user_confirmation(self, label: str) -> bool: + f"Are you sure you want to proceed with the upload for batch {label}?" + while True: + user_input = ( + input("Proceed with uploading this batch? (yes/no): ").strip().lower() + ) + if user_input in {"yes", "y"}: + return True + elif user_input in {"no", "n"}: + print("Skipping this batch.") + return False + else: + print("Invalid input. Please type 'yes' or 'no'.") + def run_optimized_sync( start_date: Optional[datetime] = None, end_date: Optional[datetime] = None ): """Convenience function to run the sync""" sync_manager = ZOHOSync() - results = sync_manager.bulk_sync_transactions(start_date, end_date) + results = sync_manager.bulk_sync_transactions(start_date, end_date, limit=50) print(f"Sync completed:") print(f"Processed: {results['processed']}") @@ -366,4 +562,8 @@ def run_optimized_sync( if __name__ == "__main__": + argv = sys.argv[1:] + + # get args from command line and convert date string to datetime object + start_date = datetime.strptime(argv[0], "%Y-%m-%d") if argv else None run_optimized_sync() From 4ee270f1b73f524fe101e635e81a79be495bb0e9 Mon Sep 17 00:00:00 2001 From: anish-work Date: Tue, 28 Jan 2025 17:30:04 +0530 Subject: [PATCH 4/4] update find_by fields --- scripts/zoho_crm.py | 94 ++++++++++++++++++++++++++------------------- 1 file changed, 54 insertions(+), 40 deletions(-) diff --git a/scripts/zoho_crm.py b/scripts/zoho_crm.py index b48e5adcd..01771a1c8 100644 --- a/scripts/zoho_crm.py +++ b/scripts/zoho_crm.py @@ -37,10 +37,10 @@ def get_field_mappings(module: str) -> List[Dict]: field_mappings = { "Deals": [ {"api_name": "Layout", "default_value": {"value": "6093802000000498176"}}, - {"api_name": "id", "index": 0}, - {"api_name": "Invoice_ID", "index": 1}, - {"api_name": "Account_Lookup", "index": 2, "find_by": "id"}, - {"api_name": "Contact_Lookup", "index": 3, "find_by": "id"}, + {"api_name": "Invoice_ID", "index": 0}, + {"api_name": "Account_Lookup", "index": 1, "find_by": "Gooey_ID"}, + {"api_name": "Contact_Lookup", "index": 2, "find_by": "Gooey_User_ID"}, + {"api_name": "Contact_Name", "index": 3, "find_by": "id"}, {"api_name": "Account_Title", "index": 4}, {"api_name": "Contact_Email", "index": 5}, {"api_name": "Amount", "index": 6}, @@ -58,30 +58,31 @@ def get_field_mappings(module: str) -> List[Dict]: {"api_name": "Primary_Workflow", "index": 18}, ], "Contacts": [ - {"api_name": "id", "index": 0}, - {"api_name": "Gooey_User_ID", "index": 1}, - # {"api_name": "Gooey_Admin_Link", "index": 2}, - {"api_name": "Contact_Name", "index": 3}, - {"api_name": "Last_Name", "index": 4}, - {"api_name": "Email", "index": 5}, - {"api_name": "Phone", "index": 6}, - {"api_name": "Not_Synced", "index": 7}, - {"api_name": "Contact_Image", "index": 8}, - {"api_name": "Gooey_Created_Date", "index": 9, "format": "yyyy-MM-dd"}, - {"api_name": "Gooey_Handle", "index": 10}, - {"api_name": "Registered_Date", "index": 11, "format": "yyyy-MM-dd"}, - {"api_name": "Description", "index": 12}, - {"api_name": "Company", "index": 13}, - {"api_name": "Personal_Url", "index": 14}, + {"api_name": "Gooey_User_ID", "index": 0}, + {"api_name": "Gooey_Admin_Link", "index": 1}, + {"api_name": "Contact_Name", "index": 2}, + {"api_name": "Last_Name", "index": 3}, + {"api_name": "Email", "index": 4}, + {"api_name": "Phone", "index": 5}, + {"api_name": "Not_Synced", "index": 6}, + {"api_name": "Contact_Image", "index": 7}, + {"api_name": "Gooey_Created_Date", "index": 8, "format": "yyyy-MM-dd"}, + {"api_name": "Gooey_Handle", "index": 9}, + {"api_name": "Registered_Date", "index": 10, "format": "yyyy-MM-dd"}, + {"api_name": "Description", "index": 11}, + {"api_name": "Company", "index": 12}, + {"api_name": "Personal_Url", "index": 13}, + {"api_name": "Account_Lookup", "find_by": "Gooey_ID", "index": 14}, ], "Accounts": [ - {"api_name": "id", "index": 0}, + {"api_name": "Gooey_ID", "index": 0}, {"api_name": "Account_Name", "index": 1}, - {"api_name": "Balance", "index": 2}, - {"api_name": "Is_Paying", "index": 3}, - {"api_name": "Gooey_Admin_Link", "index": 4}, - {"api_name": "Created_Date", "index": 5, "format": "yyyy-MM-dd"}, - {"api_name": "Updated_At", "index": 5, "format": "yyyy-MM-dd"}, + {"api_name": "Picture", "index": 2}, + {"api_name": "Balance", "index": 3}, + {"api_name": "Is_Paying", "index": 4}, + {"api_name": "Gooey_Admin_Link", "index": 5}, + {"api_name": "Created_Date", "index": 6, "format": "yyyy-MM-dd"}, + {"api_name": "Updated_At", "index": 7, "format": "yyyy-MM-dd"}, ], } return field_mappings.get(module, []) @@ -106,9 +107,9 @@ def get_unique_field(module: str) -> str: :return: Unique field name for the specified module """ unique_fields = { - "Contacts": "id", - "Accounts": "id", - "Deals": "id", + "Contacts": "Gooey_User_ID", + "Accounts": "Gooey_ID", + "Deals": "Invoice_ID", } return unique_fields.get(module, "ID") @@ -127,11 +128,10 @@ def _load_mapping_config(self) -> Dict: """ default_config = { "contact_mapping": { - "id": {"db_key": "id"}, "Gooey_User_ID": {"db_key": "uid"}, "Gooey_Admin_Link": { - "db_key": "django_appUser_url", - "transformer": lambda url: url(), + "db_key": "id", + "transformer": lambda id: f"https://admin.gooey.ai/app_users/appuser/{id}", }, "Contact_Name": {"db_key": "display_name"}, "Last_Name": { @@ -159,7 +159,6 @@ def _load_mapping_config(self) -> Dict: "Personal_URL": {"db_key": "website_url"}, }, "transaction_mapping": { - "id": {"db_key": "id"}, "Invoice_ID": {"db_key": "invoice_id"}, "Account_Lookup": { "db_key": "workspace", @@ -167,9 +166,13 @@ def _load_mapping_config(self) -> Dict: }, "Contact_Lookup": { "db_key": "user", - "transformer": lambda user: user.id, + "transformer": lambda user: user.uid, + }, + "Contact_Name": { + "db_key": "user", + "transformer": lambda user: user.display_name, }, - "Account_Name": {"db_key": "workspace"}, + "Account_Title": {"db_key": "workspace"}, "Contact_Email": { "db_key": "user", "transformer": lambda user: user.email, @@ -195,14 +198,14 @@ def _load_mapping_config(self) -> Dict: "Currency": {"db_key": "currency", "default": "USD"}, }, "workspace_mapping": { - "id": {"db_key": "id"}, + "Gooey_ID": {"db_key": "id"}, "Account_Name": {"db_key": "name"}, - "Account_Image": {"db_key": "photo_url"}, + "Picture": {"db_key": "photo_url"}, "Balance": {"db_key": "balance"}, "Is_Paying": {"db_key": "is_paying"}, "Gooey_Admin_Link": { - "db_key": "django_workspace_url", - "transformer": lambda url: url(), + "db_key": "id", + "transformer": lambda id: f"https://admin.gooey.ai/workspaces/workspace/{id}/", }, "Created_Date": { "db_key": "created_at", @@ -301,7 +304,7 @@ def prepare_bulk_data( ) deal_data.update( { - "Deal_Name": f"${transaction.amount} {transaction.workspace} {transaction.reason_note()}", + "Deal_Name": f"${transaction.amount} {transaction.workspace} {transaction.created_at.strftime(settings.SHORT_DATETIME_FORMAT)} {transaction.reason_note()}", "Stage": "Organic Closed Won", "Vertical": "Organic", "Pipeline": "Organic Deals", @@ -459,6 +462,7 @@ def bulk_sync_transactions( end_date: Optional[datetime] = None, positive_only: bool = True, limit: int | None = None, + dry_run: bool = False, ): stats = {"processed": 0, "successful": 0, "failed": 0, "errors": []} # Build transaction query with optional date filtering @@ -513,11 +517,21 @@ def bulk_sync_transactions( print(f"{batch_label} Contacts CSV: {contact_file}") print(f"{batch_label} Accounts CSV: {account_file}") - if deal_file: + if not dry_run and (deal_file and contact_file and account_file): upload_response = self.bulk_uploader.process_bulk_upload( [deal_file, account_file, contact_file] ) print(f"Deals upload response: {upload_response}") + else: + print("Dry run. Skipping upload.") + print( + "Deal File:", + deal_file, + "Contact File:", + contact_file, + "Account File:", + account_file, + ) except Exception as e: self.logger.error(f"Batch sync failed: {str(e)}")