From 8c7bc95b324356b59fd903c38107b554cea92e9b Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 12 Oct 2025 00:44:19 +0000 Subject: [PATCH 1/4] Initial plan From e27890bda8486ef501316bcef0dfd84942f70dfc Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 12 Oct 2025 00:54:04 +0000 Subject: [PATCH 2/4] Add RetentionDetector feature with comprehensive tests and documentation Co-authored-by: mitchelllisle <18128531+mitchelllisle@users.noreply.github.com> --- README.md | 161 ++++++++++ .../analysers/RetentionDetector.scala | 252 ++++++++++++++++ src/test/resources/retentionConfig.yaml | 12 + src/test/scala/RetentionDetectorTest.scala | 279 ++++++++++++++++++ 4 files changed, 704 insertions(+) create mode 100644 src/main/scala/org/mitchelllisle/analysers/RetentionDetector.scala create mode 100644 src/test/resources/retentionConfig.yaml create mode 100644 src/test/scala/RetentionDetectorTest.scala diff --git a/README.md b/README.md index dc22048..cb30b67 100644 --- a/README.md +++ b/README.md @@ -233,6 +233,167 @@ val redactedData = redactor(data) ``` +### Retention Detection +The `RetentionDetector` helps identify customers whose data exceeds specified retention schedules, ensuring compliance +with data retention policies (GDPR, CCPA, etc.). It scans datasets for retention-relevant fields and reports customers +who are overdue for data deletion or anonymization. 
+ +#### Basic Usage + +```scala +import org.mitchelllisle.analysers.{RetentionDetector, RetentionPolicy} +import java.time.LocalDate + +val spark = SparkSession.builder().getOrCreate() +val userData = spark.read.option("header", "true").csv("user-activity.csv") + +// Define a retention policy +val gdprPolicy = RetentionPolicy( + name = "GDPR_DEFAULT", + retentionDays = 365, + description = Some("GDPR 1-year retention policy") +) + +// Detect violations +val violations = RetentionDetector.detectViolations( + data = userData, + idColumn = "user_id", + timestampColumn = "last_activity_date", + policy = gdprPolicy +) + +violations.show() +// Returns: customerId, daysPastRetention, lastActivity, policyName +``` + +#### Detection with Summary Statistics + +Get both violations and compliance metrics: + +```scala +val (violations, summary) = RetentionDetector.detectViolationsWithSummary( + userData, + "user_id", + "last_activity_date", + gdprPolicy +) + +println(summary) +// Prints: +// Retention Policy: GDPR_DEFAULT +// Retention Period: 365 days +// Total Records: 10000 +// Violations Found: 1234 +// Compliance Rate: 87.66% +// Average Days Past Retention: 45.2 +``` + +#### Multiple Retention Policies + +Check against multiple policies simultaneously: + +```scala +val policies = Seq( + RetentionPolicy("SHORT_TERM", 30, Some("30-day policy")), + RetentionPolicy("MEDIUM_TERM", 90, Some("90-day policy")), + RetentionPolicy("LONG_TERM", 365, Some("1-year policy")) +) + +val allViolations = RetentionDetector.detectMultiplePolicies( + userData, + "user_id", + "last_activity_date", + policies +) +``` + +#### Generate Action Plan + +Get automated recommendations for handling violations: + +```scala +val actionPlan = RetentionDetector.generateActionPlan( + violations, + actionThresholdDays = 30 +) + +actionPlan.show() +// Returns: customerId, daysPastRetention, lastActivity, policyName, +// recommendedAction (Review/Anonymize/Delete), priority (High/Medium/Low) +``` + +#### 
Analyze Violation Patterns + +Group violations by time ranges: + +```scala +val violationsByRange = RetentionDetector.groupViolationsByRange(violations) +violationsByRange.show() +// Returns: +// violationRange | policyName | recordCount | avgDaysPast | maxDaysPast +// 0-30 days | GDPR | 150 | 15.2 | 30 +// 31-90 days | GDPR | 234 | 62.5 | 90 +// 91-180 days | GDPR | 128 | 135.8 | 180 +// etc. +``` + +#### Supported Column Types + +The retention detector automatically handles different timestamp formats: +- **TimestampType**: Standard Spark timestamp columns +- **DateType**: Date-only columns +- **StringType**: String dates (automatically converted with `to_timestamp`) + +```scala +// Works with different column types +val timestampData = df.withColumn("ts", col("date_string").cast(TimestampType)) +val violations1 = RetentionDetector.apply(timestampData, "id", "ts", policy) + +val dateData = df.withColumn("dt", col("date_string").cast(DateType)) +val violations2 = RetentionDetector.apply(dateData, "id", "dt", policy) + +val stringData = df // date_string as is +val violations3 = RetentionDetector.apply(stringData, "id", "date_string", policy) +``` + +#### Integration Example: Automated Cleanup Pipeline + +```scala +import org.mitchelllisle.analysers.{RetentionDetector, RetentionPolicy} +import org.mitchelllisle.Anonymiser + +// Step 1: Detect violations +val policy = RetentionPolicy("GDPR", 365) +val (violations, summary) = RetentionDetector.detectViolationsWithSummary( + userData, "user_id", "last_activity_date", policy +) + +// Step 2: Generate action plan +val actionPlan = RetentionDetector.generateActionPlan(violations, 30) + +// Step 3: Handle based on action plan +val toDelete = actionPlan.filter(col("recommendedAction") === "Delete") +val toAnonymize = actionPlan.filter(col("recommendedAction") === "Anonymize") + +// Delete records (example) +val cleanedData = userData.join( + toDelete.select("customerId"), + userData("user_id") === toDelete("customerId"), + 
"left_anti" +) + +// Anonymize records (example) +val anonymiser = new Anonymiser("anonymization-config.yaml") +val anonymizedData = anonymiser.runAnonymisers( + userData.join(toAnonymize.select("customerId"), + userData("user_id") === toAnonymize("customerId")) +) + +// Step 4: Create audit proof +val afterProof = MerkleTree.apply(cleanedData, Seq("email", "age"), "user_id") +println(s"Cleanup completed. New data fingerprint: ${afterProof.rootHash}") +``` + ## **Merkle Tree Data Retention & Audit Trails** diff --git a/src/main/scala/org/mitchelllisle/analysers/RetentionDetector.scala b/src/main/scala/org/mitchelllisle/analysers/RetentionDetector.scala new file mode 100644 index 0000000..2702721 --- /dev/null +++ b/src/main/scala/org/mitchelllisle/analysers/RetentionDetector.scala @@ -0,0 +1,252 @@ +package org.mitchelllisle.analysers + +import org.apache.spark.sql.{DataFrame, functions => F} +import org.apache.spark.sql.types._ +import java.time.{Instant, LocalDate} +import java.time.temporal.ChronoUnit + +/** Represents a retention policy defining how long data should be kept. + * + * @param name A descriptive name for the policy (e.g., "GDPR_DEFAULT", "ACTIVE_USERS") + * @param retentionDays Number of days to retain data + * @param description Optional description of the policy + */ +case class RetentionPolicy( + name: String, + retentionDays: Int, + description: Option[String] = None +) + +/** Represents a customer or record that has exceeded its retention schedule. + * + * @param customerId The unique identifier for the customer/record + * @param daysPastRetention Number of days beyond the retention policy + * @param lastActivity The timestamp or date of last activity + * @param policyName The retention policy that was violated + */ +case class RetentionViolation( + customerId: String, + daysPastRetention: Long, + lastActivity: String, + policyName: String +) + +/** Configuration parameters for retention detection analysis. 
+ * + * @param idColumn The column containing customer/record identifiers + * @param timestampColumn The column containing the timestamp or date to check against retention + * @param policyName The name of the retention policy to apply + */ +case class RetentionParams( + idColumn: String, + timestampColumn: String, + policyName: String +) extends AnalyserParams + +/** Analyzes datasets to detect records that exceed specified retention schedules. + * + * This analyzer helps ensure compliance with data retention policies (GDPR, CCPA, etc.) + * by identifying customers whose data should be deleted or anonymized. + */ +object RetentionDetector { + + /** Detects records that have exceeded their retention schedule. + * + * @param data The DataFrame to analyze + * @param idColumn The column containing unique identifiers + * @param timestampColumn The column containing timestamps or dates to check + * @param policy The retention policy to apply + * @param referenceDate Optional reference date for checking (defaults to current date) + * @return DataFrame containing records that exceed the retention policy + */ + def detectViolations( + data: DataFrame, + idColumn: String, + timestampColumn: String, + policy: RetentionPolicy, + referenceDate: Option[LocalDate] = None + ): DataFrame = { + val refDate = referenceDate.getOrElse(LocalDate.now()) + val retentionCutoffDate = refDate.minusDays(policy.retentionDays) + + // Convert reference date to timestamp for comparison + val cutoffTimestamp = F.lit(java.sql.Timestamp.valueOf(retentionCutoffDate.atStartOfDay())) + + // Determine the type of timestamp column and convert accordingly + val timestampExpr = data.schema(timestampColumn).dataType match { + case _: TimestampType => F.col(timestampColumn) + case _: DateType => F.col(timestampColumn).cast(TimestampType) + case _: StringType => F.to_timestamp(F.col(timestampColumn)) + case _ => throw new IllegalArgumentException( + s"Column '$timestampColumn' must be of type Timestamp, 
Date, or String" + ) + } + + // Calculate days past retention + val daysPastRetention = F.datediff( + F.lit(java.sql.Date.valueOf(retentionCutoffDate)), + timestampExpr + ) + + // Filter records that are past retention + data + .filter(timestampExpr < cutoffTimestamp) + .select( + F.col(idColumn).alias("customerId"), + daysPastRetention.alias("daysPastRetention"), + timestampExpr.cast(StringType).alias("lastActivity"), + F.lit(policy.name).alias("policyName") + ) + .filter(F.col("daysPastRetention") > 0) + } + + /** Detects violations and returns a summary with statistics. + * + * @param data The DataFrame to analyze + * @param idColumn The column containing unique identifiers + * @param timestampColumn The column containing timestamps or dates to check + * @param policy The retention policy to apply + * @param referenceDate Optional reference date for checking (defaults to current date) + * @return Tuple of (violations DataFrame, summary statistics) + */ + def detectViolationsWithSummary( + data: DataFrame, + idColumn: String, + timestampColumn: String, + policy: RetentionPolicy, + referenceDate: Option[LocalDate] = None + ): (DataFrame, RetentionSummary) = { + val violations = detectViolations(data, idColumn, timestampColumn, policy, referenceDate) + val totalRecords = data.count() + val violationCount = violations.count() + val avgDaysPast = if (violationCount > 0) { + violations.select(F.avg("daysPastRetention")).first().getDouble(0) + } else { + 0.0 + } + + val summary = RetentionSummary( + policyName = policy.name, + totalRecords = totalRecords, + violationsFound = violationCount, + averageDaysPastRetention = avgDaysPast, + retentionDays = policy.retentionDays + ) + + (violations, summary) + } + + /** Groups violations by time ranges for better insight. 
+ * + * @param violations DataFrame containing retention violations + * @return DataFrame with violations grouped by time ranges + */ + def groupViolationsByRange(violations: DataFrame): DataFrame = { + violations + .withColumn("violationRange", + F.when(F.col("daysPastRetention") <= 30, "0-30 days") + .when(F.col("daysPastRetention") <= 90, "31-90 days") + .when(F.col("daysPastRetention") <= 180, "91-180 days") + .when(F.col("daysPastRetention") <= 365, "181-365 days") + .otherwise("Over 1 year") + ) + .groupBy("violationRange", "policyName") + .agg( + F.count("customerId").alias("recordCount"), + F.avg("daysPastRetention").alias("avgDaysPast"), + F.max("daysPastRetention").alias("maxDaysPast") + ) + .orderBy("violationRange") + } + + /** Applies the standard detection workflow with a single policy. + * + * @param data The DataFrame to analyze + * @param idColumn The column containing unique identifiers + * @param timestampColumn The column containing timestamps or dates to check + * @param policy The retention policy to apply + * @return DataFrame containing records that exceed the retention policy + */ + def apply( + data: DataFrame, + idColumn: String, + timestampColumn: String, + policy: RetentionPolicy + ): DataFrame = { + detectViolations(data, idColumn, timestampColumn, policy) + } + + /** Detects violations across multiple retention policies. 
+ * + * @param data The DataFrame to analyze + * @param idColumn The column containing unique identifiers + * @param timestampColumn The column containing timestamps or dates to check + * @param policies Sequence of retention policies to check + * @return DataFrame containing all violations across all policies + */ + def detectMultiplePolicies( + data: DataFrame, + idColumn: String, + timestampColumn: String, + policies: Seq[RetentionPolicy] + ): DataFrame = { + policies.map { policy => + detectViolations(data, idColumn, timestampColumn, policy) + }.reduce(_ union _).distinct() + } + + /** Generates an anonymization/deletion action plan for violations. + * + * @param violations DataFrame containing retention violations + * @param actionThresholdDays Number of days past retention before action is recommended + * @return DataFrame with recommended actions + */ + def generateActionPlan( + violations: DataFrame, + actionThresholdDays: Int = 30 + ): DataFrame = { + violations + .withColumn("recommendedAction", + F.when(F.col("daysPastRetention") < actionThresholdDays, "Review") + .when(F.col("daysPastRetention") < actionThresholdDays * 2, "Anonymize") + .otherwise("Delete") + ) + .withColumn("priority", + F.when(F.col("daysPastRetention") >= actionThresholdDays * 3, "High") + .when(F.col("daysPastRetention") >= actionThresholdDays * 2, "Medium") + .otherwise("Low") + ) + } +} + +/** Summary statistics for retention detection results. 
+ * + * @param policyName The name of the retention policy applied + * @param totalRecords Total number of records analyzed + * @param violationsFound Number of records exceeding retention + * @param averageDaysPastRetention Average days past the retention schedule + * @param retentionDays The retention period in days + */ +case class RetentionSummary( + policyName: String, + totalRecords: Long, + violationsFound: Long, + averageDaysPastRetention: Double, + retentionDays: Int +) { + def complianceRate: Double = { + if (totalRecords == 0) 100.0 + else ((totalRecords - violationsFound).toDouble / totalRecords) * 100.0 + } + + override def toString: String = { + s""" + |Retention Policy: $policyName + |Retention Period: $retentionDays days + |Total Records: $totalRecords + |Violations Found: $violationsFound + |Compliance Rate: ${f"$complianceRate%.2f"}% + |Average Days Past Retention: ${f"$averageDaysPastRetention%.1f"} + |""".stripMargin + } +} diff --git a/src/test/resources/retentionConfig.yaml b/src/test/resources/retentionConfig.yaml new file mode 100644 index 0000000..acc783d --- /dev/null +++ b/src/test/resources/retentionConfig.yaml @@ -0,0 +1,12 @@ +catalog: 'main' +schema: 'customer_data' +table: 'user_activity' +anonymise: [] +analyse: + - type: 'retention' + parameters: + idColumn: 'user_id' + timestampColumn: 'last_activity_date' + policyName: 'GDPR_DEFAULT' + retentionDays: '365' + description: 'GDPR default 1-year retention policy' diff --git a/src/test/scala/RetentionDetectorTest.scala b/src/test/scala/RetentionDetectorTest.scala new file mode 100644 index 0000000..ea5888d --- /dev/null +++ b/src/test/scala/RetentionDetectorTest.scala @@ -0,0 +1,279 @@ +import org.mitchelllisle.analysers.{RetentionDetector, RetentionPolicy} +import org.scalatest.flatspec.AnyFlatSpec +import java.time.LocalDate + +class RetentionDetectorTest extends AnyFlatSpec with SparkFunSuite { + import spark.implicits._ + + // Test data with various timestamps + val testDate = 
LocalDate.of(2024, 1, 1) + + val testData = Seq( + ("user1", "2023-12-15"), // 17 days old (at test date) + ("user2", "2023-11-01"), // 61 days old + ("user3", "2023-09-01"), // 122 days old + ("user4", "2023-06-01"), // 214 days old + ("user5", "2023-01-01"), // 365 days old + ("user6", "2022-06-01") // 579 days old + ).toDF("customer_id", "last_activity_date") + + val policy30Days = RetentionPolicy("30_DAY_POLICY", 30, Some("Retain data for 30 days")) + val policy90Days = RetentionPolicy("90_DAY_POLICY", 90, Some("Retain data for 90 days")) + val policy365Days = RetentionPolicy("365_DAY_POLICY", 365, Some("Retain data for 1 year")) + + "detectViolations" should "identify records past retention date" in { + val violations = RetentionDetector.detectViolations( + testData, + "customer_id", + "last_activity_date", + policy30Days, + Some(testDate) + ) + + assert(violations.count() == 5) // All except user1 (17 days old) + + val customerIds = violations.select("customerId").collect().map(_.getString(0)).toSet + assert(!customerIds.contains("user1")) + assert(customerIds.contains("user2")) + assert(customerIds.contains("user6")) + } + + "detectViolations" should "calculate correct days past retention" in { + val violations = RetentionDetector.detectViolations( + testData, + "customer_id", + "last_activity_date", + policy30Days, + Some(testDate) + ) + + val user2Violation = violations.filter($"customerId" === "user2").first() + // user2 is 61 days old, policy is 30 days, so 31 days past retention + assert(user2Violation.getInt(user2Violation.fieldIndex("daysPastRetention")) == 31) + } + + "detectViolations" should "include policy name in results" in { + val violations = RetentionDetector.detectViolations( + testData, + "customer_id", + "last_activity_date", + policy90Days, + Some(testDate) + ) + + violations.collect().foreach { row => + assert(row.getString(row.fieldIndex("policyName")) == "90_DAY_POLICY") + } + } + + "detectViolations" should "handle different retention 
periods correctly" in { + val violations30 = RetentionDetector.detectViolations( + testData, "customer_id", "last_activity_date", policy30Days, Some(testDate) + ) + val violations90 = RetentionDetector.detectViolations( + testData, "customer_id", "last_activity_date", policy90Days, Some(testDate) + ) + val violations365 = RetentionDetector.detectViolations( + testData, "customer_id", "last_activity_date", policy365Days, Some(testDate) + ) + + assert(violations30.count() > violations90.count()) + assert(violations90.count() > violations365.count()) + } + + "detectViolations" should "handle empty results when no violations exist" in { + val recentData = Seq( + ("user1", "2023-12-31"), + ("user2", "2023-12-30") + ).toDF("customer_id", "last_activity_date") + + val violations = RetentionDetector.detectViolations( + recentData, + "customer_id", + "last_activity_date", + policy30Days, + Some(testDate) + ) + + assert(violations.count() == 0) + } + + "detectViolationsWithSummary" should "return violations and summary statistics" in { + val (violations, summary) = RetentionDetector.detectViolationsWithSummary( + testData, + "customer_id", + "last_activity_date", + policy30Days, + Some(testDate) + ) + + assert(violations.count() == 5) + assert(summary.totalRecords == 6) + assert(summary.violationsFound == 5) + assert(summary.policyName == "30_DAY_POLICY") + assert(summary.retentionDays == 30) + assert(summary.averageDaysPastRetention > 0) + } + + "RetentionSummary" should "calculate compliance rate correctly" in { + val summary = RetentionDetector.detectViolationsWithSummary( + testData, + "customer_id", + "last_activity_date", + policy30Days, + Some(testDate) + )._2 + + val expectedComplianceRate = ((6.0 - 5.0) / 6.0) * 100.0 + assert(math.abs(summary.complianceRate - expectedComplianceRate) < 0.01) + } + + "groupViolationsByRange" should "categorize violations correctly" in { + val violations = RetentionDetector.detectViolations( + testData, + "customer_id", + 
"last_activity_date", + policy30Days, + Some(testDate) + ) + + val grouped = RetentionDetector.groupViolationsByRange(violations) + + assert(grouped.count() > 0) + val ranges = grouped.select("violationRange").collect().map(_.getString(0)).toSet + assert(ranges.contains("0-30 days") || ranges.contains("31-90 days")) + } + + "apply" should "work as shorthand for detectViolations" in { + val violations1 = RetentionDetector.apply( + testData, + "customer_id", + "last_activity_date", + policy30Days + ) + + val violations2 = RetentionDetector.detectViolations( + testData, + "customer_id", + "last_activity_date", + policy30Days, + None + ) + + assert(violations1.count() == violations2.count()) + } + + "detectMultiplePolicies" should "detect violations across multiple policies" in { + val policies = Seq(policy30Days, policy90Days, policy365Days) + + val violations = RetentionDetector.detectMultiplePolicies( + testData, + "customer_id", + "last_activity_date", + policies + ) + + // Should have violations from all policies, but deduplicated + assert(violations.count() > 0) + + val policyNames = violations.select("policyName").distinct().collect().map(_.getString(0)).toSet + assert(policyNames.size <= 3) + } + + "generateActionPlan" should "add recommended actions based on thresholds" in { + val violations = RetentionDetector.detectViolations( + testData, + "customer_id", + "last_activity_date", + policy30Days, + Some(testDate) + ) + + val actionPlan = RetentionDetector.generateActionPlan(violations, actionThresholdDays = 30) + + assert(actionPlan.columns.contains("recommendedAction")) + assert(actionPlan.columns.contains("priority")) + + val actions = actionPlan.select("recommendedAction").distinct().collect().map(_.getString(0)).toSet + assert(actions.nonEmpty) + } + + "generateActionPlan" should "assign correct priority levels" in { + val violations = RetentionDetector.detectViolations( + testData, + "customer_id", + "last_activity_date", + policy30Days, + Some(testDate) 
+ ) + + val actionPlan = RetentionDetector.generateActionPlan(violations, actionThresholdDays = 30) + + val priorities = actionPlan.select("priority").distinct().collect().map(_.getString(0)).toSet + assert(priorities.exists(p => Seq("High", "Medium", "Low").contains(p))) + } + + "detectViolations" should "handle timestamp column type" in { + val timestampData = Seq( + ("user1", java.sql.Timestamp.valueOf("2023-11-01 10:00:00")), + ("user2", java.sql.Timestamp.valueOf("2023-06-01 15:30:00")) + ).toDF("customer_id", "last_activity_ts") + + val violations = RetentionDetector.detectViolations( + timestampData, + "customer_id", + "last_activity_ts", + policy30Days, + Some(testDate) + ) + + assert(violations.count() == 2) + } + + "detectViolations" should "handle date column type" in { + val dateData = Seq( + ("user1", java.sql.Date.valueOf("2023-11-01")), + ("user2", java.sql.Date.valueOf("2023-06-01")) + ).toDF("customer_id", "last_activity_date") + + val violations = RetentionDetector.detectViolations( + dateData, + "customer_id", + "last_activity_date", + policy30Days, + Some(testDate) + ) + + assert(violations.count() == 2) + } + + "RetentionSummary toString" should "format output correctly" in { + val summary = RetentionDetector.detectViolationsWithSummary( + testData, + "customer_id", + "last_activity_date", + policy30Days, + Some(testDate) + )._2 + + val output = summary.toString + assert(output.contains("30_DAY_POLICY")) + assert(output.contains("Total Records")) + assert(output.contains("Compliance Rate")) + } + + "detectViolations" should "return correct lastActivity format" in { + val violations = RetentionDetector.detectViolations( + testData, + "customer_id", + "last_activity_date", + policy30Days, + Some(testDate) + ) + + violations.collect().foreach { row => + val lastActivity = row.getString(row.fieldIndex("lastActivity")) + assert(lastActivity.nonEmpty) + } + } +} From f30bb98bff51e0b205e9e74fc369d32fb703543a Mon Sep 17 00:00:00 2001 From: 
"copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 12 Oct 2025 00:57:25 +0000 Subject: [PATCH 3/4] Add complete retention pipeline example and update documentation Co-authored-by: mitchelllisle <18128531+mitchelllisle@users.noreply.github.com> --- README.md | 4 + .../examples/RetentionPipelineExample.scala | 232 ++++++++++++++++++ 2 files changed, 236 insertions(+) create mode 100644 src/main/scala/org/mitchelllisle/examples/RetentionPipelineExample.scala diff --git a/README.md b/README.md index cb30b67..fd78a0f 100644 --- a/README.md +++ b/README.md @@ -394,6 +394,10 @@ val afterProof = MerkleTree.apply(cleanedData, Seq("email", "age"), "user_id") println(s"Cleanup completed. New data fingerprint: ${afterProof.rootHash}") ``` +> [!TIP] +> For a complete end-to-end example including deletion verification and audit logging, see +> `src/main/scala/org/mitchelllisle/examples/RetentionPipelineExample.scala` + ## **Merkle Tree Data Retention & Audit Trails** diff --git a/src/main/scala/org/mitchelllisle/examples/RetentionPipelineExample.scala b/src/main/scala/org/mitchelllisle/examples/RetentionPipelineExample.scala new file mode 100644 index 0000000..92800e0 --- /dev/null +++ b/src/main/scala/org/mitchelllisle/examples/RetentionPipelineExample.scala @@ -0,0 +1,232 @@ +package org.mitchelllisle.examples + +import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.sql.functions._ +import org.mitchelllisle.analysers.{RetentionDetector, RetentionPolicy, MerkleTree} +import org.mitchelllisle.Anonymiser +import java.time.LocalDate + +/** Example pipeline demonstrating data retention detection and automated cleanup. + * + * This example shows how to: + * 1. Detect retention violations + * 2. Generate action plans + * 3. Apply anonymization or deletion + * 4. 
Create audit trails + */ +object RetentionPipelineExample { + + def main(args: Array[String]): Unit = { + val spark = SparkSession.builder() + .appName("RetentionPipelineExample") + .master("local[*]") + .getOrCreate() + + import spark.implicits._ + + // Step 1: Load customer data + println("=== Step 1: Loading Customer Data ===") + val customerData = spark.read + .option("header", "true") + .csv("path/to/customer-data.csv") + + val totalRecords = customerData.count() + println(s"Loaded $totalRecords customer records") + + // Step 2: Define retention policies + println("\n=== Step 2: Defining Retention Policies ===") + val policies = Seq( + RetentionPolicy( + name = "GDPR_INACTIVE_USERS", + retentionDays = 365, + description = Some("GDPR: Delete inactive users after 1 year") + ), + RetentionPolicy( + name = "CCPA_MARKETING", + retentionDays = 180, + description = Some("CCPA: Marketing data retention 6 months") + ) + ) + + policies.foreach(p => println(s" - ${p.name}: ${p.retentionDays} days")) + + // Step 3: Detect violations for primary policy + println("\n=== Step 3: Detecting Retention Violations ===") + val primaryPolicy = policies.head + val (violations, summary) = RetentionDetector.detectViolationsWithSummary( + customerData, + idColumn = "customer_id", + timestampColumn = "last_activity_date", + policy = primaryPolicy + ) + + println(summary) + + if (violations.count() == 0) { + println("✓ No violations found. 
All data is within retention policy.") + spark.stop() + return + } + + // Step 4: Analyze violation patterns + println("\n=== Step 4: Analyzing Violation Patterns ===") + val violationsByRange = RetentionDetector.groupViolationsByRange(violations) + println("Violations by time range:") + violationsByRange.show(false) + + // Step 5: Generate action plan + println("\n=== Step 5: Generating Action Plan ===") + val actionPlan = RetentionDetector.generateActionPlan( + violations, + actionThresholdDays = 30 + ) + + val actionSummary = actionPlan + .groupBy("recommendedAction", "priority") + .count() + .orderBy("priority", "recommendedAction") + + println("Action plan summary:") + actionSummary.show(false) + + // Step 6: Create "before" audit proof + println("\n=== Step 6: Creating Pre-Cleanup Audit Proof ===") + val beforeProof = MerkleTree.apply( + customerData, + columns = Seq("customer_id", "email", "last_activity_date"), + idColumn = "customer_id" + ) + println(s"Before cleanup - Root hash: ${beforeProof.rootHash}") + println(s"Before cleanup - Record count: ${beforeProof.recordCount}") + + // Step 7: Execute cleanup based on action plan + println("\n=== Step 7: Executing Cleanup Actions ===") + + // 7a. Delete high-priority records + val toDelete = actionPlan + .filter($"recommendedAction" === "Delete" && $"priority" === "High") + .select("customerId") + + val afterDeletion = customerData.join( + toDelete, + customerData("customer_id") === toDelete("customerId"), + "left_anti" + ) + + val deletedCount = totalRecords - afterDeletion.count() + println(s" Deleted $deletedCount high-priority records") + + // 7b. 
Anonymize medium-priority records + val toAnonymize = actionPlan + .filter($"recommendedAction" === "Anonymize") + .select("customerId") + .collect() + .map(_.getString(0)) + + val afterAnonymization = if (toAnonymize.nonEmpty) { + // Apply anonymization transformations + val anonymized = afterDeletion + .withColumn("email", + when($"customer_id".isin(toAnonymize: _*), + concat(lit("anon_"), sha2($"email", 256))) + .otherwise($"email") + ) + .withColumn("phone", + when($"customer_id".isin(toAnonymize: _*), lit("REDACTED")) + .otherwise($"phone") + ) + + println(s" Anonymized ${toAnonymize.length} medium-priority records") + anonymized + } else { + afterDeletion + } + + // Step 8: Create "after" audit proof and verify + println("\n=== Step 8: Creating Post-Cleanup Audit Proof ===") + val afterProof = MerkleTree.apply( + afterAnonymization, + columns = Seq("customer_id", "email", "last_activity_date"), + idColumn = "customer_id" + ) + println(s"After cleanup - Root hash: ${afterProof.rootHash}") + println(s"After cleanup - Record count: ${afterProof.recordCount}") + + // Step 9: Verify deletion if records were deleted + if (deletedCount > 0) { + println("\n=== Step 9: Verifying Deletions ===") + val deletedIds = toDelete.collect().map(_.getString(0)) + val deletionProof = MerkleTree.verifyDeletion( + beforeData = customerData, + afterData = afterAnonymization, + deletedIds = deletedIds, + columns = Seq("customer_id", "email", "last_activity_date"), + idColumn = "customer_id" + ) + + val isValid = MerkleTree.validateDeletionProof( + deletionProof, + expectedDeletions = deletedCount.toInt + ) + + if (isValid) { + println("✓ Deletion cryptographically verified") + } else { + println("✗ Deletion verification failed!") + } + } + + // Step 10: Save cleaned data and audit logs + println("\n=== Step 10: Saving Results ===") + afterAnonymization.write + .mode("overwrite") + .parquet("output/cleaned-customer-data") + + // Save audit log + val auditLog = Seq( + ( + 
beforeProof.rootHash, + afterProof.rootHash, + beforeProof.recordCount, + afterProof.recordCount, + deletedCount, + toAnonymize.length, + java.time.Instant.now().toString + ) + ).toDF( + "before_hash", + "after_hash", + "before_count", + "after_count", + "deleted_count", + "anonymized_count", + "timestamp" + ) + + auditLog.write + .mode("append") + .parquet("output/retention-audit-log") + + println("✓ Cleaned data saved to: output/cleaned-customer-data") + println("✓ Audit log saved to: output/retention-audit-log") + + // Step 11: Re-check compliance + println("\n=== Step 11: Verifying Compliance ===") + val (finalViolations, finalSummary) = RetentionDetector.detectViolationsWithSummary( + afterAnonymization, + idColumn = "customer_id", + timestampColumn = "last_activity_date", + policy = primaryPolicy + ) + + println(finalSummary) + + if (finalViolations.count() == 0) { + println("✓ SUCCESS: All data now complies with retention policy!") + } else { + println(s"⚠ WARNING: ${finalViolations.count()} violations remain (likely low-priority)") + } + + spark.stop() + } +} From 0ada0b3d9803ae901cce62660e38035cca320593 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 12 Oct 2025 01:00:02 +0000 Subject: [PATCH 4/4] Add comprehensive quick start guide for retention detection Co-authored-by: mitchelllisle <18128531+mitchelllisle@users.noreply.github.com> --- docs/RETENTION_DETECTION_GUIDE.md | 250 ++++++++++++++++++++++++++++++ 1 file changed, 250 insertions(+) create mode 100644 docs/RETENTION_DETECTION_GUIDE.md diff --git a/docs/RETENTION_DETECTION_GUIDE.md b/docs/RETENTION_DETECTION_GUIDE.md new file mode 100644 index 0000000..b05a197 --- /dev/null +++ b/docs/RETENTION_DETECTION_GUIDE.md @@ -0,0 +1,250 @@ +# Data Retention Detection - Quick Start Guide + +This guide shows how to use the RetentionDetector feature to identify and handle data that exceeds retention policies. + +## Basic Usage + +### 1. 
Define a Retention Policy + +```scala +import org.mitchelllisle.analysers.RetentionPolicy + +val gdprPolicy = RetentionPolicy( + name = "GDPR_DEFAULT", + retentionDays = 365, + description = Some("GDPR 1-year retention policy") +) +``` + +### 2. Detect Violations + +```scala +import org.mitchelllisle.analysers.RetentionDetector + +val violations = RetentionDetector.detectViolations( + data = customerData, + idColumn = "customer_id", + timestampColumn = "last_activity_date", + policy = gdprPolicy +) + +violations.show() +``` + +### 3. Get Summary Statistics + +```scala +val (violations, summary) = RetentionDetector.detectViolationsWithSummary( + customerData, + "customer_id", + "last_activity_date", + gdprPolicy +) + +println(summary) +// Output: +// Retention Policy: GDPR_DEFAULT +// Total Records: 10000 +// Violations Found: 1234 +// Compliance Rate: 87.66% +``` + +## Advanced Usage + +### Multiple Policies + +```scala +val policies = Seq( + RetentionPolicy("30_DAY", 30), + RetentionPolicy("90_DAY", 90), + RetentionPolicy("1_YEAR", 365) +) + +val allViolations = RetentionDetector.detectMultiplePolicies( + customerData, + "customer_id", + "last_activity_date", + policies +) +``` + +### Action Planning + +```scala +val actionPlan = RetentionDetector.generateActionPlan( + violations, + actionThresholdDays = 30 +) + +// Shows: customerId, daysPastRetention, recommendedAction, priority +actionPlan.show() +``` + +### Violation Analysis + +```scala +val violationsByRange = RetentionDetector.groupViolationsByRange(violations) + +// Shows violations grouped by time ranges: +// 0-30 days, 31-90 days, 91-180 days, 181-365 days, Over 1 year +violationsByRange.show() +``` + +## Integration with Audit Trails + +```scala +import org.mitchelllisle.analysers.MerkleTree + +// Before cleanup +val beforeProof = MerkleTree.apply( + customerData, + Seq("customer_id", "email", "last_activity_date"), + "customer_id" +) + +// Perform cleanup based on violations +val cleanedData = 
customerData.join( + violations.select("customerId"), + customerData("customer_id") === violations("customerId"), + "left_anti" +) + +// After cleanup +val afterProof = MerkleTree.apply( + cleanedData, + Seq("customer_id", "email", "last_activity_date"), + "customer_id" +) + +// Verify deletion +val deletionProof = MerkleTree.verifyDeletion( + beforeData = customerData, + afterData = cleanedData, + deletedIds = violations.select("customerId").collect().map(_.getString(0)), + columns = Seq("customer_id", "email", "last_activity_date"), + idColumn = "customer_id" +) +``` + +## Common Use Cases + +### 1. GDPR Compliance Check + +```scala +val gdprPolicy = RetentionPolicy("GDPR", 365) +val (violations, summary) = RetentionDetector.detectViolationsWithSummary( + userData, "user_id", "last_login", gdprPolicy +) + +if (summary.complianceRate < 95.0) { + println(s"WARNING: Compliance rate is ${summary.complianceRate}%") +} +``` + +### 2. Marketing Data Cleanup + +```scala +val marketingPolicy = RetentionPolicy("MARKETING", 180) +val violations = RetentionDetector.apply( + marketingData, + "contact_id", + "consent_date", + marketingPolicy +) + +val toDelete = violations.filter($"daysPastRetention" > 30) +``` + +### 3. 
Scheduled Audit Report + +```scala +val policies = Seq( + RetentionPolicy("ACTIVE_USERS", 90), + RetentionPolicy("INACTIVE_USERS", 365), + RetentionPolicy("DELETED_USERS", 30) +) + +policies.foreach { policy => + val (violations, summary) = RetentionDetector.detectViolationsWithSummary( + userData, "user_id", "last_activity", policy + ) + + println(summary) + + // Save report + violations + .withColumn("report_date", current_timestamp()) + .write + .mode("append") + .parquet(s"reports/retention-violations/${policy.name}") +} +``` + +## Column Type Support + +The RetentionDetector automatically handles different column types: + +```scala +// Timestamp columns +val tsData = df.select($"id", $"timestamp".cast(TimestampType)) +RetentionDetector.apply(tsData, "id", "timestamp", policy) + +// Date columns +val dateData = df.select($"id", $"date".cast(DateType)) +RetentionDetector.apply(dateData, "id", "date", policy) + +// String columns (automatically converted) +val stringData = df.select($"id", $"date_string") +RetentionDetector.apply(stringData, "id", "date_string", policy) +``` + +## Best Practices + +1. **Start with analysis**: Use `detectViolationsWithSummary()` to understand the scope before taking action +2. **Use action plans**: Generate action plans with thresholds to prioritize critical violations +3. **Create audit trails**: Always use MerkleTree before and after cleanup for compliance proof +4. **Test with a fixed reference date**: Pass an explicit reference date so test results are reproducible and do not depend on the current date: `referenceDate = Some(LocalDate.of(2024, 1, 1))` +5. **Monitor compliance**: Track compliance rates over time to identify trends +6. 
**Automate workflows**: Integrate with scheduled Spark jobs for continuous monitoring + +## Performance Tips + +- Use partitioning on timestamp columns for faster filtering +- Cache the violations DataFrame if using it multiple times +- Use `limit()` when testing with large datasets +- Consider using Spark's broadcast join for small violation sets + +## Troubleshooting + +### Issue: No violations detected when expected + +```scala +// Check the data types +customerData.printSchema() + +// Verify the timestamp column values +customerData.select("last_activity_date").show() + +// Check the reference date (defaults to current date) +val violations = RetentionDetector.detectViolations( + customerData, "id", "timestamp", policy, + referenceDate = Some(LocalDate.now()) +) +``` + +### Issue: Column type error + +Ensure your timestamp column is one of: +- TimestampType +- DateType +- StringType (will be auto-converted) + +Convert if needed: +```scala +val converted = df.withColumn("ts", to_timestamp($"date_string")) +``` + +## Next Steps + +For a complete end-to-end example with deletion, anonymization, and audit trails, see: +`src/main/scala/org/mitchelllisle/examples/RetentionPipelineExample.scala`