diff --git a/src/com/vertica/jdbc/nativebinary/ColumnSpec.java b/src/com/vertica/jdbc/nativebinary/ColumnSpec.java index 7fbddfe..399cd07 100644 --- a/src/com/vertica/jdbc/nativebinary/ColumnSpec.java +++ b/src/com/vertica/jdbc/nativebinary/ColumnSpec.java @@ -4,14 +4,11 @@ import java.nio.ByteBuffer; import java.nio.CharBuffer; import java.nio.charset.CharacterCodingException; -import java.nio.charset.Charset; import java.nio.charset.CharsetEncoder; import java.util.Calendar; -import java.util.Date; import java.util.TimeZone; import org.pentaho.di.core.exception.KettleValueException; -import org.pentaho.di.core.row.RowMetaInterface; import org.pentaho.di.core.row.ValueMetaInterface; public class ColumnSpec { @@ -25,7 +22,6 @@ public enum ConstantWidthType { INTEGER_32(ColumnType.INTEGER, 4), INTEGER_64(ColumnType.INTEGER, 8), BOOLEAN(ColumnType.BOOLEAN, 1), - NUMERIC(ColumnType.NUMERIC, 8), FLOAT(ColumnType.FLOAT, 8), DATE(ColumnType.DATE, 8), TIME(ColumnType.TIME, 8), @@ -33,9 +29,10 @@ public enum ConstantWidthType { TIMESTAMP(ColumnType.TIMESTAMP, 8), TIMESTAMPTZ(ColumnType.TIMESTAMPTZ, 8), INTERVAL(ColumnType.INTERVAL, 8); - + private final ColumnType type; private final int bytes; + private ConstantWidthType(ColumnType type, int bytes) { this.type = type; this.bytes = bytes; @@ -45,18 +42,18 @@ private ConstantWidthType(ColumnType type, int bytes) { public enum VariableWidthType { VARCHAR(ColumnType.VARCHAR), VARBINARY(ColumnType.VARBINARY); - + private final ColumnType type; private final int bytes = -1; private VariableWidthType(ColumnType type) { this.type = type; } } - + public enum UserDefinedWidthType { CHAR(ColumnType.CHAR), BINARY(ColumnType.BINARY); - + private final ColumnType type; private UserDefinedWidthType(ColumnType type) { this.type = type; @@ -65,32 +62,36 @@ private UserDefinedWidthType(ColumnType type) { public enum PrecisionScaleWidthType { NUMERIC(ColumnType.NUMERIC); - + private final ColumnType type; private PrecisionScaleWidthType(ColumnType type) { this.type = type; } } - + public final ColumnType type; public int bytes; public final int scale; private CharBuffer charBuffer; private CharsetEncoder charEncoder; private ByteBuffer mainBuffer; - private final Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC")); - private static final Calendar julianStartDateCalendar; - - static { - julianStartDateCalendar = Calendar.getInstance(TimeZone.getTimeZone("UTC")); - julianStartDateCalendar.clear(); - julianStartDateCalendar.set(2000, 0, 1, 0, 0, 0); + private final Calendar calendarLocalTZ = Calendar.getInstance(); + private final Calendar calendarUTC = Calendar.getInstance(TimeZone.getTimeZone("UTC")); + private static final Calendar julianStartDateCalendarUTC; + private static final Calendar julianStartDateCalendarLocalTZ; + + static { + julianStartDateCalendarUTC = Calendar.getInstance(TimeZone.getTimeZone("UTC")); + julianStartDateCalendarUTC.clear(); + julianStartDateCalendarUTC.set(2000, 0, 1, 0, 0, 0); + julianStartDateCalendarLocalTZ = Calendar.getInstance(); + julianStartDateCalendarLocalTZ.clear(); + julianStartDateCalendarLocalTZ.set(2000, 0, 1, 0, 0, 0); + } - } - public ColumnSpec(PrecisionScaleWidthType precisionScaleWidthType, int precision, int scale) { this.type = precisionScaleWidthType.type; - this.bytes = Math.round((precision / 19 + 1) * 8); + this.bytes = -1 ; // NUMERIC is encoded as VARCHAR (length = -1) this.scale = scale; } @@ -99,13 +100,13 @@ public ColumnSpec(UserDefinedWidthType userDefinedWidthType, int bytes) { this.bytes = bytes; this.scale = 0; } - + public ColumnSpec(ConstantWidthType constantWidthType) { this.type = constantWidthType.type; this.bytes = constantWidthType.bytes; this.scale = 0; } - + public ColumnSpec(VariableWidthType variableWidthType) { this.type = variableWidthType.type; this.bytes = variableWidthType.bytes; @@ -127,14 +128,13 @@ public void setMainBuffer(ByteBuffer buffer) { public void encode(ValueMetaInterface valueMeta, Object value) throws CharacterCodingException, UnsupportedEncodingException, KettleValueException { if (value == null) return; int prevPosition, length, sizePosition; - ByteBuffer inputBinary; + byte[] inputBinary; long milliSeconds; switch (this.type) { case BINARY: - //TODO: Validate - inputBinary = (ByteBuffer)value; - length = inputBinary.limit(); + inputBinary = valueMeta.getBinaryString(value); + length = inputBinary.length; this.mainBuffer.put(inputBinary); for (int i = 0; i < (this.bytes - length); i++) { this.mainBuffer.put(BYTE_ZERO); @@ -158,11 +158,11 @@ public void encode(ValueMetaInterface valueMeta, Object value) throws CharacterC case DATE: //Get Julian date for 01/01/2000 long julianStart = toJulian(2000, 1, 1); - calendar.setTime(valueMeta.getDate(value)); + calendarLocalTZ.setTime(valueMeta.getDate(value)); long julianEnd = toJulian( - calendar.get(Calendar.YEAR), - calendar.get(Calendar.MONTH)+1, - calendar.get(Calendar.DAY_OF_MONTH) + calendarLocalTZ.get(Calendar.YEAR), + calendarLocalTZ.get(Calendar.MONTH)+1, + calendarLocalTZ.get(Calendar.DAY_OF_MONTH) ); this.mainBuffer.putLong(new Long(julianEnd - julianStart)); break; @@ -190,47 +190,49 @@ public void encode(ValueMetaInterface valueMeta, Object value) throws CharacterC case INTERVAL: this.mainBuffer.putLong(valueMeta.getInteger(value)); break; - case NUMERIC: - throw new UnsupportedOperationException("Encoding for NUMERIC data type is not implemented"); - // break; case TIME: // 64-bit integer in little-endian format containing the number of microseconds since midnight in the UTC time zone. - calendar.setTime(valueMeta.getDate(value)); - milliSeconds = calendar.get(Calendar.HOUR_OF_DAY) * 3600 * 1000 - + calendar.get(Calendar.MINUTE) * 60 * 1000 - + calendar.get(Calendar.SECOND) * 1000 - + calendar.get(Calendar.MILLISECOND); + // We actually use the local time instead of the UTC time because UTC time was giving wrong results. + calendarLocalTZ.setTime(valueMeta.getDate(value)); + milliSeconds = calendarLocalTZ.get(Calendar.HOUR_OF_DAY) * 3600 * 1000 + + calendarLocalTZ.get(Calendar.MINUTE) * 60 * 1000 + + calendarLocalTZ.get(Calendar.SECOND) * 1000 + + calendarLocalTZ.get(Calendar.MILLISECOND); this.mainBuffer.putLong(milliSeconds*1000); break; case TIMETZ: // 64-bit value where Upper 40 bits contain the number of microseconds since midnight and Lower 24 bits contain time zone as the UTC offset in microseconds calculated as follows: Time zone is logically from -24hrs to +24hrs from UTC. Instead it is represented here as a number between 0hrs to 48hrs. Therefore, 24hrs should be added to the actual time zone to calculate it. - calendar.setTime(valueMeta.getDate(value)); - milliSeconds = calendar.get(Calendar.HOUR_OF_DAY) * 3600 * 1000 - + calendar.get(Calendar.MINUTE) * 60 * 1000 - + calendar.get(Calendar.SECOND) * 1000 - + calendar.get(Calendar.MILLISECOND); + calendarUTC.setTime(valueMeta.getDate(value)); + milliSeconds = calendarUTC.get(Calendar.HOUR_OF_DAY) * 3600 * 1000 + + calendarUTC.get(Calendar.MINUTE) * 60 * 1000 + + calendarUTC.get(Calendar.SECOND) * 1000 + + calendarUTC.get(Calendar.MILLISECOND); final long timeZoneOffsetMicroseconds = 24 * 3600 ; this.mainBuffer.putLong(((milliSeconds * 1000) << 8*3) + timeZoneOffsetMicroseconds); break; case TIMESTAMP: - calendar.setTime(valueMeta.getDate(value)); - milliSeconds = calendar.getTimeInMillis() - julianStartDateCalendar.getTimeInMillis(); + // 64-bit integer in little-endian format containing the number of microseconds since Julian day: Jan 01 2000 00:00:00. + calendarLocalTZ.setTime(valueMeta.getDate(value)); + milliSeconds = calendarLocalTZ.getTimeInMillis() - julianStartDateCalendarLocalTZ.getTimeInMillis(); this.mainBuffer.putLong(new Long(milliSeconds * 1000)); break; case TIMESTAMPTZ: - calendar.setTime(valueMeta.getDate(value)); - milliSeconds = calendar.getTimeInMillis() - julianStartDateCalendar.getTimeInMillis(); + // A 64-bit integer in little-endian format containing the number of microseconds since Julian day: Jan 01 2000 00:00:00 in the UTC timezone. + calendarUTC.setTime(valueMeta.getDate(value)); + milliSeconds = calendarUTC.getTimeInMillis() - julianStartDateCalendarUTC.getTimeInMillis(); this.mainBuffer.putLong(new Long(milliSeconds * 1000)); break; case VARBINARY: - inputBinary = (ByteBuffer)value; sizePosition = this.mainBuffer.position(); this.mainBuffer.putInt(0); prevPosition = this.mainBuffer.position(); - this.mainBuffer.put(inputBinary); + this.mainBuffer.put(valueMeta.getBinaryString(value)); this.mainBuffer.putInt(sizePosition, this.mainBuffer.position() - prevPosition); this.bytes = this.mainBuffer.position() - sizePosition; break; + case NUMERIC: + // Numeric is encoded as VARCHAR. COPY statement uses is as a FILLER column for Vertica itself + // to convert into internal NUMERIC data format. case VARCHAR: this.charBuffer.clear(); this.charEncoder.reset(); @@ -252,39 +254,36 @@ public void encode(ValueMetaInterface valueMeta, Object value) throws CharacterC } -/** - * Returns the Julian day number that begins at noon of - * this day, Positive year signifies A.D., negative year B.C. - * Remember that the year after 1 B.C. was 1 A.D. - * NOTE THAT day and month are base 1 (January == 1) - * ref : - * Numerical Recipes in C, 2nd ed., Cambridge University Press 1992 - */ - // Gregorian Calendar adopted Oct. 15, 1582 (2299161) - public static int JGREG= 15 + 31*(10+12*1582); - public static double HALFSECOND = 0.5; + /** + * Returns the Julian Day Number from a Julian or Gregorian calendar date. + * The result is rounded to a Long. + *

+ * @param year + * @param month (Jan=1, ...) + * @param day (1, ...) + * @return julian day number + * @see http://en.wikipedia.org/wiki/Julian_day + */ + private static long toJulian(int year, int month, int day) { + // 1582.10.15 the day the Gregorian calendar went into effect: + final long chronologicalJulianDayNumber = 1582*10000 + 10*100 + 15; + + long a = (long)((14 - month)/12); + long y = year + 4800 - a; + long m = month + 12*a - 3; + double jdn; - private static long toJulian(int year, int month, int day) { - int julianYear = year; - if (year < 0) julianYear++; - int julianMonth = month; - if (month > 2) { - julianMonth++; - } - else { - julianYear--; - julianMonth += 13; - } + if((year*10000 + month*100 + day) >= chronologicalJulianDayNumber) { + // if starting from a Gregorian calendar date compute: + jdn = day + (long)((153*m+2)/5) + 365*y + (long)(y/4) - (long)(y/100) + (long)(y/400) - 32045; + } + else { + // Otherwise, if starting from a Julian calendar date compute: + jdn = day + (long)((153*m+2)/5) + 365*y + (long)(y/4) - 32083; + } - double julian = (java.lang.Math.floor(365.25 * julianYear) - + java.lang.Math.floor(30.6001*julianMonth) + day + 1720995.0); - if (day + 31 * (month + 12 * year) >= JGREG) { - // change over to Gregorian calendar - int ja = (int)(0.01 * julianYear); - julian += 2 - ja + (0.25 * ja); - } - return (long) java.lang.Math.floor(julian); - } + return Math.round(jdn); + } diff --git a/src/com/vertica/jdbc/nativebinary/StreamEncoder.java b/src/com/vertica/jdbc/nativebinary/StreamEncoder.java index 28e69a9..adce53c 100644 --- a/src/com/vertica/jdbc/nativebinary/StreamEncoder.java +++ b/src/com/vertica/jdbc/nativebinary/StreamEncoder.java @@ -67,6 +67,7 @@ public StreamEncoder(List columns, PipedInputStream inputStream) thr for (ColumnSpec column : columns) { switch (column.type) { + case NUMERIC: case CHAR: case VARCHAR: column.setCharBuffer(charBuffer); @@ -76,6 +77,7 @@ public StreamEncoder(List columns, PipedInputStream inputStream) thr break; } switch (column.type) { + case NUMERIC: case VARCHAR: case VARBINARY: this.rowMaxSize += MAX_CHAR_LENGTH; diff --git a/src/plugin/com/vertica/kettle/bulkloader/VerticaBulkLoader.java b/src/plugin/com/vertica/kettle/bulkloader/VerticaBulkLoader.java index aba611d..24cdf23 100644 --- a/src/plugin/com/vertica/kettle/bulkloader/VerticaBulkLoader.java +++ b/src/plugin/com/vertica/kettle/bulkloader/VerticaBulkLoader.java @@ -39,6 +39,7 @@ import com.vertica.jdbc.VerticaConnection; import com.vertica.jdbc.VerticaCopyStream; import com.vertica.jdbc.nativebinary.ColumnSpec; +import com.vertica.jdbc.nativebinary.ColumnType; import com.vertica.jdbc.nativebinary.StreamEncoder; @@ -54,6 +55,7 @@ public VerticaBulkLoader(StepMeta stepMeta, StepDataInterface stepDataInterface, super(stepMeta, stepDataInterface, copyNr, transMeta, trans); } + @Override public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException { meta = (VerticaBulkLoaderMeta) smi; @@ -94,6 +96,8 @@ public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws K ValueMetaInterface inputValueMeta = data.insertRowMeta.getValueMeta(insertFieldIdx); ValueMetaInterface insertValueMeta = inputValueMeta.clone(); ValueMetaInterface targetValueMeta = tableMeta.getValueMeta(insertFieldIdx); + insertValueMeta.setName(targetValueMeta.getName()); + data.insertRowMeta.setValueMeta(insertFieldIdx, insertValueMeta); ColumnSpec cs = getColumnSpecFromField(inputValueMeta, insertValueMeta, targetValueMeta); data.colSpecs.add(insertFieldIdx, cs); } @@ -169,6 +173,7 @@ public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws K return true; } + private ColumnSpec getColumnSpecFromField(ValueMetaInterface inputValueMeta, ValueMetaInterface insertValueMeta, ValueMetaInterface targetValueMeta) { logBasic("Mapping input field " + inputValueMeta.getName() + " (" + inputValueMeta.getTypeDesc() + ")" + " to target column " + insertValueMeta.getName() + " (" + targetValueMeta.getOriginalColumnTypeName() + ") " ); @@ -186,23 +191,41 @@ private ColumnSpec getColumnSpecFromField(ValueMetaInterface inputValueMeta, Val } else if (targetColumnTypeName.equals("VARCHAR")) { return new ColumnSpec(ColumnSpec.VariableWidthType.VARCHAR); } else if (targetColumnTypeName.equals("DATE")) { - return new ColumnSpec(ColumnSpec.ConstantWidthType.DATE); + if (inputValueMeta.isDate() == false) + throw new IllegalArgumentException("Field " + inputValueMeta.getName() + " must be a Date compatible type to match target column " + insertValueMeta.getName()); + else + return new ColumnSpec(ColumnSpec.ConstantWidthType.DATE); } else if (targetColumnTypeName.equals("TIME")) { - return new ColumnSpec(ColumnSpec.ConstantWidthType.TIME); + if (inputValueMeta.isDate() == false) + throw new IllegalArgumentException("Field " + inputValueMeta.getName() + " must be a Date compatible type to match target column " + insertValueMeta.getName()); + else + return new ColumnSpec(ColumnSpec.ConstantWidthType.TIME); } else if (targetColumnTypeName.equals("TIMETZ")) { - return new ColumnSpec(ColumnSpec.ConstantWidthType.TIMETZ); + if (inputValueMeta.isDate() == false) + throw new IllegalArgumentException("Field " + inputValueMeta.getName() + " must be a Date compatible type to match target column " + insertValueMeta.getName()); + else + return new ColumnSpec(ColumnSpec.ConstantWidthType.TIMETZ); } else if (targetColumnTypeName.equals("TIMESTAMP")) { - return new ColumnSpec(ColumnSpec.ConstantWidthType.TIMESTAMP); + if (inputValueMeta.isDate() == false) + throw new IllegalArgumentException("Field " + inputValueMeta.getName() + " must be a Date compatible type to match target column " + insertValueMeta.getName()); + else + return new ColumnSpec(ColumnSpec.ConstantWidthType.TIMESTAMP); } else if (targetColumnTypeName.equals("TIMESTAMPTZ")) { - return new ColumnSpec(ColumnSpec.ConstantWidthType.TIMESTAMPTZ); + if (inputValueMeta.isDate() == false) + throw new IllegalArgumentException("Field " + inputValueMeta.getName() + " must be a Date compatible type to match target column " + insertValueMeta.getName()); + else + return new ColumnSpec(ColumnSpec.ConstantWidthType.TIMESTAMPTZ); } else if (targetColumnTypeName.equals("INTERVAL") || targetColumnTypeName.equals("INTERVAL DAY TO SECOND")) { - return new ColumnSpec(ColumnSpec.ConstantWidthType.INTERVAL); + if (inputValueMeta.isDate() == false) + throw new IllegalArgumentException("Field " + inputValueMeta.getName() + " must be a Date compatible type to match target column " + insertValueMeta.getName()); + else + return new ColumnSpec(ColumnSpec.ConstantWidthType.INTERVAL); } else if (targetColumnTypeName.equals("BINARY")) { return new ColumnSpec(ColumnSpec.VariableWidthType.VARBINARY); } else if (targetColumnTypeName.equals("VARBINARY")) { return new ColumnSpec(ColumnSpec.VariableWidthType.VARBINARY); } else if (targetColumnTypeName.equals("NUMERIC")) { - return new ColumnSpec(ColumnSpec.ConstantWidthType.NUMERIC); + return new ColumnSpec(ColumnSpec.PrecisionScaleWidthType.NUMERIC, targetValueMeta.getLength(),targetValueMeta.getPrecision()); } throw new IllegalArgumentException("Column type " + targetColumnTypeName + " not supported."); //$NON-NLS-1$ } @@ -261,17 +284,26 @@ private String buildCopyStatementSqlString() ) ); - if (meta.specifyFields()) { - final RowMetaInterface fields = data.insertRowMeta; - sb.append(" ("); - for (int i = 0; i < fields.size(); i++) - { - if (i > 0) sb.append(", "); - - sb.append(databaseMeta.quoteField(fields.getValueMeta(i).getName())); + sb.append(" ("); + final RowMetaInterface fields = data.insertRowMeta; + for (int i = 0; i < fields.size(); i++) + { + if (i > 0) sb.append(", "); + ColumnType columnType = data.colSpecs.get(i).type; + switch (columnType) { + case NUMERIC: + sb.append("TMPFILLERCOL").append(i).append(" FILLER VARCHAR(1000), "); + // Force columns to be quoted: + sb.append(databaseMeta.getStartQuote() + fields.getValueMeta(i).getName() + databaseMeta.getEndQuote()); + sb.append(" as TO_NUMBER(").append("TMPFILLERCOL").append(i).append(")"); + break; + default: + // Force columns to be quoted: + sb.append(databaseMeta.getStartQuote() + fields.getValueMeta(i).getName() + databaseMeta.getEndQuote()); + break; } - sb.append(")"); } + sb.append(")"); sb.append(" FROM STDIN NATIVE "); @@ -302,6 +334,8 @@ private String buildCopyStatementSqlString() // NO COMMIT does not seem to work even when the transformation setting 'make the transformation database transactional' is on // sb.append("NO COMMIT"); + logDebug("copy stmt: "+sb.toString()); + return sb.toString(); }