Skip to content

Commit

Permalink
Unify Point's and InfluxDBMapper's POJO saving logic (#979)
Browse files Browse the repository at this point in the history
  • Loading branch information
eranl authored Dec 5, 2023
1 parent d697143 commit e93949f
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 94 deletions.
28 changes: 27 additions & 1 deletion src/main/java/org/influxdb/dto/Point.java
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ private void addFieldByAttribute(final Object pojo, final Field field, final boo
}
} else {
if (fieldValue != null) {
this.fields.put(fieldName, fieldValue);
setField(field.getType(), fieldName, fieldValue);
}
}

Expand Down Expand Up @@ -379,6 +379,32 @@ public Point build() {
point.setTags(this.tags);
return point;
}

private void setField(
final Class<?> fieldType,
final String columnName,
final Object value) {
if (boolean.class.isAssignableFrom(fieldType) || Boolean.class.isAssignableFrom(fieldType)) {
addField(columnName, (boolean) value);
} else if (long.class.isAssignableFrom(fieldType) || Long.class.isAssignableFrom(fieldType)) {
addField(columnName, (long) value);
} else if (double.class.isAssignableFrom(fieldType) || Double.class.isAssignableFrom(fieldType)) {
addField(columnName, (double) value);
} else if (float.class.isAssignableFrom(fieldType) || Float.class.isAssignableFrom(fieldType)) {
addField(columnName, (float) value);
} else if (int.class.isAssignableFrom(fieldType) || Integer.class.isAssignableFrom(fieldType)) {
addField(columnName, (int) value);
} else if (short.class.isAssignableFrom(fieldType) || Short.class.isAssignableFrom(fieldType)) {
addField(columnName, (short) value);
} else if (String.class.isAssignableFrom(fieldType)) {
addField(columnName, (String) value);
} else if (Enum.class.isAssignableFrom(fieldType)) {
addField(columnName, ((Enum<?>) value).name());
} else {
throw new InfluxDBMapperException(
"Unsupported type " + fieldType + " for column " + columnName);
}
}
}

/**
Expand Down
100 changes: 10 additions & 90 deletions src/main/java/org/influxdb/impl/InfluxDBMapper.java
Original file line number Diff line number Diff line change
@@ -1,18 +1,13 @@
package org.influxdb.impl;

import java.lang.reflect.Field;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBMapperException;
import org.influxdb.annotation.Column;
import org.influxdb.annotation.Measurement;
import org.influxdb.dto.Point;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;

import java.util.List;

public class InfluxDBMapper extends InfluxDBResultMapper {

private final InfluxDB influxDB;
Expand Down Expand Up @@ -52,91 +47,16 @@ public <T> List<T> query(final Class<T> clazz) {

public <T> void save(final T model) {
throwExceptionIfMissingAnnotation(model.getClass());
cacheMeasurementClass(model.getClass());

ConcurrentMap<String, Field> colNameAndFieldMap = getColNameAndFieldMap(model.getClass());

try {
Class<?> modelType = model.getClass();
String measurement = getMeasurementName(modelType);
String database = getDatabaseName(modelType);
String retentionPolicy = getRetentionPolicy(modelType);
TimeUnit timeUnit = getTimeUnit(modelType);
long time = timeUnit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
Point.Builder pointBuilder = Point.measurement(measurement).time(time, timeUnit);

for (String key : colNameAndFieldMap.keySet()) {
Field field = colNameAndFieldMap.get(key);
Column column = field.getAnnotation(Column.class);
String columnName = column.name();
Class<?> fieldType = field.getType();

if (!field.isAccessible()) {
field.setAccessible(true);
}

Object value = field.get(model);

if (column.tag()) {
/** Tags are strings either way. */
pointBuilder.tag(columnName, value.toString());
} else if ("time".equals(columnName)) {
if (value != null) {
setTime(pointBuilder, fieldType, timeUnit, value);
}
} else {
setField(pointBuilder, fieldType, columnName, value);
}
}

Point point = pointBuilder.build();
Class<?> modelType = model.getClass();
String database = getDatabaseName(modelType);
String retentionPolicy = getRetentionPolicy(modelType);
Point.Builder pointBuilder = Point.measurementByPOJO(modelType).addFieldsFromPOJO(model);
Point point = pointBuilder.build();

if ("[unassigned]".equals(database)) {
influxDB.write(point);
} else {
influxDB.write(database, retentionPolicy, point);
}

} catch (IllegalAccessException e) {
throw new InfluxDBMapperException(e);
}
}

