diff --git a/.gitignore b/.gitignore index 1ff8666..354cc45 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,5 @@ Cargo.lock **/*.rs.bk /.idea +/example/target/ +/.cl* diff --git a/Cargo.toml b/Cargo.toml index 2165153..58fba38 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] -name = "schema_guard" -version = "1.6.2" +name = "schema_guard_tokio" +version = "1.10.0" authors = ["V.Krinitsyn "] edition = "2018" description = "Schema Guard: Relation Database (Schema) Management tool" @@ -11,17 +11,12 @@ repository = "https://github.com/vkrinitsyn/schema_guard" license = "MIT" [lib] -name = "schema_guard" +name = "schema_guard_tokio" path = "src/lib.rs" [dependencies] slog = { version = "^2.7.0", features=["max_level_debug"] } -slog-async = "^2.6.0" -slog-envlogger = "^2.2.0" -slog-stdlog = "^4.1.0" -slog-term = "^2.8.0" -sloggers = "^2.0.0" lazy_static = "^1.4.0" postgres = { version = "^0.19.1", features=["with-chrono-0_4", "with-time-0_2"] } @@ -35,13 +30,14 @@ yaml-rust = "^0.4.5" yaml-validator = "0.2.0" -postgres-native-tls = { version = "^0.5.0", optional = true } -native-tls = { version = "^0.2.11", optional = true } - -bb8 = { version = "0.8.3", optional = true } -bb8-postgres = {version = "0.8.0", optional = true} -tokio = { version = "^1.36.0", optional = true } -tokio-postgres = { version = "^0.7.1", optional = true } +tokio-postgres-rustls = { version = "^0.12.0" } +rustls = { version = "^0.23.0" } +bb8 = { version = "0.8.3"} +bb8-postgres = {version = "0.8.0", features = ["with-chrono-0_4","with-uuid-0_8"]} +tokio = { version = "^1.36.0", features = ["test-util", "full"]} +tokio-postgres = { version = "^0.7.1"} [features] +#default = ["slog"] +# show extra messages when do SQL execution using slog logger slog = [] diff --git a/README.md b/README.md index 351a3bd..d306eda 100644 --- a/README.md +++ b/README.md @@ -31,6 +31,7 @@ database: - column: name: test type: varchar(250) + index: true triggers: - trigger: name: 
uniq_name_of_trigger @@ -38,11 +39,13 @@ database: when: for each row proc: ``` +See [examples](test/example.yaml) and [template](test/example_template.yaml) One line of code: ```rust - let _ = schema_guard::migrate1(schema_guard::load_schema_from_file("file.yaml").unwrap(), &mut db)?; + let _ = schema_guard::migrate1(schema_guard::load_schema_from_file("file.yaml").unwrap(), "postgresql://")?; + ``` Will create or upgrade existing Postgres database schema with desired tables without extra table creation. @@ -52,5 +55,3 @@ Will create or upgrade existing Postgres database schema with desired tables wit Not recommended to integrate schema migrate into application for production use as such violate security concern and best practices. -Please consider to use full-featured [SchemaGuard](https://www.dbinvent.com/rdbm/) (free for personal use) - diff --git a/TODO.md b/TODO.md deleted file mode 100644 index cec03b3..0000000 --- a/TODO.md +++ /dev/null @@ -1,11 +0,0 @@ -### TODO - The Postgres schema Management tool. 
- -- Complete Tokio async bb8 support -- Change data type - extend sizes only -- Composite Primary Key support -- Indexes -- Table patterns in yaml to simplify table definition -- Table patterns recognition -- Diesel schema import -- Partition table definition in yaml format - diff --git a/src/column.rs b/src/column.rs index 35115d1..778a895 100644 --- a/src/column.rs +++ b/src/column.rs @@ -26,6 +26,7 @@ impl Default for Column { description: "".to_string(), sql: "".to_string(), index: None, + partition_by: None, } } } @@ -46,12 +47,34 @@ pub struct Column { pub sql: String, #[serde(skip_serializing_if = "Option::is_none")] pub index: Option, + /// Partitioning method: RANGE, LIST, or HASH + #[serde(skip_serializing_if = "Option::is_none")] + pub partition_by: Option, } -#[derive(Debug, Clone, Serialize)] +#[derive(Debug, Clone, Serialize, PartialEq)] pub struct Index { #[serde(skip_serializing_if = "String::is_empty")] pub name: String, + /// UNIQUE index + #[serde(skip_serializing_if = "Option::is_none")] + pub unique: Option, + /// CREATE INDEX CONCURRENTLY + #[serde(skip_serializing_if = "Option::is_none")] + pub concurrently: Option, + /// Index method: btree, hash, gist, spgist, gin, brin + #[serde(skip_serializing_if = "String::is_empty")] + pub using: String, + /// ASC or DESC + #[serde(skip_serializing_if = "String::is_empty")] + pub order: String, + /// NULLS FIRST or NULLS LAST + #[serde(skip_serializing_if = "String::is_empty")] + pub nulls: String, + /// COLLATE collation + #[serde(skip_serializing_if = "String::is_empty")] + pub collate: String, + /// Additional SQL (deprecated, use specific fields instead) #[serde(skip_serializing_if = "String::is_empty")] pub sql: String, } @@ -112,6 +135,28 @@ impl Column { None }; let index = &input["index"]; + // Parse index: can be boolean (true = default index) or object (full config) + // - index: true → create index with auto-generated name + // - index: { name: "+" } or { name: "my_idx" } → create index + 
// - index: false, index: {}, or not present → no index + let index = if index.is_null() { + None + } else if let Some(b) = index.as_bool() { + if b { + Some(Index::default()) // index: true creates index with auto-generated name + } else { + None // index: false means no index + } + } else { + Index::new(index) // index: { ... } returns Some only if name is set + }; + + let partition_by_val = crate::utils::as_str_esc(input, "partition_by"); + let partition_by = if partition_by_val.is_empty() { + None + } else { + Some(partition_by_val.to_uppercase()) + }; Column { name: crate::utils::safe_sql_name(crate::utils::as_str_esc(input, "name")), @@ -120,11 +165,8 @@ impl Column { description: crate::utils::as_str_esc(input, "description"), sql: crate::utils::as_str_esc(input, "sql"), constraint, - index: if index.is_null() { - None - } else { - Some(Index::new(index)) - }, + index, + partition_by, } } @@ -147,6 +189,7 @@ impl Column { sql: "".to_string(), constraint, index: None, + partition_by: None, } } @@ -210,13 +253,81 @@ impl Trig { None } } + + /// Convert to PgTrigger for storage in dbc + pub(crate) fn to_pg_trigger(&self) -> crate::loader::PgTrigger { + crate::loader::PgTrigger { + trigger_name: self.name.clone(), + event: self.event.to_uppercase(), + orientation: self.when.to_uppercase(), + proc: self.proc.clone(), + } + } + + /// Check if this trigger matches an existing PgTrigger from the database + pub(crate) fn matches_pg_trigger(&self, existing: &crate::loader::PgTrigger) -> bool { + // Normalize for comparison + let self_event = self.event.to_uppercase(); + let self_when = self.when.to_uppercase(); + + // Compare event (BEFORE INSERT, AFTER UPDATE, etc.) 
+ if self_event != existing.event { + return false; + } + + // Compare orientation (FOR EACH ROW, FOR EACH STATEMENT) + if self_when != existing.orientation { + return false; + } + + // Compare procedure - existing includes schema, self might not + // Handle both "schema.func()" and "func()" formats + let self_proc = self.proc.trim(); + let existing_proc = existing.proc.trim(); + + // If self_proc doesn't have schema prefix, check if existing ends with it + if self_proc.contains('.') { + self_proc == existing_proc + } else { + // self_proc is just "func()", existing is "schema.func()" + existing_proc.ends_with(self_proc) || existing_proc == self_proc + } + } } -impl Index { - pub(crate) fn new(input: &Yaml) -> Self { +impl Default for Index { + fn default() -> Self { Index { - name: crate::utils::as_str_esc(input, "name"), - sql: crate::utils::as_str_esc(input, "sql"), + name: "+".to_string(), // "+" triggers auto-generated index name + unique: None, + concurrently: None, + using: String::new(), + order: String::new(), + nulls: String::new(), + collate: String::new(), + sql: String::new(), + } + } +} + +impl Index { + pub(crate) fn new(input: &Yaml) -> Option { + let name = crate::utils::as_str_esc(input, "name"); + // If name is empty, no index is created (must explicitly set name or use index: true) + if name.is_empty() { + return None; } + let unique_val = crate::utils::as_bool(input, "unique", false); + let concurrently_val = crate::utils::as_bool(input, "concurrently", false); + Some(Index { + name, + unique: if unique_val { Some(true) } else { None }, + concurrently: if concurrently_val { Some(true) } else { None }, + using: crate::utils::as_str_esc(input, "using").to_lowercase(), + order: crate::utils::as_str_esc(input, "order").to_uppercase(), + nulls: crate::utils::as_str_esc(input, "nulls").to_uppercase(), + collate: crate::utils::as_str_esc(input, "collate"), + sql: crate::utils::as_str_esc(input, "sql"), + }) } } \ No newline at end of file diff --git 
a/src/grant.rs b/src/grant.rs new file mode 100644 index 0000000..4d6a3ae --- /dev/null +++ b/src/grant.rs @@ -0,0 +1,201 @@ +use std::collections::{HashMap, HashSet}; +use std::fmt::Write; + +use crate::loader::{InfoSchemaType, PgGrant}; +use crate::table::YGrant; +use crate::MigrationOptions; + +/// Collects and generates GRANT/REVOKE statements for table grants +pub struct GrantBuilder<'a> { + grants: &'a Vec, + table_name: String, +} + +impl<'a> GrantBuilder<'a> { + pub fn new(grants: &'a Vec, table_name: &str) -> Self { + GrantBuilder { + grants, + table_name: table_name.to_string(), + } + } + + /// Generate GRANT/REVOKE SQL statements and update dbc with grants + pub fn generate_sql( + &self, + schema: &str, + dbc: &mut InfoSchemaType, + opt: &MigrationOptions, + ) -> Result { + let mut grants_sql = String::new(); + let mut skipped_sql = String::new(); + + // Get existing grants from dbc + let existing_grants: HashMap = if let Some(ss) = dbc.get(schema) { + if let Some(ts) = ss.get(&self.table_name) { + ts.grants.clone() + } else { + HashMap::new() + } + } else { + HashMap::new() + }; + + // Build desired grants from YAML + let mut desired_grants: HashMap> = HashMap::new(); + let mut grant_options: HashMap = HashMap::new(); + + for yg in self.grants { + // Process each privilege type + let privileges = [ + ("all", &yg.all), + ("SELECT", &yg.select), + ("INSERT", &yg.insert), + ("UPDATE", &yg.update), + ("DELETE", &yg.delete), + ("TRUNCATE", &yg.truncate), + ("REFERENCES", &yg.references), + ("TRIGGER", &yg.trigger), + ]; + + for (priv_name, grantee) in privileges { + if !grantee.is_empty() { + let entry = desired_grants.entry(grantee.clone()).or_insert_with(HashSet::new); + if priv_name == "all" { + // ALL expands to all table privileges + entry.insert("SELECT".to_string()); + entry.insert("INSERT".to_string()); + entry.insert("UPDATE".to_string()); + entry.insert("DELETE".to_string()); + entry.insert("TRUNCATE".to_string()); + 
entry.insert("REFERENCES".to_string()); + entry.insert("TRIGGER".to_string()); + } else { + entry.insert(priv_name.to_string()); + } + if yg.with_grant_option { + grant_options.insert(grantee.clone(), true); + } + } + } + } + + // Compare and generate REVOKE/GRANT statements + for (grantee, desired_privs) in &desired_grants { + let existing = existing_grants.get(grantee); + + // Determine what needs to be granted (new privileges) + let privs_to_grant: Vec<&String> = match existing { + None => desired_privs.iter().collect(), + Some(ex) => desired_privs.difference(&ex.privileges).collect(), + }; + + // Determine what needs to be revoked (removed privileges) + let privs_to_revoke: Vec<&String> = match existing { + None => vec![], + Some(ex) => ex.privileges.difference(desired_privs).collect(), + }; + + // Generate REVOKE statements for changed privileges + if !privs_to_revoke.is_empty() { + let privs_str: Vec<&str> = privs_to_revoke.iter().map(|s| s.as_str()).collect(); + let revoke_stmt = format!( + "REVOKE {} ON {}.{} FROM {};\n", + privs_str.join(", "), + schema, + self.table_name, + grantee + ); + + if opt.with_revoke { + grants_sql.push_str(&revoke_stmt); + } else { + if opt.without_failfast { + // Show skipped SQL + let _ = writeln!(skipped_sql, "-- SKIPPED (with_revoke=false): {}", revoke_stmt.trim()); + } else { + return Err(format!( + "Grant changes detected for {} on {}.{} but without_failfast is enabled. 
SQL: {}", + grantee, schema, self.table_name, revoke_stmt.trim() + )); + } + } + } + + // Generate GRANT statements for new privileges + if !privs_to_grant.is_empty() { + let privs_str: Vec<&str> = privs_to_grant.iter().map(|s| s.as_str()).collect(); + let with_grant = if grant_options.get(grantee).unwrap_or(&false) == &true { + " WITH GRANT OPTION" + } else { + "" + }; + let _ = writeln!( + grants_sql, + "GRANT {} ON {}.{} TO {}{};", + privs_str.join(", "), + schema, + self.table_name, + grantee, + with_grant + ); + } + + // Update dbc with new grants (only if we're applying changes) + if opt.with_revoke || privs_to_revoke.is_empty() { + if let Some(ss) = dbc.get_mut(schema) { + if let Some(ts) = ss.get_mut(&self.table_name) { + ts.grants.insert(grantee.clone(), PgGrant { + grantee: grantee.clone(), + privileges: desired_privs.clone(), + with_grant_option: *grant_options.get(grantee).unwrap_or(&false), + }); + } + } + } + } + + // Handle grantees that exist in DB but not in YAML (revoke all) + for (grantee, existing) in &existing_grants { + if !desired_grants.contains_key(grantee) { + let privs_str: Vec<&str> = existing.privileges.iter().map(|s| s.as_str()).collect(); + if !privs_str.is_empty() { + let revoke_stmt = format!( + "REVOKE {} ON {}.{} FROM {};\n", + privs_str.join(", "), + schema, + self.table_name, + grantee + ); + + if opt.with_revoke { + grants_sql.push_str(&revoke_stmt); + // Remove from dbc + if let Some(ss) = dbc.get_mut(schema) { + if let Some(ts) = ss.get_mut(&self.table_name) { + ts.grants.remove(grantee); + } + } + } else { + if opt.without_failfast { + // Show skipped SQL + let _ = writeln!(skipped_sql, "-- SKIPPED (with_revoke=false): {}", revoke_stmt.trim()); + } else { + return Err(format!( + "Grant removal detected for {} on {}.{} but without_failfast is enabled. 
SQL: {}", + grantee, schema, self.table_name, revoke_stmt.trim() + )); + } + } + } + } + } + + // Log skipped SQL if any + if !skipped_sql.is_empty() { + #[cfg(not(feature = "slog"))] + eprintln!("Skipped grant changes:\n{}", skipped_sql); + } + + Ok(grants_sql) + } +} diff --git a/src/index.rs b/src/index.rs new file mode 100644 index 0000000..9a54d4e --- /dev/null +++ b/src/index.rs @@ -0,0 +1,368 @@ +use std::collections::HashMap; + +use crate::column::Column; +use crate::loader::{InfoSchemaType, PgIndex, PgIndexColumn}; +use crate::utils::OrderedHashMap; +use crate::MigrationOptions; + +/// Represents a desired index configuration collected from YAML +#[derive(Debug, Clone)] +pub struct DesiredIndex { + pub name: String, + pub columns: Vec, + pub is_unique: bool, + pub concurrently: bool, + pub using: String, +} + +/// Represents a column in a desired index +#[derive(Debug, Clone)] +pub struct DesiredIndexColumn { + pub column_name: String, + pub order: String, + pub nulls: String, + pub collate: String, +} + +/// Collects and generates CREATE INDEX statements for table columns +pub struct IndexBuilder { + /// index_name -> DesiredIndex + index_groups: HashMap, +} + +impl IndexBuilder { + /// Create a new IndexBuilder by collecting indexes from columns + /// - Columns with index name "+" or empty get individual indexes (auto-generated names) + /// - Columns with the same explicit index name are combined into a composite index + pub fn new(columns: &OrderedHashMap) -> Self { + let mut index_groups: HashMap = HashMap::new(); + let mut auto_index_counter = 0usize; + + for col in &columns.list { + if let Some(idx) = &col.index { + let col_info = DesiredIndexColumn { + column_name: col.name.clone(), + order: idx.order.clone(), + nulls: idx.nulls.clone(), + collate: idx.collate.clone(), + }; + + // Check if this is an auto-generated index (empty name or "+") + let is_auto_index = idx.name.is_empty() || idx.name == "+"; + + if is_auto_index { + // Create individual 
index for this column with unique key + auto_index_counter += 1; + let auto_key = format!("__auto_{}_{}", col.name, auto_index_counter); + index_groups.insert( + auto_key, + DesiredIndex { + name: "+".to_string(), // Will be auto-generated in generate_sql + columns: vec![col_info], + is_unique: idx.unique.unwrap_or(false) + || idx.sql.to_uppercase().contains("UNIQUE"), + concurrently: idx.concurrently.unwrap_or(false), + using: idx.using.clone(), + }, + ); + } else { + // Explicit index name - combine columns with same name + match index_groups.get_mut(&idx.name) { + Some(existing) => { + existing.columns.push(col_info); + } + None => { + index_groups.insert( + idx.name.clone(), + DesiredIndex { + name: idx.name.clone(), + columns: vec![col_info], + is_unique: idx.unique.unwrap_or(false) + || idx.sql.to_uppercase().contains("UNIQUE"), + concurrently: idx.concurrently.unwrap_or(false), + using: idx.using.clone(), + }, + ); + } + } + } + } + } + + IndexBuilder { index_groups } + } + + /// Generate CREATE/DROP INDEX SQL statements and update dbc with created indexes + pub fn generate_sql( + &self, + schema: &str, + table_name: &str, + dbc: &mut InfoSchemaType, + opt: &MigrationOptions, + ) -> Result { + let mut indexes_sql = String::new(); + let mut skipped_sql = String::new(); + + // Get existing indexes from dbc + let existing_indexes: HashMap = if let Some(ss) = dbc.get(schema) { + if let Some(ts) = ss.get(table_name) { + ts.indexes.clone() + } else { + HashMap::new() + } + } else { + HashMap::new() + }; + + for (_idx_name, desired) in &self.index_groups { + // Generate actual index name - "+" in desired.name means auto-generate. + // Note: auto-index map keys use "__auto_" prefix, so we must check desired.name, + // not idx_name, to correctly detect auto-generated indexes. 
+ let actual_index_name = if desired.name == "+" || desired.name.is_empty() { + let col_names: Vec<&str> = desired.columns.iter().map(|c| c.column_name.as_str()).collect(); + format!("idx_{}_{}", table_name, col_names.join("_")) + } else { + desired.name.clone() + }; + + // Check if index exists and if it needs to be updated + if let Some(existing) = existing_indexes.get(&actual_index_name) { + if self.index_matches(desired, existing) { + // Index exists and matches - skip + continue; + } + } + + // Check if any existing index already covers the same columns (possibly under a + // different name). Prevents creating duplicate indexes on the same column set. + if existing_indexes.values().any(|ex| self.index_columns_match(desired, ex)) { + continue; + } + + if let Some(_existing) = existing_indexes.get(&actual_index_name) { + let drop_sql = format!("DROP INDEX IF EXISTS {}.{};", schema, actual_index_name); + // Index exists but differs + if opt.with_index_drop { + // Drop it first + indexes_sql.push_str(&format!( + "DROP INDEX IF EXISTS {}.{};\n", + schema, actual_index_name + )); + } else { + let create_sql = self.build_create_index_sql(schema, table_name, &actual_index_name, desired); + if opt.without_failfast { + // Show skipped SQL + skipped_sql.push_str(&format!( + "-- SKIPPED (with_index_drop=false):\n-- {}\n-- {}\n", + drop_sql, create_sql.trim() + )); + } else { + return Err(format!( + "Index {} on {}.{} has changed but without_failfast is enabled. 
SQL would be:\n{}\n{}", + actual_index_name, schema, table_name, drop_sql, create_sql.trim() + )); + } + } + } + + // Build the CREATE INDEX statement + let create_idx = self.build_create_index_sql( + schema, + table_name, + &actual_index_name, + desired, + ); + indexes_sql.push_str(&create_idx); + + // Update dbc with the new index + self.update_dbc(schema, table_name, &actual_index_name, desired, dbc); + } + + // Log skipped SQL if any + if !skipped_sql.is_empty() { + #[cfg(not(feature = "slog"))] + eprintln!("Skipped index changes:\n{}", skipped_sql); + } + + Ok(indexes_sql) + } + + /// Check if the existing index matches the desired configuration + fn index_matches(&self, desired: &DesiredIndex, existing: &PgIndex) -> bool { + // Check unique flag + if desired.is_unique != existing.is_unique { + return false; + } + + // Check index method (btree is default) + let desired_method = if desired.using.is_empty() { "btree" } else { &desired.using }; + if desired_method != existing.index_method { + return false; + } + + // Check column count + if desired.columns.len() != existing.columns.len() { + return false; + } + + // Check each column + for (i, desired_col) in desired.columns.iter().enumerate() { + let existing_col = &existing.columns[i]; + + if desired_col.column_name != existing_col.column_name { + return false; + } + + // Check order (default is ASC) + let desired_order = if desired_col.order.is_empty() { "ASC" } else { &desired_col.order }; + if desired_order != existing_col.order { + return false; + } + + // Check nulls order (default depends on sort order: ASC -> LAST, DESC -> FIRST) + let default_nulls = if desired_order == "DESC" { "FIRST" } else { "LAST" }; + let desired_nulls = if desired_col.nulls.is_empty() { default_nulls } else { &desired_col.nulls }; + if desired_nulls != existing_col.nulls { + return false; + } + + // Check collation + if desired_col.collate != existing_col.collation { + return false; + } + } + + true + } + + /// Check if an 
existing index already covers the same columns as the desired index. + /// Compares column names (in order), uniqueness, and index method — ignoring index name. + fn index_columns_match(&self, desired: &DesiredIndex, existing: &PgIndex) -> bool { + if desired.is_unique != existing.is_unique { + return false; + } + let desired_method = if desired.using.is_empty() { "btree" } else { &desired.using }; + if desired_method != existing.index_method { + return false; + } + if desired.columns.len() != existing.columns.len() { + return false; + } + desired.columns.iter().zip(existing.columns.iter()) + .all(|(d, e)| d.column_name == e.column_name) + } + + /// Build CREATE INDEX SQL statement + fn build_create_index_sql( + &self, + schema: &str, + table_name: &str, + index_name: &str, + desired: &DesiredIndex, + ) -> String { + let mut sql = String::from("CREATE "); + + // UNIQUE + if desired.is_unique { + sql.push_str("UNIQUE "); + } + + sql.push_str("INDEX "); + + // CONCURRENTLY + if desired.concurrently { + sql.push_str("CONCURRENTLY "); + } + + // IF NOT EXISTS and index name + sql.push_str("IF NOT EXISTS "); + sql.push_str(index_name); + sql.push_str(" ON "); + sql.push_str(schema); + sql.push('.'); + sql.push_str(table_name); + + // USING method + if !desired.using.is_empty() { + sql.push_str(" USING "); + sql.push_str(&desired.using); + } + + // Columns + sql.push_str(" ("); + let col_defs: Vec = desired + .columns + .iter() + .map(|c| { + let mut col_sql = c.column_name.clone(); + + // COLLATE + if !c.collate.is_empty() { + col_sql.push_str(" COLLATE "); + col_sql.push_str(&c.collate); + } + + // ORDER (ASC/DESC) + if !c.order.is_empty() { + col_sql.push(' '); + col_sql.push_str(&c.order); + } + + // NULLS FIRST/LAST + if !c.nulls.is_empty() { + col_sql.push_str(" NULLS "); + col_sql.push_str(&c.nulls); + } + + col_sql + }) + .collect(); + sql.push_str(&col_defs.join(", ")); + sql.push_str(");\n"); + + sql + } + + /// Update dbc with the new index information + fn 
update_dbc( + &self, + schema: &str, + table_name: &str, + index_name: &str, + desired: &DesiredIndex, + dbc: &mut InfoSchemaType, + ) { + if let Some(ss) = dbc.get_mut(schema) { + if let Some(ts) = ss.get_mut(table_name) { + let columns: Vec = desired + .columns + .iter() + .map(|c| { + let order = if c.order.is_empty() { "ASC".to_string() } else { c.order.clone() }; + let default_nulls = if order == "DESC" { "FIRST" } else { "LAST" }; + PgIndexColumn { + column_name: c.column_name.clone(), + order, + nulls: if c.nulls.is_empty() { default_nulls.to_string() } else { c.nulls.clone() }, + collation: c.collate.clone(), + } + }) + .collect(); + + ts.indexes.insert( + index_name.to_string(), + PgIndex { + index_name: index_name.to_string(), + columns, + is_unique: desired.is_unique, + index_method: if desired.using.is_empty() { + "btree".to_string() + } else { + desired.using.clone() + }, + }, + ); + } + } + } +} diff --git a/src/lib.rs b/src/lib.rs index 9250553..b0570d7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,10 +5,10 @@ extern crate yaml_rust; use std::convert::TryFrom; use std::fs; use std::result::Result; +use std::str::FromStr; use std::sync::{Arc, RwLock}; use lazy_static::lazy_static; -use postgres::Client; use slog::Logger; @@ -25,6 +25,8 @@ use self::yaml_rust::Yaml; pub mod loader; pub mod table; pub mod column; +pub mod index; +pub mod grant; pub mod schema; pub mod utils; @@ -43,48 +45,102 @@ pub fn set_logger(logger: Logger) { } #[cfg(feature = "slog")] -pub(crate) fn log_debug(msg: String) { +pub(crate) fn log_debug(prefix: &str, msg: &String, file: &str, schema: &String) { let log = LOG.read().unwrap(); if let Some(l) = &*log { - debug!(l, "{}", msg); + debug!(l, "--{} {}[{}] {}", prefix, file, schema, msg); } else { - println!("{}", msg); + println!("{} {}[{}] {}", prefix, file, schema, msg); } } +#[derive(Debug, Clone, Default)] +/// Migration option for column types, indexes, triggers and grant/revoke access. 
+/// By default, all false - return exception on potentially destructive changes. +///
  • Use without_failfast=true to log the skipped SQL without executing it +/// +pub struct MigrationOptions { + /// if the column type is detected as smaller than the existing one, then perform the ALTER (a size extension is performed in any case), otherwise ignore the change; also see the failfast flag + ///<br>
  • default: false + pub with_size_cut: bool, + /// if index change detected, then perform drop before update, otherwise ignore changes, also see failfast flag + ///
  • default: false + pub with_index_drop: bool, + /// if trigger change detected, then perform drop before update, otherwise ignore changes, also see failfast flag + ///
  • default: false + pub with_trigger_drop: bool, + /// do not perform trigger creation (default: create triggers). + ///<br>
  • default: false + pub exclude_triggers: bool, + /// if access(grant) change detected, then perform revoke, otherwise ignore changes, also see failfast flag + ///
  • default: false + pub with_revoke: bool, + /// if changes detected AND not set to 'true' for apply, then ignore changes and show generated skipped SQL, + /// otherwise return exception (default) + ///
  • default: false - return exception if the matching "with_xxx" flag is not set and changes are detected + pub without_failfast: bool, + /// if a DDL statement execution fails, retry up to 100 times with a 1 sec delay, otherwise return the error immediately, default: false + ///<br>
  • default: false - do not try to retry + pub with_ddl_retry: bool, +} + pub fn get_schema() -> Vec { SCHEMA_YAMLS.clone() } -/// simplified migrate -pub fn migrate1(schema: Yaml, db: &mut Client) -> Result { - migrate(schema, db, false, None::<&dyn Fn(Vec) -> Result<(), String>>, "") +/// Migrate one yaml schema file with options +pub async fn migrate1(schema: Yaml, db_url: &str) -> Result { + let config = tokio_postgres::config::Config::from_str(db_url) + .map_err(|e| format!("parse db_url: {e}"))?; + let (mut client, conn) = config.connect(tokio_postgres::NoTls) + .await.map_err(|e| format!("connect: {e}"))?; + tokio::spawn(async move { let _ = conn.await; }); + let mut tx = client.transaction() + .await.map_err(|e| format!("transaction: {e}"))?; + let cnt = migrate(schema, &mut tx, None, "", &MigrationOptions::default()).await?; + tx.commit().await.map_err(|e| format!("commit: {e}"))?; + Ok(cnt) +} + +/// Migrate one yaml schema file with options +pub async fn migrate_opt(schema: Yaml, db_url: &str, opt: &MigrationOptions) -> Result { + let config = tokio_postgres::config::Config::from_str(db_url) + .map_err(|e| format!("parse db_url: {e}"))?; + let (mut client, conn) = config.connect(tokio_postgres::NoTls) + .await.map_err(|e| format!("connect: {e}"))?; + tokio::spawn(async move { let _ = conn.await; }); + let mut tx = client.transaction() + .await.map_err(|e| format!("transaction: {e}"))?; + let cnt = migrate(schema, &mut tx, None, "", &opt).await?; + tx.commit().await.map_err(|e| format!("commit: {e}"))?; + Ok(cnt) } /// main entry point to apply schema from yaml to the database /// return statements to execute /// -pub fn migrate(schema: Yaml, dbc: &mut Client, retry: bool, - dry_run: Option<&dyn Fn(Vec) -> Result<(), String>>, file_name: &str +pub async fn migrate(schema: Yaml, db: &mut tokio_postgres::Transaction<'_>, + dry_run: Option<&(dyn Fn(Vec) -> Result<(), String> + Sync + Send)>, file_name: &str, + opt: &MigrationOptions ) -> Result { - let mut 
db = dbc.transaction().map_err(|e| format!("{}", e))?; + // let mut db = dbc.transaction().map_err(|e| format!("{}", e))?; let mut cnt = 0; // check db connection let db_name: String = db.query("select current_database()", &[]) - .map_err(|e| format!("DB connection error: {}", e))?[0].get(0); + .await.map_err(|e| format!("DB connection error: {}", e))?[0].get(0); // load schema - let mut info = load_info_schema(db_name.as_str(), &mut db)?; + let mut info = load_info_schema(db_name.as_str(), db).await?; let schemas = parse_yaml_schema(schema, file_name)?; for s in &schemas.list { - cnt += s.deploy_all_tables(&mut info, &mut db, retry, dry_run)?; + cnt += s.deploy_all_tables(&mut info, db, dry_run, opt).await?; } for s in &schemas.list { - cnt += s.deploy_all_fk(&schemas, &mut info, &mut db, retry, dry_run)?; + cnt += s.deploy_all_fk(&schemas, &mut info, db, opt.with_ddl_retry, dry_run).await?; } - let _ = db.commit().map_err(|e| format!("committing error: {}", e))?; + // let _ = db.commit().map_err(|e| format!("committing error: {}", e))?; Ok(cnt) } @@ -133,6 +189,14 @@ pub fn parse_yaml_schema(yaml: Yaml, file_name: &str) -> Result ss.append(s)? 
} } + + // Resolve templates for all schemas + // First pass: collect all schemas for cross-schema template references + let schemas_snapshot = schema_schemas.clone(); + for schema in &mut schema_schemas.list { + schema.resolve_templates(&schemas_snapshot)?; + } + Ok(schema_schemas) } } @@ -142,10 +206,9 @@ pub fn parse_yaml_schema(yaml: Yaml, file_name: &str) -> Result, /// column name, foreign schema, table, column, fk name pub fks: HashMap, - /// trigger name, trigger's schema - pub triggers: HashMap, + /// trigger name -> PgTrigger + pub triggers: HashMap, + /// index name -> PgIndex + pub indexes: HashMap, + /// grantee -> PgGrant + pub grants: HashMap, + /// Primary key columns in order, with constraint name + pub primary_key: Option, pub sort_order: usize, pub table_comment: Option, pub owner: Option, - // pub grant: HashMap, +} + +/// Primary key information loaded from DB +#[derive(Debug, Clone, Serialize, PartialEq)] +pub struct PgPrimaryKey { + pub constraint_name: String, + /// Column names in order + pub columns: Vec, } const _PRIVILEGES: [&str; 14] = [ @@ -93,6 +106,41 @@ pub struct FKTable { pub sql: String, } +/// Index column information loaded from DB +#[derive(Debug, Clone, Serialize, PartialEq)] +pub struct PgIndexColumn { + pub column_name: String, + pub order: String, // ASC or DESC + pub nulls: String, // FIRST or LAST + pub collation: String, // collation name or empty +} + +/// Index information loaded from DB +#[derive(Debug, Clone, Serialize, PartialEq)] +pub struct PgIndex { + pub index_name: String, + pub columns: Vec, + pub is_unique: bool, + pub index_method: String, // btree, hash, gist, etc. +} + +/// Trigger information loaded from DB +#[derive(Debug, Clone, Serialize, PartialEq)] +pub struct PgTrigger { + pub trigger_name: String, + pub event: String, // BEFORE INSERT, AFTER UPDATE, etc. 
+ pub orientation: String, // FOR EACH ROW, FOR EACH STATEMENT + pub proc: String, // function name with schema +} + +/// Grant information loaded from DB +#[derive(Debug, Clone, Serialize, PartialEq, Default)] +pub struct PgGrant { + pub grantee: String, + pub privileges: HashSet, // SELECT, INSERT, UPDATE, DELETE, etc. + pub with_grant_option: bool, +} + impl FKTable { pub(crate) fn columns(&self) -> String { let mut cs = String::new(); @@ -146,22 +194,33 @@ impl PgColumnDfn { } -pub fn load_info_schema(db_name: &str, db: &mut Transaction) -> Result { - let mut data = load_info_cc(db_name, db)?; - let _ = load_info_fk(db_name, db, &mut data)?; - let _ = load_info_tg(db_name, db, &mut data)?; +pub async fn load_info_schema(db_name: &str, db: &mut tokio_postgres::Transaction<'_>) -> Result { + let mut data = load_info_cc(db_name, db).await?; + let _ = load_info_pk(db_name, db, &mut data).await?; + let _ = load_info_idx(db_name, db, &mut data).await?; + let _ = load_info_fk(db_name, db, &mut data).await?; + let _ = load_info_tg(db_name, db, &mut data).await?; + let _ = load_info_grant(db_name, db, &mut data).await?; Ok(data) } // SELECT table_catalog, table_schema, table_name, column_name, column_default, is_nullable, data_type, udt_name, character_maximum_length, numeric_precision, numeric_scale, ordinal_position from information_schema.columns where table_schema not in ('pg_catalog', 'information_schema') and table_name = table_catalog = $1 #[inline] -fn load_info_cc(db_name: &str, db: &mut Transaction) -> Result { +async fn load_info_cc(db_name: &str, db: &mut tokio_postgres::Transaction<'_>) -> Result { let mut data: InfoSchemaType = Default::default(); - let result = db.query("SELECT table_catalog, table_schema, table_name, column_name, column_default, is_nullable, \ - data_type, udt_name, character_maximum_length, numeric_precision, numeric_scale, ordinal_position \ - from information_schema.columns where table_schema not in ('pg_catalog', 
'information_schema') and table_catalog = $1 \ - order by 1,2,3, ordinal_position", &[&db_name]) + let result = db.query( + "SELECT c.table_catalog, c.table_schema, c.table_name, c.column_name, c.column_default, c.is_nullable, \ + c.data_type, c.udt_name, c.character_maximum_length, c.numeric_precision, c.numeric_scale, c.ordinal_position, \ + a.atttypmod \ + FROM information_schema.columns c \ + JOIN pg_catalog.pg_class pc ON pc.relname = c.table_name AND pc.relkind IN ('r','v','m','p') \ + JOIN pg_catalog.pg_namespace pn ON pn.nspname = c.table_schema AND pn.oid = pc.relnamespace \ + JOIN pg_catalog.pg_attribute a ON a.attrelid = pc.oid AND a.attname = c.column_name \ + AND a.attnum > 0 AND NOT a.attisdropped \ + WHERE c.table_schema NOT IN ('pg_catalog', 'information_schema') AND c.table_catalog = $1 \ + ORDER BY 1,2,3, c.ordinal_position", &[&db_name]) + .await .map_err(|e| format!("on loading information_schema [{}]: {}", db_name, e))?; let mut sort_order = 0; for r in result { @@ -177,9 +236,20 @@ fn load_info_cc(db_name: &str, db: &mut Transaction) -> Result = r.get(8); let numeric_precision: Option = r.get(9); let numeric_scale: Option = r.get(10); + // col 11: ordinal_position (used for ORDER BY only) + let atttypmod: i32 = r.get(12); let mut data_type = if udt_name.len() == 0 { data_type.to_string() } else { udt_name.to_string() }; + // PostgreSQL prefixes array udt_name with '_' (e.g. _varchar for varchar[]) + let is_array = data_type.starts_with('_'); + if is_array { + data_type = data_type[1..].to_string(); + } if data_type.to_lowercase().as_str() == "varchar" { - if let Some(varchar_len) = character_maximum_length { + // For array columns information_schema reports character_maximum_length as NULL; + // fall back to atttypmod (PostgreSQL stores length+4 there for varchar). 
+ let varchar_len = character_maximum_length + .or_else(|| if is_array && atttypmod > 4 { Some(atttypmod - 4) } else { None }); + if let Some(varchar_len) = varchar_len { data_type.push_str(format!("({})", varchar_len).as_str()); } } else { @@ -191,6 +261,9 @@ fn load_info_cc(db_name: &str, db: &mut Transaction) -> Result Result Result Result Result<(), String> { - match db.query("SELECT trigger_catalog, trigger_schema, trigger_name, event_object_catalog, event_object_schema, event_object_table \ - from information_schema.triggers where event_object_schema not in ('pg_catalog', 'information_schema') and trigger_catalog = $1 \ - order by created", &[&db_name]) { - Err(e) => Err(format!("on loading information_schema.triggers: {}", e)), - Ok(result) => { - let mut sort_order = 0; - for r in result { - sort_order += 1; - let trigger_catalog: &str = r.get(0); - let trigger_schema: &str = r.get(1); - let trigger_name: &str = r.get(2); - let event_object_catalog: &str = r.get(3); - let event_object_schema: &str = r.get(4); - let event_object_table: &str = r.get(5); - let trigger_data = format!("{} {} {}", trigger_catalog, trigger_schema, event_object_catalog); - match data.get_mut(event_object_schema) { - None => { - let mut hd = HashMap::new(); - hd.insert(event_object_table.into(), PgTable::newt(event_object_table, trigger_name, trigger_data, sort_order)); - data.insert(event_object_schema.into(), hd); - } - Some(s) => { - match s.get_mut(event_object_table) { - None => { - s.insert(event_object_table.into(), PgTable::newt(event_object_table, trigger_name, trigger_data, sort_order)); - } - Some(hd) => { - hd.triggers.insert(trigger_name.into(), trigger_data); - } - } - } - } +async fn load_info_tg(_db_name: &str, db: &mut tokio_postgres::Transaction<'_>, data: &mut InfoSchemaType) -> Result<(), String> { + // Query triggers with full details from pg_trigger + let result = db.query( + "SELECT + n.nspname as schema_name, + c.relname as table_name, + t.tgname as 
trigger_name, + CASE + WHEN t.tgtype & 2 = 2 THEN 'BEFORE' + WHEN t.tgtype & 64 = 64 THEN 'INSTEAD OF' + ELSE 'AFTER' + END || + CASE WHEN t.tgtype & 4 = 4 THEN ' INSERT' ELSE '' END || + CASE WHEN t.tgtype & 8 = 8 THEN ' DELETE' ELSE '' END || + CASE WHEN t.tgtype & 16 = 16 THEN ' UPDATE' ELSE '' END || + CASE WHEN t.tgtype & 32 = 32 THEN ' TRUNCATE' ELSE '' END as event, + CASE WHEN t.tgtype & 1 = 1 THEN 'FOR EACH ROW' ELSE 'FOR EACH STATEMENT' END as orientation, + pn.nspname || '.' || p.proname || '()' as proc_name + FROM pg_trigger t + JOIN pg_class c ON c.oid = t.tgrelid + JOIN pg_namespace n ON n.oid = c.relnamespace + JOIN pg_proc p ON p.oid = t.tgfoid + JOIN pg_namespace pn ON pn.oid = p.pronamespace + WHERE NOT t.tgisinternal + AND n.nspname NOT IN ('pg_catalog', 'information_schema') + ORDER BY n.nspname, c.relname, t.tgname", + &[] + ).await.map_err(|e| format!("on loading pg_trigger: {}", e))?; + + for r in result { + let schema_name: &str = r.get(0); + let table_name: &str = r.get(1); + let trigger_name: &str = r.get(2); + let event: &str = r.get(3); + let orientation: &str = r.get(4); + let proc_name: &str = r.get(5); + + let trigger = PgTrigger { + trigger_name: trigger_name.to_string(), + event: event.trim().to_string(), + orientation: orientation.to_string(), + proc: proc_name.to_string(), + }; + + if let Some(schema_tables) = data.get_mut(schema_name) { + if let Some(table) = schema_tables.get_mut(table_name) { + table.triggers.insert(trigger_name.to_string(), trigger); } - Ok(()) } } + Ok(()) } const NO_ACTION: &str = "NO ACTION"; @@ -367,7 +454,7 @@ const NO_ACTION: &str = "NO ACTION"; #[inline] // db: &mut Transaction, // db: &mut Client -fn load_info_fk(db_name: &str, db: &mut Transaction, data: &mut InfoSchemaType) -> Result<(), String> { +async fn load_info_fk(db_name: &str, db: &mut tokio_postgres::Transaction<'_>, data: &mut InfoSchemaType) -> Result<(), String> { match db.query("SELECT tc.table_schema, tc.table_name, kcu.column_name, 
ccu.table_schema AS foreign_schema_name, ccu.table_name AS foreign_table_name, ccu.column_name AS foreign_column_name, tc.constraint_name, rc.match_option, rc.update_rule, rc.delete_rule @@ -375,7 +462,7 @@ fn load_info_fk(db_name: &str, db: &mut Transaction, data: &mut InfoSchemaType) JOIN information_schema.key_column_usage AS kcu ON tc.constraint_name = kcu.constraint_name JOIN information_schema.constraint_column_usage AS ccu ON ccu.constraint_name = tc.constraint_name join information_schema.referential_constraints as rc on tc.constraint_name = rc.constraint_name - WHERE constraint_type = 'FOREIGN KEY' and tc.table_catalog = $1", &[&db_name]) { + WHERE constraint_type = 'FOREIGN KEY' and tc.table_catalog = $1", &[&db_name]).await { Err(e) => Err(format!("on loading information_schema.fk: {}", e)), Ok(result) => { for r in result { @@ -428,9 +515,231 @@ fn load_info_fk(db_name: &str, db: &mut Transaction, data: &mut InfoSchemaType) } #[inline] -pub fn load_info_schema_owner(db_name: &str, db: &mut Transaction) -> Result { +async fn load_info_idx(db_name: &str, db: &mut tokio_postgres::Transaction<'_>, data: &mut InfoSchemaType) -> Result<(), String> { + // Query indexes with detailed column information from pg_index + let result = db.query( + "SELECT + n.nspname as schema_name, + t.relname as table_name, + i.relname as index_name, + ix.indisunique as is_unique, + ix.indisprimary as is_primary, + am.amname as index_method, + a.attname as column_name, + CASE WHEN ix.indoption[array_position(ix.indkey, a.attnum) - 1] & 1 = 1 THEN 'DESC' ELSE 'ASC' END as sort_order, + CASE + WHEN ix.indoption[array_position(ix.indkey, a.attnum) - 1] & 2 = 2 THEN 'FIRST' + ELSE 'LAST' + END as nulls_order, + COALESCE(coll.collname, '') as collation, + array_position(ix.indkey, a.attnum) as col_position + FROM pg_index ix + JOIN pg_class i ON i.oid = ix.indexrelid + JOIN pg_class t ON t.oid = ix.indrelid + JOIN pg_namespace n ON n.oid = t.relnamespace + JOIN pg_am am ON am.oid = 
i.relam + JOIN pg_attribute a ON a.attrelid = t.oid AND a.attnum = ANY(ix.indkey) + JOIN information_schema.tables ist ON ist.table_catalog = $1 AND ist.table_schema = n.nspname AND ist.table_name = t.relname + LEFT JOIN pg_collation coll ON coll.oid = ANY(ix.indcollation) + AND array_position(ix.indcollation, coll.oid) = array_position(ix.indkey, a.attnum) + WHERE n.nspname NOT IN ('pg_catalog', 'information_schema') + ORDER BY n.nspname, t.relname, i.relname, array_position(ix.indkey, a.attnum)", + &[&db_name] + ).await.map_err(|e| format!("on loading pg_indexes: {}", e))?; + + // Group results by index + let mut current_index: Option<(String, String, String, PgIndex)> = None; + + for r in result { + let schema_name: &str = r.get(0); + let table_name: &str = r.get(1); + let index_name: &str = r.get(2); + let is_unique: bool = r.get(3); + let is_primary: bool = r.get(4); + let index_method: &str = r.get(5); + let column_name: &str = r.get(6); + let sort_order: &str = r.get(7); + let nulls_order: &str = r.get(8); + let collation: &str = r.get(9); + + // Skip primary key indexes as they are handled separately + if is_primary { + continue; + } + + let col_info = PgIndexColumn { + column_name: column_name.to_string(), + order: sort_order.to_string(), + nulls: nulls_order.to_string(), + collation: collation.to_string(), + }; + + match &mut current_index { + Some((cur_schema, cur_table, cur_idx_name, idx)) + if cur_schema == schema_name && cur_table == table_name && cur_idx_name == index_name => + { + // Same index, add column + idx.columns.push(col_info); + } + _ => { + // Save previous index if exists + if let Some((prev_schema, prev_table, _, prev_idx)) = current_index.take() { + if let Some(schema_tables) = data.get_mut(&prev_schema) { + if let Some(table) = schema_tables.get_mut(&prev_table) { + table.indexes.insert(prev_idx.index_name.clone(), prev_idx); + } + } + } + // Start new index + current_index = Some(( + schema_name.to_string(), + table_name.to_string(), 
+ index_name.to_string(), + PgIndex { + index_name: index_name.to_string(), + columns: vec![col_info], + is_unique, + index_method: index_method.to_string(), + }, + )); + } + } + } + + // Save last index + if let Some((prev_schema, prev_table, _, prev_idx)) = current_index { + if let Some(schema_tables) = data.get_mut(&prev_schema) { + if let Some(table) = schema_tables.get_mut(&prev_table) { + table.indexes.insert(prev_idx.index_name.clone(), prev_idx); + } + } + } + + Ok(()) +} + +#[inline] +async fn load_info_pk(db_name: &str, db: &mut tokio_postgres::Transaction<'_>, data: &mut InfoSchemaType) -> Result<(), String> { + // Query primary keys with column order from pg_constraint + let result = db.query( + "SELECT + n.nspname as schema_name, + t.relname as table_name, + c.conname as constraint_name, + a.attname as column_name, + array_position(c.conkey, a.attnum) as col_position + FROM pg_constraint c + JOIN pg_class t ON t.oid = c.conrelid + JOIN pg_namespace n ON n.oid = t.relnamespace + JOIN pg_attribute a ON a.attrelid = t.oid AND a.attnum = ANY(c.conkey) + JOIN information_schema.tables ist ON ist.table_catalog = $1 AND ist.table_schema = n.nspname AND ist.table_name = t.relname + WHERE c.contype = 'p' + AND n.nspname NOT IN ('pg_catalog', 'information_schema') + ORDER BY n.nspname, t.relname, array_position(c.conkey, a.attnum)", + &[&db_name] + ).await.map_err(|e| format!("on loading primary keys: {}", e))?; + + // Group results by table + let mut current_pk: Option<(String, String, String, Vec)> = None; // schema, table, constraint_name, columns + + for r in result { + let schema_name: &str = r.get(0); + let table_name: &str = r.get(1); + let constraint_name: &str = r.get(2); + let column_name: &str = r.get(3); + + match &mut current_pk { + Some((cur_schema, cur_table, _, columns)) + if cur_schema == schema_name && cur_table == table_name => + { + // Same table, add column + columns.push(column_name.to_string()); + } + _ => { + // Save previous PK if exists 
+ if let Some((prev_schema, prev_table, prev_constraint, prev_columns)) = current_pk.take() { + if let Some(schema_tables) = data.get_mut(&prev_schema) { + if let Some(table) = schema_tables.get_mut(&prev_table) { + table.primary_key = Some(PgPrimaryKey { + constraint_name: prev_constraint, + columns: prev_columns, + }); + } + } + } + // Start new PK + current_pk = Some(( + schema_name.to_string(), + table_name.to_string(), + constraint_name.to_string(), + vec![column_name.to_string()], + )); + } + } + } + + // Save last PK + if let Some((prev_schema, prev_table, prev_constraint, prev_columns)) = current_pk { + if let Some(schema_tables) = data.get_mut(&prev_schema) { + if let Some(table) = schema_tables.get_mut(&prev_table) { + table.primary_key = Some(PgPrimaryKey { + constraint_name: prev_constraint, + columns: prev_columns, + }); + } + } + } + + Ok(()) +} + +#[inline] +async fn load_info_grant(db_name: &str, db: &mut tokio_postgres::Transaction<'_>, data: &mut InfoSchemaType) -> Result<(), String> { + // Query table grants from information_schema + let result = db.query( + "SELECT + table_schema, + table_name, + grantee, + privilege_type, + is_grantable + FROM information_schema.table_privileges + WHERE table_schema NOT IN ('pg_catalog', 'information_schema') + AND grantor != grantee + AND table_catalog = $1 + ORDER BY table_schema, table_name, grantee", + &[&db_name] + ).await.map_err(|e| format!("on loading table_privileges: {}", e))?; + + for r in result { + let schema_name: &str = r.get(0); + let table_name: &str = r.get(1); + let grantee: &str = r.get(2); + let privilege_type: &str = r.get(3); + let is_grantable: &str = r.get(4); + + if let Some(schema_tables) = data.get_mut(schema_name) { + if let Some(table) = schema_tables.get_mut(table_name) { + let grant = table.grants.entry(grantee.to_string()).or_insert_with(|| PgGrant { + grantee: grantee.to_string(), + privileges: HashSet::new(), + with_grant_option: false, + }); + 
grant.privileges.insert(privilege_type.to_string()); + if is_grantable == "YES" { + grant.with_grant_option = true; + } + } + } + } + Ok(()) +} + +#[inline] +pub async fn load_info_schema_owner(db_name: &str, db: &mut tokio_postgres::Transaction<'_>) -> Result { let mut res = HashMap::new(); - match db.query("select schema_name, schema_owner from information_schema.schemata where schema_name not in ('information_schema', 'pg_catalog')", &[]) { + match db.query("select schema_name, schema_owner from information_schema.schemata where schema_name not in ('information_schema', 'pg_catalog')", + &[]).await { Ok(schemas) => { for schema in schemas { let schema_name: &str = schema.get(0); @@ -440,7 +749,7 @@ pub fn load_info_schema_owner(db_name: &str, db: &mut Transaction) -> Result { return Err(format!("on loading information_schema.owner: {}", e)); } Ok(result) => { for r in result { @@ -465,6 +774,9 @@ impl Default for PgTable { columns: Default::default(), fks: Default::default(), triggers: Default::default(), + indexes: Default::default(), + grants: Default::default(), + primary_key: None, sort_order: 0, table_comment: None, owner: None, @@ -486,11 +798,17 @@ impl PgTable { } } - /// should not called + /// should not be called - kept for backward compatibility #[inline] - fn newt(table: &str, trig: &str, trig_data: String, sort_order: usize) -> Self { + #[allow(dead_code)] + fn newt(table: &str, trig_name: &str, event: &str, orientation: &str, proc: &str, sort_order: usize) -> Self { let mut tgs = HashMap::new(); - tgs.insert(trig.into(), trig_data); + tgs.insert(trig_name.into(), PgTrigger { + trigger_name: trig_name.to_string(), + event: event.to_string(), + orientation: orientation.to_string(), + proc: proc.to_string(), + }); PgTable { table_name: table.into(), triggers: tgs, diff --git a/src/schema.rs b/src/schema.rs index 8fc30d3..67780b5 100644 --- a/src/schema.rs +++ b/src/schema.rs @@ -1,10 +1,11 @@ -use postgres::Transaction; +// use postgres::Transaction; 
use serde::Serialize; use yaml_rust::Yaml; use crate::loader::InfoSchemaType; use crate::table::Table; use crate::utils::{Named, OrderedHashMap}; +use crate::MigrationOptions; #[derive(Debug, Clone, Serialize)] pub struct Schema { @@ -86,10 +87,17 @@ impl Schema { #[inline] /// return statements to execute - pub fn deploy_all_tables(&self, schema: &mut InfoSchemaType, db: &mut Transaction, retry: bool, dry_run: Option<&dyn Fn(Vec) -> Result<(), String>>) -> Result { + pub async fn deploy_all_tables(&self, schema: &mut InfoSchemaType, + db: &mut tokio_postgres::Transaction<'_>, + dry_run: Option<&(dyn Fn(Vec) -> Result<(), String> + Send + Sync)>, + opt: &MigrationOptions) -> Result { let mut cnt = 0; for t in &self.tables.list { - if t.deploy(schema, db, &self.schema_name, retry, self.file.as_str(), dry_run)? { + // Skip template tables - they are not created in DB + if t.is_template() { + continue; + } + if t.deploy(schema, db, &self.schema_name, self.file.as_str(), dry_run, opt).await? { cnt += 1; } } @@ -98,14 +106,85 @@ impl Schema { #[inline] /// return statements to execute - pub fn deploy_all_fk(&self, schemas: &OrderedHashMap, schema: &mut InfoSchemaType, db: &mut Transaction, retry: bool, dry_run: Option<&dyn Fn(Vec) -> Result<(), String>>) -> Result { + pub async fn deploy_all_fk(&self, schemas: &OrderedHashMap, + schema: &mut InfoSchemaType, + db: &mut tokio_postgres::Transaction<'_>, + retry: bool, + dry_run: Option<&(dyn Fn(Vec) -> Result<(), String> + Send + Sync)>) -> Result { let mut cnt = 0; for t in &self.tables.list { - if t.deploy_fk(schemas, schema, db, &self.schema_name, retry, self.file.as_str(), dry_run)? { + // Skip template tables - they are not created in DB + if t.is_template() { + continue; + } + if t.deploy_fk(schemas, schema, db, &self.schema_name, retry, self.file.as_str(), dry_run).await? 
{ cnt += 1; } } Ok(cnt) } + /// Resolve templates for all tables in this schema + /// Templates are referenced by name: "schema.table" or just "table" for same schema + pub fn resolve_templates(&mut self, all_schemas: &OrderedHashMap) -> Result<(), String> { + // Collect tables that need template resolution + let table_names: Vec = self.tables.list.iter() + .filter(|t| !t.use_templates.is_empty()) + .map(|t| t.table_name.clone()) + .collect(); + + for table_name in table_names { + let use_templates = if let Some(t) = self.tables.get(&table_name) { + t.use_templates.clone() + } else { + continue; + }; + + // Collect all templates to merge + let mut templates_to_merge: Vec = Vec::new(); + for template_ref in &use_templates { + let (template_schema, template_table) = if template_ref.contains('.') { + let parts: Vec<&str> = template_ref.splitn(2, '.').collect(); + (parts[0].to_string(), parts[1].to_string()) + } else { + (self.schema_name.clone(), template_ref.clone()) + }; + + // Find the template table + let template = if template_schema == self.schema_name { + self.tables.get(&template_table).cloned() + } else { + all_schemas.get(&template_schema) + .and_then(|s| s.tables.get(&template_table).cloned()) + }; + + match template { + Some(t) => { + if !t.is_template() { + return Err(format!( + "Table {}.{} references '{}' as template, but it is not marked as template: true", + self.schema_name, table_name, template_ref + )); + } + templates_to_merge.push(t); + } + None => { + return Err(format!( + "Table {}.{} references template '{}' which does not exist", + self.schema_name, table_name, template_ref + )); + } + } + } + + // Apply templates in order + if let Some(table) = self.tables.get_mut(&table_name) { + for template in templates_to_merge { + table.merge_from(&template); + } + } + } + + Ok(()) + } } diff --git a/src/schema.yaml b/src/schema.yaml index 39b6d35..c27da3f 100644 --- a/src/schema.yaml +++ b/src/schema.yaml @@ -114,12 +114,44 @@ schema: sql: type: string 
index: + oneOf: + # simple index on a column + - type: boolean + - type: object + items: + name: + # if multiple columns having same index name, then the single index will be created for the set of those columns + # if name is just a "+" or not set, than generate a default index name + type: string + unique: + # create UNIQUE index + type: boolean + concurrently: + # create index CONCURRENTLY (non-blocking) + type: boolean + using: + # index method: btree, hash, gist, spgist, gin, brin + type: string + order: + # ASC or DESC + type: string + nulls: + # FIRST or LAST (NULLS FIRST / NULLS LAST) + type: string + collate: + # collation name for the index + type: string + sql: + # additional SQL (appended to index creation, prefer specific fields, ignored on index changes) + type: string + partition_by: + # partitioning method: RANGE, LIST, HASH - please use pg_partman for more complex partitioning strategies + type: string + pg_partman: type: object items: - name: - # if multiple columns having same index name, then the single index will be created for the set of those columns - type: string - sql: + create_partition: + # pg_partman parameters like: p_interval := '1 day', p_start_partition etc for create_partition call except p_parent_table and p_control type: string description: type: string @@ -200,7 +232,7 @@ schema: type: array items: $ref: grant - template: # hold for future usage + template: oneOf: # this table will not create - type: boolean diff --git a/src/table.rs b/src/table.rs index 7a82ee8..da51419 100644 --- a/src/table.rs +++ b/src/table.rs @@ -1,13 +1,13 @@ use std::collections::{HashMap, HashSet}; use std::fmt::Write; -use postgres::Transaction; use serde::Serialize; use yaml_rust::Yaml; use yaml_rust::yaml::Array; use crate::column::{Column, Trig}; use crate::loader::{FKTable, InfoSchemaType, PgTable}; +use crate::MigrationOptions; #[cfg(feature = "slog")] use crate::log_debug; use crate::schema::Schema; @@ -44,6 +44,16 @@ pub struct Table { 
#[serde(skip_serializing_if = "Vec::is_empty")] pub grant: Vec, + /// Template configuration: + /// - None: regular table, no template inheritance + /// - Some(true): this table IS a template (won't be created in DB) + /// - Some(false): regular table (same as None) + #[serde(skip_serializing_if = "Option::is_none")] + pub is_template: Option, + + /// List of template names to inherit from (format: "schema.table" or just "table" for same schema) + #[serde(skip_serializing_if = "Vec::is_empty")] + pub use_templates: Vec, } @@ -160,6 +170,8 @@ impl Default for Table { data: vec![], owner: "".to_string(), grant: vec![], + is_template: None, + use_templates: vec![], } } } @@ -179,18 +191,6 @@ impl Table { if !c.is_null() { if let Some(_name) = c["name"].as_str() { let yc = Column::new(c); - /* - match file { - None => { - // println!("{}:\n{:?}\n{:?}", _name, yc, c); - } - Some(_f) => { - if let Some(log) = log { - trace!(log, "+\n{:?}\n{:?}", yc, c); - } - } - } - */ if let Err(e) = columns.append(yc) { return Err(format!( "{} (column name) {}/{} on table: {}{}{}", @@ -240,7 +240,54 @@ impl Table { } } } + // Validate partition_by: only one column allowed, must be RANGE/LIST/HASH + let mut partition_count = 0usize; + for col in &mut columns.list { + if let Some(ref pv) = col.partition_by { + match pv.as_str() { + "RANGE" | "LIST" | "HASH" => {} + _ => { + return Err(format!( + "Invalid partition_by value '{}' on column '{}' in table '{}'. Expected one of: RANGE, LIST, HASH{}{}", + pv, col.name, table_name, + match file { None => "", Some(_) => ", found in file: " }, + match file { None => "", Some(f) => f.as_str() }, + )); + } + } + partition_count += 1; + if partition_count > 1 { + return Err(format!( + "Only one column can have partition_by on table '{}'. 
Found multiple columns with partition_by{}{}", + table_name, + match file { None => "", Some(_) => ", found in file: " }, + match file { None => "", Some(f) => f.as_str() }, + )); + } + // Auto-create index if none declared + if col.index.is_none() { + col.index = Some(crate::column::Index::default()); + } + } + } + let etl = &input["data_file"]; + // Parse template field - can be boolean or array of strings + let template_field = &input["template"]; + let (is_template, use_templates) = if template_field.is_null() { + (None, vec![]) + } else if let Some(b) = template_field.as_bool() { + (Some(b), vec![]) + } else if let Some(arr) = template_field.as_vec() { + let templates: Vec = arr + .iter() + .filter_map(|v| v.as_str().map(|s| s.to_string())) + .collect(); + (None, templates) + } else { + (None, vec![]) + }; + Ok(Table { table_name: table_name.into(), description: crate::utils::as_str(input, "description", ""), @@ -252,29 +299,27 @@ impl Table { data_file: if etl.is_null() { None } else { - // match etl.as_str() { - // Some(s) => Some(s.to_string()), - // None => None, - // } etl.as_str().map(|s| s.to_string()) }, data: crate::utils::as_vec(input, "data"), owner: crate::utils::as_str(input, "owner", ""), grant: YGrant::new(input["grant"].as_vec()), + is_template, + use_templates, }) } /// build a create or alter sql #[allow(unused_mut)] - pub fn deploy( + pub async fn deploy( &self, dbc: &mut InfoSchemaType, - db: &mut Transaction, + db: &mut tokio_postgres::Transaction<'_>, schema: &String, // this - is_retry: bool, file: &str, - dry_run: Option<&dyn Fn(Vec) -> Result<(), String>>, + dry_run: Option<&(dyn Fn(Vec) -> Result<(), String> + Send + Sync)>, + opt: &MigrationOptions, ) -> Result { let mut sql = String::new(); let mut comments = String::new(); @@ -285,33 +330,273 @@ impl Table { None => TableOnly, Some(mut ts) => { let pks = ts.pks(); + let mut skipped_columns = String::new(); for dc in &self.columns.list { if !ts.columns.contains_key(&dc.name) { let 
def = dc.column_def(schema, &self.table_name, file)?; - append(format!( - "ALTER TABLE {}.{} ADD COLUMN {}", - schema, self.table_name, def.def(pks.is_some()) - ).as_str(), &mut sql, is_retry); + if opt.with_ddl_retry { + if let Some(base_type) = serial_to_base_type(&def.column_type) { + // serial/bigserial/smallserial are not valid inside PL/pgSQL + // EXECUTE '...'. Expand them: create the sequence separately + // and reference it via nextval with escaped quotes. + let seq_name = format!("{}_{}_seq", self.table_name, dc.name); + let seq_fqn = format!("{}.{}", schema, seq_name); + // CREATE SEQUENCE is idempotent with IF NOT EXISTS + sql.push_str(&format!("CREATE SEQUENCE IF NOT EXISTS {};\n", seq_fqn)); + // Build column def for use inside EXECUTE '...' — single + // quotes inside the EXECUTE string must be doubled. + let mut col_parts = format!("{} {}", dc.name, base_type); + if def.pk && !pks.is_some() { + col_parts.push_str(" primary key"); + } + if !def.nullable { + col_parts.push_str(" not null"); + } + col_parts.push_str(&format!(" default nextval(''{}''::regclass)", seq_fqn)); + if let Some(ssql) = &def.sql { + if !ssql.is_empty() { + col_parts.push(' '); + col_parts.push_str(ssql); + } + } + append(&format!( + "ALTER TABLE {}.{} ADD COLUMN {}", + schema, self.table_name, col_parts + ), &mut sql, true); + // Bind sequence ownership to the column + sql.push_str(&format!( + "ALTER SEQUENCE {} OWNED BY {}.{}.{};\n", + seq_fqn, schema, self.table_name, dc.name + )); + } else { + append(format!( + "ALTER TABLE {}.{} ADD COLUMN {}", + schema, self.table_name, def.def(pks.is_some()) + ).as_str(), &mut sql, true); + } + } else { + append(format!( + "ALTER TABLE {}.{} ADD COLUMN {}", + schema, self.table_name, def.def(pks.is_some()) + ).as_str(), &mut sql, false); + } let _ = ts.columns.insert(dc.get_name(), def); exec = true; + } else { + // Column exists - check for type changes + // Extract scalar values immediately to release the immutable borrow + // before 
any mutable borrow (ts.columns.insert) is needed. + let (existing_type, existing_nullable) = { + let existing_col = ts.columns.get(&dc.name).unwrap(); + (existing_col.column_type.to_lowercase(), existing_col.nullable) + }; + let desired_def = dc.column_def(schema, &self.table_name, file)?; + let desired_nullable = desired_def.nullable; + + // Normalize types for comparison + let desired_type = desired_def.column_type.to_lowercase(); + + if existing_type != desired_type { + // Types differ - check if it's a size change + let type_change = analyze_type_change(&existing_type, &desired_type); + + match type_change { + TypeChange::SizeExtension | TypeChange::Compatible => { + // Safe change - always apply + let alter_sql = format!( + "ALTER TABLE {}.{} ALTER COLUMN {} TYPE {}", + schema, self.table_name, dc.name, desired_def.column_type + ); + append(&alter_sql, &mut sql, opt.with_ddl_retry); + let _ = ts.columns.insert(dc.get_name(), desired_def); + exec = true; + } + TypeChange::SizeReduction => { + // Potentially destructive - requires with_size_cut + let alter_sql = format!( + "ALTER TABLE {}.{} ALTER COLUMN {} TYPE {}", + schema, self.table_name, dc.name, desired_def.column_type + ); + if opt.with_size_cut { + append(&alter_sql, &mut sql, opt.with_ddl_retry); + let _ = ts.columns.insert(dc.get_name(), desired_def); + exec = true; + } else { + if opt.without_failfast { + let _ = writeln!(skipped_columns, + "-- SKIPPED (with_size_cut=false): {};\n-- Column {} type change from {} to {}", + alter_sql, dc.name, existing_type, desired_type); + } else { + return Err(format!( + "Column {} on {}.{} type change from {} to {} requires size reduction but with_size_cut is disabled. 
SQL: {};", + dc.name, schema, self.table_name, existing_type, desired_type, alter_sql + )); + } + } + } + TypeChange::Incompatible => { + // Incompatible type change - requires with_size_cut as it's destructive + let alter_sql = format!( + "ALTER TABLE {}.{} ALTER COLUMN {} TYPE {} USING {}::{}", + schema, self.table_name, dc.name, desired_def.column_type, dc.name, desired_def.column_type + ); + if opt.with_size_cut { + append(&alter_sql, &mut sql, opt.with_ddl_retry); + let _ = ts.columns.insert(dc.get_name(), desired_def); + exec = true; + } else { + if opt.without_failfast { + let _ = writeln!(skipped_columns, + "-- SKIPPED (with_size_cut=false): {};\n-- Column {} incompatible type change from {} to {}", + alter_sql, dc.name, existing_type, desired_type); + } else { + return Err(format!( + "Column {} on {}.{} has incompatible type change from {} to {} but with_size_cut is disabled. SQL: {};", + dc.name, schema, self.table_name, existing_type, desired_type, alter_sql + )); + } + } + } + TypeChange::NoChange => { + // No change needed + } + } + } + + // Check nullability: DB has NOT NULL but schema is nullable -> drop constraint + // Skip for primary key columns - they are always NOT NULL by definition + if !existing_nullable && desired_nullable && !dc.is_pk() { + let alter_sql = format!( + "ALTER TABLE {}.{} ALTER COLUMN {} DROP NOT NULL", + schema, self.table_name, dc.name + ); + append(&alter_sql, &mut sql, opt.with_ddl_retry); + exec = true; + } } } + // Log skipped column changes if any + if !skipped_columns.is_empty() { + #[cfg(not(feature = "slog"))] + eprintln!("Skipped column type changes:\n{}", skipped_columns); + } if let Some(o) = &ts.owner { if self.owner.len() > 0 && &self.owner != o { append(format!("ALTER TABLE {}.{} OWNER TO {}", schema, self.table_name, self.owner - ).as_str(), &mut sql, is_retry); + ).as_str(), &mut sql, opt.with_ddl_retry); } } - for dt in &self.triggers.list { - if !ts.triggers.contains_key(&dt.name) { - if let Some(def) = 
dt.trig_def(schema, &self.table_name) { - let _ = writeln!(sql, "{}\n", def); - let _ = ts.triggers.insert(dt.get_name(), def); - exec = true; + if !opt.exclude_triggers { + let mut skipped_triggers = String::new(); + for dt in &self.triggers.list { + let desired_trigger = dt.to_pg_trigger(); + let existing_trigger = ts.triggers.get(&dt.name); + let trigger_exists = existing_trigger.is_some(); + let trigger_changed = existing_trigger.map_or(false, |ex| !dt.matches_pg_trigger(ex)); + + if !trigger_exists { + // New trigger - create it + if let Some(def) = dt.trig_def(schema, &self.table_name) { + let _ = writeln!(sql, "{}\n", def); + let _ = ts.triggers.insert(dt.get_name(), desired_trigger); + exec = true; + } + } else if trigger_changed { + // Trigger changed + let drop_sql = format!("DROP TRIGGER IF EXISTS {} ON {}.{};", dt.name, schema, self.table_name); + let create_sql = dt.trig_def(schema, &self.table_name).unwrap_or_default(); + + if opt.with_trigger_drop { + // Drop and recreate + let _ = writeln!(sql, "{}", drop_sql); + let _ = writeln!(sql, "{}\n", create_sql); + let _ = ts.triggers.insert(dt.get_name(), desired_trigger); + exec = true; + } else { + if opt.without_failfast { + // Show skipped SQL + let _ = writeln!(skipped_triggers, "-- SKIPPED (with_trigger_drop=false):\n-- {}\n-- {}", drop_sql, create_sql); + } else { + return Err(format!( + "Trigger {} on {}.{} has changed but without_failfast is enabled. 
SQL would be:\n{}\n{}", + dt.name, schema, self.table_name, drop_sql, create_sql + )); + } + } + } + } + // Log skipped triggers if any + if !skipped_triggers.is_empty() { + #[cfg(not(feature = "slog"))] + eprintln!("Skipped trigger changes:\n{}", skipped_triggers); + } + } + + // Check for primary key changes + let desired_pk = self.get_primary_key_columns(); + let existing_pk = ts.primary_key.as_ref().map(|pk| pk.columns.clone()).unwrap_or_default(); + + if desired_pk != existing_pk { + let pk_constraint_name = ts.primary_key.as_ref() + .map(|pk| pk.constraint_name.clone()) + .unwrap_or_else(|| format!("{}_pkey", self.table_name)); + + let drop_pk_sql = if ts.primary_key.is_some() { + format!("ALTER TABLE {}.{} DROP CONSTRAINT {};", schema, self.table_name, pk_constraint_name) + } else { + String::new() + }; + + let add_pk_sql = if !desired_pk.is_empty() { + format!("ALTER TABLE {}.{} ADD PRIMARY KEY ({});", + schema, self.table_name, desired_pk.join(", ")) + } else { + String::new() + }; + + // PK change requires with_index_drop since PK is backed by a unique index + if opt.with_index_drop { + if !drop_pk_sql.is_empty() { + append(&drop_pk_sql, &mut sql, opt.with_ddl_retry); + } + if !add_pk_sql.is_empty() { + append(&add_pk_sql, &mut sql, opt.with_ddl_retry); + } + // Update dbc with new PK + ts.primary_key = if desired_pk.is_empty() { + None + } else { + Some(crate::loader::PgPrimaryKey { + constraint_name: format!("{}_pkey", self.table_name), + columns: desired_pk.clone(), + }) + }; + exec = true; + } else { + let mut skipped_pk = String::new(); + if !drop_pk_sql.is_empty() { + let _ = writeln!(skipped_pk, "-- {}", drop_pk_sql); + } + if !add_pk_sql.is_empty() { + let _ = writeln!(skipped_pk, "-- {}", add_pk_sql); + } + + if opt.without_failfast { + let _ = writeln!(skipped_pk, + "-- SKIPPED (with_index_drop=false): Primary key change from {:?} to {:?}", + existing_pk, desired_pk); + #[cfg(not(feature = "slog"))] + eprintln!("Skipped primary key 
changes:\n{}", skipped_pk); + } else { + return Err(format!( + "Primary key on {}.{} has changed from {:?} to {:?} but with_index_drop is disabled. SQL would be:\n{}{}", + schema, self.table_name, existing_pk, desired_pk, drop_pk_sql, add_pk_sql + )); } } } + CreateST::None } }, @@ -327,11 +612,26 @@ impl Table { } { // let owner = ""; let mut columns = String::new(); + + // Get primary key columns from column-level definitions (constraint.primaryKey: true) + let pk_columns = self.get_primary_key_columns(); + let has_composite_pk = pk_columns.len() > 1; + let mut st = PgTable { table_name: self.table_name.clone(), columns: HashMap::new(), fks: Default::default(), triggers: HashMap::new(), + indexes: HashMap::new(), + grants: HashMap::new(), + primary_key: if pk_columns.is_empty() { + None + } else { + Some(crate::loader::PgPrimaryKey { + constraint_name: format!("{}_pkey", self.table_name), + columns: pk_columns.clone(), + }) + }, sort_order: 0, table_comment: None, owner: if self.owner.len() > 0 { Some(self.owner.clone()) } else { None }, @@ -339,22 +639,32 @@ impl Table { for dc in &self.columns.list { let cd = dc.column_def(schema, &self.table_name, file)?; - // let _ = write!(columns, "{}, ", cd.def(true)); let _ = st.columns.insert(dc.get_name(), cd); self.comments(&mut comments, schema, &dc.name, &dc.description); } - // if columns.len() > 0 { - let pks = st.pks(); + // Build column definitions + // If composite PK or table-level PK, don't include pk in column def (will add as separate constraint) + let skip_column_pk = has_composite_pk; for dc in &self.columns.list { if let Some(cd) = st.columns.get(dc.name.as_str()) { - columns.push_str(cd.def(pks.is_some()).as_str()); + columns.push_str(cd.def(skip_column_pk).as_str()); columns.push_str(", "); } } - if let Some(pks) = pks { - columns.push_str(pks.as_str()); + + // Add PRIMARY KEY constraint if composite or table-level PK + if !pk_columns.is_empty() && has_composite_pk { + columns.push_str(&format!("PRIMARY 
KEY ({})", pk_columns.join(", "))); columns.push_str(", "); + } else if !pk_columns.is_empty() { + // Single column PK from column definition - handled in column def + // But we still need to add it if st.pks() returns something + let pks = st.pks(); + if let Some(pks) = pks { + columns.push_str(pks.as_str()); + columns.push_str(", "); + } } if let SchemaAndTable = do_create { if schema.as_str() != "public" { @@ -368,13 +678,20 @@ impl Table { if let Some(idx) = columns.rfind(",") { columns.remove(idx); } + + // Build partition clause from column-level partition_by + let partition_clause = self.columns.list.iter() + .find_map(|c| c.partition_by.as_ref().map(|pv| format!(" PARTITION BY {} ({})", pv, c.name))) + .unwrap_or_default(); + + let suffix = format!("{}{}", partition_clause, self.sql); let csql = format!("CREATE TABLE {}.{} ({}{}{}){}; \n", schema, self.table_name, columns, if self.constraint.len() > 0 { ", " } else { "" }, self.constraint, - self.sql + suffix ); sql.push_str(csql.as_str()); @@ -383,13 +700,15 @@ impl Table { append(format!( "ALTER TABLE {}.{} OWNER TO {}", schema, self.table_name, self.owner - ).as_str(), &mut sql, is_retry); + ).as_str(), &mut sql, opt.with_ddl_retry); } // } - for dt in &self.triggers.list { - if let Some(td) = dt.trig_def(schema, &self.table_name) { - let _ = writeln!(sql, "{}\n", td); - st.triggers.insert(dt.get_name(), td); + if !opt.exclude_triggers { + for dt in &self.triggers.list { + if let Some(td) = dt.trig_def(schema, &self.table_name) { + let _ = writeln!(sql, "{}\n", td); + st.triggers.insert(dt.get_name(), dt.to_pg_trigger()); + } } } dbc.get_mut(schema) @@ -407,31 +726,70 @@ impl Table { } } + + // Generate CREATE INDEX statements for indexes defined in YAML + let index_builder = crate::index::IndexBuilder::new(&self.columns); + let indexes_sql = index_builder.generate_sql(schema, &self.table_name, dbc, opt)?; + + // Generate GRANT/REVOKE statements for grants defined in YAML + let grant_builder = 
crate::grant::GrantBuilder::new(&self.grant, &self.table_name); + let grants_sql = grant_builder.generate_sql(schema, dbc, opt)?; + let mut data = String::new(); for row in &self.data { self.insert(&mut data, row, schema); } + let exec = exec || !indexes_sql.is_empty() || !grants_sql.is_empty(); + match dry_run { Some(store) => { - store(vec![sql, comments, data]).map(|_| false) + store(vec![sql, comments, indexes_sql, grants_sql, data]).map(|_| false) } None => { - #[cfg(feature = "slog")] log_debug(format!("deploy SQL {:?}[{}:{}]> {}", exec, file, schema, sql)); if exec { - let source = if file.len() > 0 { format!(", source: {}", file)} else {"".to_string()}; - let _ = db.batch_execute(sql.as_str()) - .map_err(|e| format!("DB execute [{}]: {} {}", sql, e, source))?; - let _ = db.batch_execute(comments.as_str()) - .map_err(|e| format!("DB execute [{}]: {} {}", comments, e, source))?; - let _ = db.batch_execute(data.as_str()) - .map_err(|e| format!("DB execute [{}]: {} {}", data, e, source))?; + if !sql.is_empty() { + #[cfg(feature = "slog")] log_debug("deploy SQL", &sql, file, schema); + let _ = db.batch_execute(sql.as_str()).await + .map_err(|e| Self::format_it("DB execute", sql, e, file))?; + } + if !comments.is_empty() { + #[cfg(feature = "slog")] log_debug("deploy SQL", &comments, file, schema); + let _ = db.batch_execute(comments.as_str()).await + .map_err(|e| Self::format_it("DB execute comments", comments, e, file))?; + } + if !indexes_sql.is_empty() { + #[cfg(feature = "slog")] log_debug("deploy SQL", &indexes_sql, file, schema); + let _ = db.batch_execute(indexes_sql.as_str()).await + .map_err(|e| Self::format_it("DB execute indexes", indexes_sql, e, file))?; + } + if !grants_sql.is_empty() { + #[cfg(feature = "slog")] log_debug("deploy SQL", &grants_sql, file, schema); + let _ = db.batch_execute(grants_sql.as_str()).await + .map_err(|e| Self::format_it("DB execute grants", grants_sql, e, file))?; + } + if !data.is_empty() { + #[cfg(feature = "slog")] 
log_debug("deploy SQL", &data, file, schema); + let _ = db.batch_execute(data.as_str()).await + .map_err(|e| Self::format_it("DB execute data upserts", data, e, file))?; + } } Ok(exec) } } } + fn format_it(msg: &str, sql: String, e: tokio_postgres::Error, file: &str) -> String { + format!("{} [{}] {} {} \nThe error is: {}", msg, sql, + if file.len() > 0 { ", source: " } else { "" }, + file, + match e.as_db_error() { + None => e.to_string(), + Some(e) => e.to_string() + } + ) + } + fn insert(&self, data: &mut String, row: &Vec, schema: &String) { let mut names = String::new(); let mut vals = String::new(); @@ -484,16 +842,16 @@ impl Table { /// build a create or alter sql #[allow(unused, unused_mut)] - pub fn deploy_fk( + pub async fn deploy_fk( &self, // target: &FileVersion, schemas: &OrderedHashMap, //FilesMap, dbc: &mut InfoSchemaType, - db: &mut Transaction, + db: &mut tokio_postgres::Transaction<'_>, schema: &String, is_retry: bool, file: &str, - dry_run: Option<&dyn Fn(Vec) -> Result<(), String>>, + dry_run: Option<&(dyn Fn(Vec) -> Result<(), String> + Send + Sync)>, ) -> Result { let mut sql = String::new(); let mut fk_list = HashMap::new(); @@ -541,9 +899,9 @@ impl Table { } } append(format!( - "ALTER TABLE {}.{} ADD CONSTRAINT fk_{}_{}_{} FOREIGN KEY ({}) REFERENCES {}.{} ({}) {}", + "ALTER TABLE {}.{} ADD CONSTRAINT fk_{}_{}_{}_{} FOREIGN KEY ({}) REFERENCES {}.{} ({}) {}", schema, self.table_name, schema, self.table_name, ff.table, - ff.name, ff.schema, &ff.table, ff.columns(), ff.sql + ff.name, ff.name, ff.schema, &ff.table, ff.columns(), ff.sql ).as_str(), &mut sql, is_retry); } @@ -553,7 +911,7 @@ impl Table { } None => { if exec { - if let Err(e) = db.batch_execute(sql.as_str()) { + if let Err(e) = db.batch_execute(sql.as_str()).await { return Err(format!("DB FK execute [{}]: {} source: {}", sql, e, file)); } Ok(exec) @@ -582,6 +940,84 @@ impl Table { self.transaction.as_str() == "table" || self.transaction.as_str() == "retry" } + + /// Check if this 
table is a template (should not be created in DB) + pub fn is_template(&self) -> bool { + self.is_template.unwrap_or(false) + } + + /// Merge another table's definition into this table (for template inheritance) + /// Template columns, triggers, and grants are added first, then this table's definitions override + pub fn merge_from(&mut self, template: &Table) { + // Merge columns - template columns come first, table columns override if same name + let mut merged_columns = OrderedHashMap::new(); + for col in &template.columns.list { + let _ = merged_columns.append(col.clone()); + } + for col in &self.columns.list { + if merged_columns.map.contains_key(&col.name) { + // Override existing column from template + if let Some(existing) = merged_columns.get_mut(&col.name) { + *existing = col.clone(); + } + } else { + let _ = merged_columns.append(col.clone()); + } + } + self.columns = merged_columns; + + // Merge triggers - template triggers come first, table triggers override if same name + let mut merged_triggers = OrderedHashMap::new(); + for trig in &template.triggers.list { + let _ = merged_triggers.append(trig.clone()); + } + for trig in &self.triggers.list { + if merged_triggers.map.contains_key(&trig.name) { + // Override existing trigger from template + if let Some(existing) = merged_triggers.get_mut(&trig.name) { + *existing = trig.clone(); + } + } else { + let _ = merged_triggers.append(trig.clone()); + } + } + self.triggers = merged_triggers; + + // Merge grants - template grants come first, then table grants are added + let mut merged_grants = template.grant.clone(); + merged_grants.extend(self.grant.clone()); + self.grant = merged_grants; + + // Inherit owner if not set + if self.owner.is_empty() && !template.owner.is_empty() { + self.owner = template.owner.clone(); + } + + // Inherit description if not set + if self.description.is_empty() && !template.description.is_empty() { + self.description = template.description.clone(); + } + + // Inherit sql suffix 
if not set + if self.sql.is_empty() && !template.sql.is_empty() { + self.sql = template.sql.clone(); + } + + // Inherit constraint if not set + if self.constraint.is_empty() && !template.constraint.is_empty() { + self.constraint = template.constraint.clone(); + } + // Note: primary key columns are inherited through merged columns with constraint.primaryKey + } + + /// Get the primary key columns from columns with constraint.primaryKey = true + /// Returns columns in order (preserves column order from the columns list) + pub fn get_primary_key_columns(&self) -> Vec { + self.columns.list.iter() + .filter(|c| c.is_pk()) + .map(|c| c.name.clone()) + .collect() + } } impl YGrant { @@ -663,6 +1099,162 @@ $do$; "#; +/// Map serial pseudo-types to their base integer type. +/// Returns `Some(base_type)` for serial/bigserial/smallserial, `None` otherwise. +fn serial_to_base_type(col_type: &str) -> Option<&'static str> { + match col_type.to_lowercase().as_str() { + "serial" | "serial4" => Some("integer"), + "bigserial" | "serial8" => Some("bigint"), + "smallserial" | "serial2" => Some("smallint"), + _ => None, + } +} + +/// Type of column type change +#[derive(Debug, PartialEq)] +enum TypeChange { + /// No change needed + NoChange, + /// Size extension (safe) - e.g., varchar(50) -> varchar(100) + SizeExtension, + /// Size reduction (potentially destructive) - e.g., varchar(100) -> varchar(50) + SizeReduction, + /// Compatible type change (safe) - e.g., int4 -> int8 + Compatible, + /// Incompatible type change (destructive) - e.g., int -> varchar + Incompatible, +} + +/// Parse size from type string, returns (base_type, size, scale) +/// Examples: +/// - varchar(100) -> ("varchar", Some(100), None) +/// - NUMERIC(10,2) -> ("numeric", Some(10), Some(2)) +/// - int4 -> ("int4", None, None) +fn parse_type_size(type_str: &str) -> (String, Option, Option) { + let type_lower = type_str.to_lowercase().trim().to_string(); + + if let Some(paren_pos) = type_lower.find('(') { + let 
base_type = type_lower[..paren_pos].trim().to_string(); + let params = &type_lower[paren_pos + 1..type_lower.len() - 1]; + + if params.contains(',') { + // NUMERIC(precision, scale) + let parts: Vec<&str> = params.split(',').collect(); + let precision = parts.get(0).and_then(|s| s.trim().parse::().ok()); + let scale = parts.get(1).and_then(|s| s.trim().parse::().ok()); + (base_type, precision, scale) + } else { + // varchar(length) + let size = params.trim().parse::().ok(); + (base_type, size, None) + } + } else { + (type_lower, None, None) + } +} + +/// Analyze the type change between existing and desired column types +fn analyze_type_change(existing: &str, desired: &str) -> TypeChange { + let (existing_base, existing_size, existing_scale) = parse_type_size(existing); + let (desired_base, desired_size, desired_scale) = parse_type_size(desired); + + // Normalize base types (int4 == integer == int == serial, etc.) + // serial/bigserial/smallserial are pseudo-types that create int4/int8/int2 with sequences + let normalize_base = |t: &str| -> String { + match t { + "int" | "int4" | "integer" | "serial" => "int4".to_string(), + "int8" | "bigint" | "bigserial" | "serial8" => "int8".to_string(), + "int2" | "smallint" | "smallserial" | "serial2" => "int2".to_string(), + "float4" | "real" => "float4".to_string(), + "float" | "float8" | "double precision" | "double" => "float8".to_string(), + "bool" | "boolean" => "bool".to_string(), + "varchar" | "character varying" => "varchar".to_string(), + "char" | "character" | "bpchar" => "char".to_string(), + "text" => "text".to_string(), + "numeric" | "decimal" => "numeric".to_string(), + "timestamptz" | "timestamp with time zone" => "timestamptz".to_string(), + "timestamp" | "timestamp without time zone" => "timestamp".to_string(), + "timetz" | "time with time zone" => "timetz".to_string(), + "time" | "time without time zone" => "time".to_string(), + "json" | "jsonb" => t.to_string(), // keep distinct, jsonb is different from json 
+ other => other.to_string(), + } + }; + + let norm_existing = normalize_base(&existing_base); + let norm_desired = normalize_base(&desired_base); + + // Same base type - check size changes + if norm_existing == norm_desired { + match (&existing_size, &desired_size) { + (Some(e), Some(d)) => { + if e == d { + // Check scale for NUMERIC + match (&existing_scale, &desired_scale) { + (Some(es), Some(ds)) if es == ds => TypeChange::NoChange, + (Some(es), Some(ds)) if ds > es => TypeChange::SizeExtension, + (Some(_), Some(_)) => TypeChange::SizeReduction, + (None, None) => TypeChange::NoChange, + _ => TypeChange::SizeExtension, // Scale added or removed + } + } else if d > e { + TypeChange::SizeExtension + } else { + TypeChange::SizeReduction + } + } + (None, Some(_)) => TypeChange::SizeExtension, // Adding size constraint + (Some(_), None) => TypeChange::SizeExtension, // Removing size constraint (e.g., varchar(100) -> text) + (None, None) => TypeChange::NoChange, + } + } else { + // Different base types - check compatibility + // Safe promotions: int2 -> int4 -> int8, float4 -> float8 + let is_safe_promotion = matches!( + (norm_existing.as_str(), norm_desired.as_str()), + ("int2", "int4") | ("int2", "int8") | ("int4", "int8") | + ("float4", "float8") | + ("varchar", "text") | ("char", "text") | ("char", "varchar") + ); + + if is_safe_promotion { + TypeChange::Compatible + } else { + // Check if converting to varchar/text with sufficient size + // These conversions are safe if varchar is large enough to hold the string representation + let min_varchar_size = match norm_existing.as_str() { + "int2" => Some(6), // -32768 to 32767 + "int4" => Some(11), // -2147483648 to 2147483647 + "int8" => Some(20), // -9223372036854775808 to 9223372036854775807 + "float4" => Some(15), // ~7 decimal digits + sign + decimal point + exponent + "float8" => Some(25), // ~15 decimal digits + sign + decimal point + exponent + "numeric" => Some(existing_size.map(|s| s + 2).unwrap_or(40)), // 
precision + sign + decimal + "bool" => Some(5), // "false" + "timestamp" | "timestamptz" => Some(32), // '2024-01-22 12:34:56.123456+00' + "time" | "timetz" => Some(18), // '12:34:56.123456+00' + "date" => Some(10), // '2024-01-22' + "uuid" => Some(36), // 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx' + "json" | "jsonb" => None, // variable, can't determine safe size + _ => None, + }; + + // Check if target is varchar/text with sufficient size + let is_safe_to_varchar = match (norm_desired.as_str(), desired_size) { + ("text", _) => min_varchar_size.is_some(), // text has no limit + ("varchar", Some(size)) => min_varchar_size.map_or(false, |min| size >= min), + ("varchar", None) => min_varchar_size.is_some(), // varchar without size = unlimited + _ => false, + }; + + if is_safe_to_varchar { + TypeChange::Compatible + } else { + TypeChange::Incompatible + } + } + } +} + #[inline] fn pks(schema: &String, table: &String, sks: &OrderedHashMap) -> HashSet { let mut pk = HashSet::new(); diff --git a/tests/example_template.yaml b/tests/example_template.yaml new file mode 100644 index 0000000..8e3cbba --- /dev/null +++ b/tests/example_template.yaml @@ -0,0 +1,29 @@ + +database: + - schemaName: public + tables: + # Define a template (won't be created in DB) + - table: + tableName: base_table + template: true # This is a template + columns: + - column: { name: id, type: serial, constraint: { primaryKey: true } } + - column: { name: created_at, type: timestamp, defaultValue: now() } + - column: { name: updated_at, type: timestamp } + grant: + - all: app_user + + # Use the template + - table: + tableName: users + template: [base_table] # Inherit from base_table + columns: + - column: { name: email, type: varchar(255) } + - column: { name: name, type: varchar(100) } + + # Cross-schema template reference + - table: + tableName: orders + template: [public.base_table, audit_template] + columns: + - column: { name: total, type: numeric(10,2) } \ No newline at end of file