diff --git a/project.clj b/project.clj
index 2eb2a2e..876c348 100644
--- a/project.clj
+++ b/project.clj
@@ -1,4 +1,4 @@
-(defproject backtype/cascading-dbmigrate "1.1.0"
+(defproject backtype/cascading-dbmigrate "1.1.1"
   :source-path "src/clj"
   :java-source-path "src/jvm"
   :javac-options {:debug "true" :fork "true"}
diff --git a/src/jvm/cascading/dbmigrate/hadoop/DBConfiguration.java b/src/jvm/cascading/dbmigrate/hadoop/DBConfiguration.java
index 9d381d9..3947a72 100644
--- a/src/jvm/cascading/dbmigrate/hadoop/DBConfiguration.java
+++ b/src/jvm/cascading/dbmigrate/hadoop/DBConfiguration.java
@@ -12,6 +12,9 @@
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
+import java.util.Properties;
+import java.util.Map;
+import java.util.Iterator;
 
 public class DBConfiguration {
 
@@ -25,6 +28,7 @@ public class DBConfiguration {
     public static final String NUM_CHUNKS = "mapred.jdbc.num.chunks";
     public static final String MIN_ID = "dbmigrate.min.id";
     public static final String MAX_ID = "dbmigrate.max.id";
+    public static final String DRIVER_PROPERTIES = "dbmigrate.driver.";
 
     public void configureDB(String driverClass, String dbUrl, String userName, String passwd) {
         job.set(DRIVER_CLASS_PROPERTY, driverClass);
@@ -58,12 +62,11 @@ public Connection getConnection() throws IOException {
         Connection ret;
 
         try {
-            if (job.get(DBConfiguration.USERNAME_PROPERTY) == null) {
-                ret = DriverManager.getConnection(job.get(DBConfiguration.URL_PROPERTY));
+            Properties props = this.getDriverProperties();
+            if (props != null) {
+                ret = DriverManager.getConnection(job.get(DBConfiguration.URL_PROPERTY), props);
             } else {
-                ret = DriverManager.getConnection(job.get(DBConfiguration.URL_PROPERTY), job
-                    .get(DBConfiguration.USERNAME_PROPERTY), job
-                    .get(DBConfiguration.PASSWORD_PROPERTY));
+                ret = DriverManager.getConnection(job.get(DBConfiguration.URL_PROPERTY));
             }
             return ret;
         } catch (SQLException exception) {
@@ -120,5 +123,48 @@ public Long getMaxId() {
         if (job.get(MAX_ID) == null) { return null; }
         return job.getLong(MAX_ID, -1);
     }
+
+    public void setDriverProperties(Map<String, String> props) {
+        Iterator<Map.Entry<String, String>> dpit = props.entrySet().iterator();
+        while (dpit.hasNext()) {
+            Map.Entry<String, String> prop = dpit.next();
+            this.addDriverProperty(prop.getKey(), prop.getValue());
+        }
+    }
+
+    public void addDriverProperty(String key, String value) {
+        job.set( DBConfiguration.DRIVER_PROPERTIES + key, value);
+    }
+
+    public Properties getDriverProperties() {
+        Properties props = null;
+        Iterator<Map.Entry<String, String>> jobit = job.iterator();
+        while (jobit.hasNext()) {
+            Map.Entry<String, String> entry = jobit.next();
+            if (entry.getKey().startsWith(DBConfiguration.DRIVER_PROPERTIES)) {
+                String propertyKey = entry.getKey().substring(DBConfiguration.DRIVER_PROPERTIES.length());
+
+                if (props == null)
+                    props = new Properties();
+
+                props.setProperty(propertyKey, entry.getValue());
+            }
+        }
+
+        String username = job.get(DBConfiguration.USERNAME_PROPERTY);
+        if (username != null) {
+            if (props == null)
+                props = new Properties();
+            props.setProperty("user", username);
+        }
+        String password = job.get(DBConfiguration.PASSWORD_PROPERTY);
+        if (password != null) {
+            if (props == null)
+                props = new Properties();
+            props.setProperty("password", password);
+        }
+        return props;
+    }
+
 }
diff --git a/src/jvm/cascading/dbmigrate/hadoop/DBInputFormat.java b/src/jvm/cascading/dbmigrate/hadoop/DBInputFormat.java
index 44e5de4..2986c82 100644
--- a/src/jvm/cascading/dbmigrate/hadoop/DBInputFormat.java
+++ b/src/jvm/cascading/dbmigrate/hadoop/DBInputFormat.java
@@ -19,6 +19,7 @@
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.sql.*;
+import java.util.Map;
 
 public class DBInputFormat implements InputFormat<LongWritable, TupleWrapper> {
 
@@ -248,6 +249,7 @@ public InputSplit[] getSplits(JobConf job, int ignored) throws IOException {
 
     public static void setInput(JobConf job, int numChunks, String databaseDriver, String username, String pwd, String dburl, String tableName,
         String pkColumn, Long minId, Long maxId,
+        Map<String, String> driverProps,
         String... columnNames) {
 
         job.setInputFormat(DBInputFormat.class);
@@ -259,6 +261,9 @@ public static void setInput(JobConf job, int numChunks, String databaseDriver, S
         if (maxId != null) {
             dbConf.setMaxId(maxId.longValue());
         }
+        if (driverProps != null) {
+            dbConf.setDriverProperties(driverProps);
+        }
         dbConf.setInputTableName(tableName);
         dbConf.setInputColumnNames(columnNames);
         dbConf.setPrimaryKeyColumn(pkColumn);
diff --git a/src/jvm/cascading/dbmigrate/tap/DBMigrateTap.java b/src/jvm/cascading/dbmigrate/tap/DBMigrateTap.java
index e72c3a4..449d0ab 100644
--- a/src/jvm/cascading/dbmigrate/tap/DBMigrateTap.java
+++ b/src/jvm/cascading/dbmigrate/tap/DBMigrateTap.java
@@ -21,14 +21,15 @@
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputCollector;
 
 import java.io.Serializable;
-
+import java.util.Map;
 public class DBMigrateTap extends Tap {
     public static class Options implements Serializable {
         public Long minId = null;
         public Long maxId = null;
+        public Map<String, String> driverProps;
     }
-
+    
     public class DBMigrateScheme extends Scheme {
         String dbDriver;
         String dbUrl;
@@ -57,8 +58,8 @@ public DBMigrateScheme(int numChunks, String dbDriver, String dbUrl, String user
     public void sourceInit(Tap tap, JobConf jc) throws IOException {
         // a hack for MultiInputFormat to see that there is a child format
         FileInputFormat.setInputPaths( jc, getPath() );
-
-        DBInputFormat.setInput(jc, numChunks, dbDriver, username, pwd, dbUrl, tableName, pkColumn, options.minId, options.maxId, columnNames);
+        
+        DBInputFormat.setInput(jc, numChunks, dbDriver, username, pwd, dbUrl, tableName, pkColumn, options.minId, options.maxId, options.driverProps, columnNames);
     }
 
     @Override