private void setTime(
final Point.Builder pointBuilder,
final Class<?> fieldType,
final TimeUnit timeUnit,
final Object value) {
if (Instant.class.isAssignableFrom(fieldType)) {
Instant instant = (Instant) value;
long time = timeUnit.convert(instant.toEpochMilli(), TimeUnit.MILLISECONDS);
pointBuilder.time(time, timeUnit);
} else {
throw new InfluxDBMapperException(
"Unsupported type " + fieldType + " for time: should be of Instant type");
}
}

private void setField(
final Point.Builder pointBuilder,
final Class<?> fieldType,
final String columnName,
final Object value) {
if (boolean.class.isAssignableFrom(fieldType) || Boolean.class.isAssignableFrom(fieldType)) {
pointBuilder.addField(columnName, (boolean) value);
} else if (long.class.isAssignableFrom(fieldType) || Long.class.isAssignableFrom(fieldType)) {
pointBuilder.addField(columnName, (long) value);
} else if (double.class.isAssignableFrom(fieldType)
|| Double.class.isAssignableFrom(fieldType)) {
pointBuilder.addField(columnName, (double) value);
} else if (int.class.isAssignableFrom(fieldType) || Integer.class.isAssignableFrom(fieldType)) {
pointBuilder.addField(columnName, (int) value);
} else if (String.class.isAssignableFrom(fieldType)) {
pointBuilder.addField(columnName, (String) value);
if ("[unassigned]".equals(database)) {
influxDB.write(point);
} else {
throw new InfluxDBMapperException(
"Unsupported type " + fieldType + " for column " + columnName);
influxDB.write(database, retentionPolicy, point);
}
}

}
6 changes: 3 additions & 3 deletions src/test/java/org/influxdb/dto/PointTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -737,7 +737,7 @@ public void testAddFieldsFromPOJOWithTimeColumnNull() throws NoSuchFieldExceptio
}

@Test
public void testAddFieldsFromPOJOWithData() throws NoSuchFieldException, IllegalAccessException {
public void testAddFieldsFromPOJOWithData() {
Pojo pojo = new Pojo();
pojo.booleanObject = true;
pojo.booleanPrimitive = false;
Expand All @@ -760,7 +760,6 @@ public void testAddFieldsFromPOJOWithData() throws NoSuchFieldException, Illegal
Assertions.assertEquals(pojo.integerPrimitive, p.getFields().get("integerPrimitive"));
Assertions.assertEquals(pojo.longObject, p.getFields().get("longObject"));
Assertions.assertEquals(pojo.longPrimitive, p.getFields().get("longPrimitive"));
Assertions.assertEquals(pojo.time, p.getFields().get("time"));
Assertions.assertEquals(pojo.uuid, p.getTags().get("uuid"));
}

Expand Down Expand Up @@ -815,7 +814,6 @@ public void testAddFieldsFromPOJOWithPublicAttributes() {
Assertions.assertEquals(pojo.integerPrimitive, p.getFields().get("integerPrimitive"));
Assertions.assertEquals(pojo.longObject, p.getFields().get("longObject"));
Assertions.assertEquals(pojo.longPrimitive, p.getFields().get("longPrimitive"));
Assertions.assertEquals(pojo.time, p.getFields().get("time"));
Assertions.assertEquals(pojo.uuid, p.getTags().get("uuid"));
}

Expand Down Expand Up @@ -962,6 +960,7 @@ static class Pojo {
private boolean booleanPrimitive;

@Column(name = "time")
@TimeColumn
private Instant time;

@Column(name = "uuid", tag = true)
Expand Down Expand Up @@ -1078,6 +1077,7 @@ static class PojoWithPublicAttributes {
public boolean booleanPrimitive;

@Column(name = "time")
@TimeColumn
public Instant time;

@Column(name = "uuid", tag = true)
Expand Down
3 changes: 3 additions & 0 deletions src/test/java/org/influxdb/impl/InfluxDBMapperTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.influxdb.TestUtils;
import org.influxdb.annotation.Column;
import org.influxdb.annotation.Measurement;
import org.influxdb.annotation.TimeColumn;
import org.influxdb.dto.Query;
import org.junit.Assert;
import org.junit.jupiter.api.AfterEach;
Expand Down Expand Up @@ -129,6 +130,7 @@ static class ServerMeasure {

/** Check the instant conversions */
@Column(name = "time")
@TimeColumn
private Instant time;

@Column(name = "name", tag = true)
Expand Down Expand Up @@ -322,6 +324,7 @@ public void setField(Integer field) {
static class NonInstantTime {

@Column(name = "time")
@TimeColumn
private long time;

public long getTime() {
Expand Down

0 comments on commit e93949f

Please sign in to comment.