Skip to content

Added support for DriverManager Properties #5

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion project.clj
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
(defproject backtype/cascading-dbmigrate "1.1.0"
(defproject backtype/cascading-dbmigrate "1.1.1"
:source-path "src/clj"
:java-source-path "src/jvm"
:javac-options {:debug "true" :fork "true"}
Expand Down
56 changes: 51 additions & 5 deletions src/jvm/cascading/dbmigrate/hadoop/DBConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;
import java.util.Map;
import java.util.Iterator;

public class DBConfiguration {

Expand All @@ -25,6 +28,7 @@ public class DBConfiguration {
public static final String NUM_CHUNKS = "mapred.jdbc.num.chunks";
public static final String MIN_ID = "dbmigrate.min.id";
public static final String MAX_ID = "dbmigrate.max.id";
public static final String DRIVER_PROPERTIES = "dbmigrate.driver.";

public void configureDB(String driverClass, String dbUrl, String userName, String passwd) {
job.set(DRIVER_CLASS_PROPERTY, driverClass);
Expand Down Expand Up @@ -58,12 +62,11 @@ public Connection getConnection() throws IOException {
Connection ret;

try {
if (job.get(DBConfiguration.USERNAME_PROPERTY) == null) {
ret = DriverManager.getConnection(job.get(DBConfiguration.URL_PROPERTY));
Properties props = this.getDriverProperties();
if (props != null) {
ret = DriverManager.getConnection(job.get(DBConfiguration.URL_PROPERTY), props);
} else {
ret = DriverManager.getConnection(job.get(DBConfiguration.URL_PROPERTY), job
.get(DBConfiguration.USERNAME_PROPERTY), job
.get(DBConfiguration.PASSWORD_PROPERTY));
ret = DriverManager.getConnection(job.get(DBConfiguration.URL_PROPERTY));
}
return ret;
} catch (SQLException exception) {
Expand Down Expand Up @@ -120,5 +123,48 @@ public Long getMaxId() {
if (job.get(MAX_ID) == null) { return null; }
return job.getLong(MAX_ID, -1);
}

public void setDriverProperties(Map<String,String> props) {
Iterator<Map.Entry<String,String>> dpit = props.entrySet().iterator();
while (dpit.hasNext()) {
Map.Entry<String,String> prop = dpit.next();
this.addDriverProperty(prop.getKey(), prop.getValue());
}
}

public void addDriverProperty(String key, String value) {
job.set( DBConfiguration.DRIVER_PROPERTIES + key, value);
}

public Properties getDriverProperties() {
Properties props = null;
Iterator<Map.Entry<String,String>> jobit = job.iterator();
while (jobit.hasNext()) {
Map.Entry<String, String> entry = jobit.next();
if (entry.getKey().startsWith(DBConfiguration.DRIVER_PROPERTIES)) {
String propertyKey = entry.getKey().substring(DBConfiguration.DRIVER_PROPERTIES.length());

if (props == null)
props = new Properties();

props.setProperty(propertyKey, entry.getValue());
}
}

String username = job.get(DBConfiguration.USERNAME_PROPERTY);
if (username != null) {
if (props == null)
props = new Properties();
props.setProperty("user", username);
}
String password = job.get(DBConfiguration.PASSWORD_PROPERTY);
if (password != null) {
if (props == null)
props = new Properties();
props.setProperty("password", password);
}
return props;
}

}

5 changes: 5 additions & 0 deletions src/jvm/cascading/dbmigrate/hadoop/DBInputFormat.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.math.BigDecimal;
import java.math.BigInteger;
import java.sql.*;
import java.util.Map;

public class DBInputFormat implements InputFormat<LongWritable, TupleWrapper> {

Expand Down Expand Up @@ -248,6 +249,7 @@ public InputSplit[] getSplits(JobConf job, int ignored) throws IOException {

public static void setInput(JobConf job, int numChunks, String databaseDriver, String username,
String pwd, String dburl, String tableName, String pkColumn, Long minId, Long maxId,
Map<String,String> driverProps,
String... columnNames) {
job.setInputFormat(DBInputFormat.class);

Expand All @@ -259,6 +261,9 @@ public static void setInput(JobConf job, int numChunks, String databaseDriver, S
if (maxId != null) {
dbConf.setMaxId(maxId.longValue());
}
if (driverProps != null) {
dbConf.setDriverProperties(driverProps);
}
dbConf.setInputTableName(tableName);
dbConf.setInputColumnNames(columnNames);
dbConf.setPrimaryKeyColumn(pkColumn);
Expand Down
9 changes: 5 additions & 4 deletions src/jvm/cascading/dbmigrate/tap/DBMigrateTap.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import java.io.Serializable;

import java.util.Map;

public class DBMigrateTap extends Tap {
public static class Options implements Serializable {
public Long minId = null;
public Long maxId = null;
public Map<String,String> driverProps;
}

public class DBMigrateScheme extends Scheme {
String dbDriver;
String dbUrl;
Expand Down Expand Up @@ -57,8 +58,8 @@ public DBMigrateScheme(int numChunks, String dbDriver, String dbUrl, String user
public void sourceInit(Tap tap, JobConf jc) throws IOException {
// a hack for MultiInputFormat to see that there is a child format
FileInputFormat.setInputPaths( jc, getPath() );
DBInputFormat.setInput(jc, numChunks, dbDriver, username, pwd, dbUrl, tableName, pkColumn, options.minId, options.maxId, columnNames);

DBInputFormat.setInput(jc, numChunks, dbDriver, username, pwd, dbUrl, tableName, pkColumn, options.minId, options.maxId, options.driverProps, columnNames);
}

@Override
Expand Down