Skip to content

Commit beb3a93

Browse files
committed
Workaround for java to r conversion
1 parent 7cf79a2 commit beb3a93

File tree

4 files changed

+144
-151
lines changed

4 files changed

+144
-151
lines changed

vtl-bundles/vtl-r/RVTL/src/main/resources/R/R/VTLSession.R

+6-7
Original file line numberDiff line numberDiff line change
@@ -115,13 +115,12 @@ VTLSession <- R6Class("VTLSession",
115115
}
116116
else if (jnode %instanceof% "it.bancaditalia.oss.vtl.model.data.DataSet") {
117117
pager <- .jnew("it.bancaditalia.oss.vtl.util.Paginator",
118-
.jcast(jnode, "it.bancaditalia.oss.vtl.model.data.DataSet"))
119-
df <- convertToDF(tryCatch({ pager$more(-1L) },
120-
error = function(e) {
121-
e$jobj$printStackTrace()
122-
signalCondition(e)
123-
}, finally = { pager$close() })
124-
)
118+
.jcast(jnode, "it.bancaditalia.oss.vtl.model.data.DataSet"), 100L)
119+
nc <- jnode$getMetadata()$size()
120+
df <- tryCatch(convertDF(pager, nc), error = function(e) {
121+
e$jobj$printStackTrace()
122+
signalCondition(e)
123+
})
125124
attr(df, 'measures') <- sapply(jnode$getMetadata()$getMeasures(), function(x) { x$getVariable()$getName() })
126125
attr(df, 'identifiers') <- sapply(jnode$getMetadata()$getIDs(), function(x) { x$getVariable()$getName() })
127126
}

vtl-bundles/vtl-r/RVTL/src/main/resources/R/R/util.R

+22-9
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,26 @@ vtlTryCatch <- function(expr) {
4343
}))
4444
}
4545

46-
convertToDF <- function(jnode) {
47-
nodenames <- sapply(jnode$keySet(), .jstrVal)
48-
nodesvals <- lapply(nodenames, jnode$get)
49-
names(nodesvals) <- nodenames
50-
bools <- which(sapply(nodesvals, function(array) is.integer(array) && all(array %in% as.integer(c(1L, 0L, NA)))))
51-
dates <- which(sapply(nodesvals, function(array) is.integer(array) && !all(array %in% as.integer(c(1L, 0L, NA)))))
52-
nodesvals[dates] <- lapply(nodesvals[dates], as.Date, as.Date("1970-01-01"))
53-
nodesvals[bools] <- lapply(nodesvals[bools], as.logical)
54-
return(as.data.frame(nodesvals))
46+
convertDF <- function(pager, nc) {
47+
total <- NULL
48+
repeat {
49+
pager$prepareMore()
50+
part <- lapply(0:(nc - 1), function(i) {
51+
switch (pager$getType(i),
52+
pager$getDoubleColumn(i),
53+
lapply(pager$getIntColumn(i), as.logical),
54+
lapply(pager$getIntColumn(i), as.Date, as.Date("1970-01-01")),
55+
sapply(pager$getStringColumn(i), .jstrVal)
56+
)
57+
})
58+
names(part) <- sapply(0:(nc - 1), function(i) .jstrVal(pager$getName(i)))
59+
part <- as.data.frame(part)
60+
if (nrow(part) > 0) {
61+
total <- if (is.null(total)) part else rbind(total, part)
62+
} else {
63+
break
64+
}
65+
}
66+
67+
invisible(total)
5568
}

vtl-bundles/vtl-r/vtl-java2r/src/main/java/it/bancaditalia/oss/vtl/util/Paginator.java

+81-103
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,11 @@
2020
package it.bancaditalia.oss.vtl.util;
2121

2222
import static java.time.temporal.ChronoUnit.DAYS;
23-
import static java.util.concurrent.TimeUnit.SECONDS;
2423

2524
import java.io.Serializable;
2625
import java.time.LocalDate;
2726
import java.util.ArrayList;
28-
import java.util.HashMap;
29-
import java.util.List;
30-
import java.util.Map;
31-
import java.util.concurrent.ArrayBlockingQueue;
32-
import java.util.concurrent.BlockingQueue;
27+
import java.util.Iterator;
3328
import java.util.stream.Stream;
3429

3530
import org.slf4j.Logger;
@@ -43,138 +38,121 @@
4338
import it.bancaditalia.oss.vtl.model.domain.DateDomain;
4439
import it.bancaditalia.oss.vtl.model.domain.NumberDomain;
4540

46-
public class Paginator implements AutoCloseable
41+
public class Paginator
4742
{
4843
private static final Logger LOGGER = LoggerFactory.getLogger(Paginator.class);
4944
private static final double R_DOUBLE_NA = Double.longBitsToDouble(0x7ff00000000007a2L);
5045
private static final int R_INT_NA = Integer.MIN_VALUE;
5146

52-
private final BlockingQueue<DataPoint> queue = new ArrayBlockingQueue<>(1000);
5347
private final DataSetMetadata dataStructure;
48+
private final Iterator<DataPoint> iterator;
49+
private final DataStructureComponent<?, ?, ?>[] comps;
50+
private final int[] types;
51+
private final int size;
52+
private final Object[] result;
5453

55-
private boolean closed = false;
56-
private RuntimeException lastException = null;
5754

58-
public Paginator(DataSet dataset)
55+
public Paginator(DataSet dataset, int size)
5956
{
57+
this.size = size;
6058
dataStructure = dataset.getMetadata();
61-
Thread thread = new Thread(() -> {
62-
try (Stream<DataPoint> stream = dataset.stream())
63-
{
64-
for (DataPoint dp: (Iterable<DataPoint>) stream::iterator)
65-
while (!isClosed())
66-
try
67-
{
68-
queue.put(dp);
69-
break;
70-
}
71-
catch (InterruptedException e)
72-
{
73-
close();
74-
Thread.currentThread().interrupt();
75-
}
76-
}
77-
catch (Throwable t)
78-
{
79-
LOGGER.error(t.getMessage(), t);
80-
}
81-
finally
82-
{
83-
close();
84-
}
85-
});
86-
thread.setDaemon(true);
87-
thread.start();
59+
comps = dataStructure.stream().toArray(DataStructureComponent<?, ?, ?>[]::new);
60+
result = new Object[comps.length];
61+
types = new int[comps.length];
62+
for (int i = 0; i < comps.length; i++)
63+
{
64+
if (comps[i].getVariable().getDomain() instanceof NumberDomain)
65+
types[i] = 1;
66+
else if (comps[i].getVariable().getDomain() instanceof BooleanDomain)
67+
types[i] = 2;
68+
else if (comps[i].getVariable().getDomain() instanceof DateDomain)
69+
types[i] = 3;
70+
else // StringDomain, TimeDomain, TimePeriodDomain
71+
types[i] = 4;
72+
}
73+
74+
try (Stream<DataPoint> stream = dataset.stream())
75+
{
76+
iterator = stream.iterator();
77+
}
8878
}
8979

90-
public boolean isClosed()
80+
public DataSetMetadata getDataStructure()
9181
{
92-
return closed;
82+
return dataStructure;
9383
}
9484

95-
@Override
96-
public void close()
85+
public int getType(int i)
9786
{
98-
closed = true;
87+
return types[i];
9988
}
10089

101-
public DataSetMetadata getDataStructure()
90+
public String getName(int i)
10291
{
103-
return dataStructure;
92+
return comps[i].getVariable().getName();
10493
}
10594

106-
public List<DataPoint> moreDataPoints()
95+
public int[] getIntColumn(int i)
10796
{
108-
return moreDataPoints(20);
97+
return (int[]) result[i];
10998
}
110-
111-
public List<DataPoint> moreDataPoints(int size)
99+
100+
public double[] getDoubleColumn(int i)
112101
{
113-
List<DataPoint> result = new ArrayList<>();
114-
115-
while ((!isClosed() || !queue.isEmpty()) && (size <= 0 || result.size() < size))
116-
try
117-
{
118-
DataPoint element = queue.poll(1, SECONDS);
119-
if (element != null)
120-
result.add(element);
121-
}
122-
catch (InterruptedException e)
123-
{
124-
close();
125-
Thread.currentThread().interrupt();
126-
}
127-
128-
if (lastException != null)
129-
throw lastException;
130-
131-
return result;
102+
return (double[]) result[i];
132103
}
133-
134-
public Map<String, Object> more()
104+
105+
public String[] getStringColumn(int i)
135106
{
136-
return more(20);
107+
return (String[]) result[i];
137108
}
138-
139-
public Map<String, Object> more(int size)
109+
110+
public void prepareMore()
140111
{
141-
List<DataPoint> datapoints = moreDataPoints(size);
142-
Map<String, Object> result = new HashMap<>();
112+
ArrayList<DataPoint> dps = new ArrayList<>(size);
113+
for (int i = 0; i < size && iterator.hasNext(); i++)
114+
dps.add(iterator.next());
115+
116+
int newSize = dps.size();
117+
118+
LOGGER.info("Retrieving {} rows from dataset.", newSize);
143119

144-
for (DataStructureComponent<?, ?, ?> c: dataStructure)
145-
if (c.getVariable().getDomain() instanceof NumberDomain)
120+
boolean test = result[0] == null;
121+
if (!test)
122+
switch (types[0])
146123
{
147-
double[] array = (double[]) result.computeIfAbsent(c.getVariable().getName(), n -> new double[datapoints.size()]);
148-
for (int i = 0; i < datapoints.size(); i++)
149-
{
150-
Serializable v = datapoints.get(i).get(c).get();
151-
array[i] = v == null ? R_DOUBLE_NA : ((Number) v).doubleValue();
152-
}
124+
case 1: test = ((double[]) result[0]).length != newSize; break;
125+
case 2:
126+
case 3: test = ((int[]) result[0]).length != newSize; break;
127+
case 4: test = ((String[]) result[0]).length != newSize; break;
153128
}
154-
else if (c.getVariable().getDomain() instanceof BooleanDomain || c.getVariable().getDomain() instanceof DateDomain)
155-
{
156-
int[] array = (int[]) result.computeIfAbsent(c.getVariable().getName(), n -> new int[datapoints.size()]);
157-
for (int i = 0; i < datapoints.size(); i++)
129+
if (test)
130+
for (int i = 0; i < comps.length; i++)
131+
switch (types[i])
158132
{
159-
Serializable v = datapoints.get(i).get(c).get();
160-
if (v == null)
161-
array[i] = R_INT_NA;
162-
else if (c.getVariable().getDomain() instanceof BooleanDomain)
163-
array[i] = (Boolean) v ? 1 : 0;
164-
else
165-
array[i] = (int) DAYS.between(LocalDate.of(1970, 1, 1), (LocalDate) v);
133+
case 1: result[i] = new double[newSize]; break;
134+
case 2: result[i] = new int[newSize]; break;
135+
case 3: result[i] = new int[newSize]; break;
136+
case 4: result[i] = new String[newSize]; break;
137+
default: throw new IllegalStateException();
166138
}
167-
}
168-
else // StringDomain, TimeDomain, TimePeriodDomain
139+
140+
for (int i = 0; i < result.length; i++)
141+
{
142+
Object array = result[i];
143+
for (int j = 0; j < newSize; j++)
169144
{
170-
String[] array = (String[]) result.computeIfAbsent(c.getVariable().getName(), n -> new String[datapoints.size()]);
171-
for (int i = 0; i < datapoints.size(); i++)
145+
Serializable value = dps.get(j).get(comps[i]).get();
146+
switch (types[i])
172147
{
173-
Serializable v = datapoints.get(i).get(c).get();
174-
array[i] = v == null ? null : v.toString();
148+
case 1: ((double[]) array)[j] = value == null ? R_DOUBLE_NA : ((Number) value).doubleValue(); break;
149+
case 2: ((int[]) array)[j] = value == null ? R_INT_NA : value == Boolean.TRUE ? 1 : 0; break;
150+
case 3: ((int[]) array)[j] = value == null ? R_INT_NA : (int) DAYS.between(LocalDate.of(1970, 1, 1), (LocalDate) value); break;
151+
case 4: ((String[]) array)[j] = value == null ? null : value.toString(); break;
175152
}
176-
}
177-
178-
return result;
153+
}
154+
}
155+
156+
LOGGER.debug("Retrieving {} rows from dataset finished.", newSize);
179157
}
180158
}

vtl-envs/vtl-sdmxenv/src/main/java/it/bancaditalia/oss/vtl/impl/environment/SDMXEnvironment.java

+35-32
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ public class SDMXEnvironment implements Environment, Serializable
131131
private static final Logger LOGGER = LoggerFactory.getLogger(SDMXEnvironment.class);
132132
private static final Map<DateTimeFormatter, TemporalQuery<? extends TemporalAccessor>> FORMATTERS = new HashMap<>();
133133
private static final DataStructureComponent<Identifier,EntireTimeDomainSubset,TimeDomain> TIME_PERIOD = DataStructureComponentImpl.of("TIME_PERIOD", Identifier.class, TIMEDS);
134-
private static final DataReaderManager DR_MANAGER = new DataReaderManagerImpl(new DataFormatManagerImpl(null, new InformationFormatManager()));
134+
// private static final DataReaderManager DR_MANAGER = new DataReaderManagerImpl(new DataFormatManagerImpl(null, new InformationFormatManager()));
135135
private static final SdmxSourceReadableDataLocationFactory RDL_FACTORY = new SdmxSourceReadableDataLocationFactory();
136136

137137
private final SDMXRepository repo;
@@ -214,42 +214,45 @@ public Optional<VTLValue> getValue(String alias)
214214
String resource = query.length > 1 ? "/" + query[1] : "";
215215
String[] dims = query.length > 1 ? query[1].split("\\.") : new String[] {};
216216

217-
String path = endpoint + "/data/" + dataflow + resource;
218-
ReadableDataLocation rdl;
219-
try
220-
{
221-
rdl = RDL_FACTORY.getReadableDataLocation(path);
222-
}
223-
catch (SdmxUnauthorisedException e)
224-
{
225-
try
226-
{
227-
URL url = new URI(path).toURL();
228-
URLConnection urlc = url.openConnection();
229-
urlc.setDoOutput(true);
230-
urlc.setAllowUserInteraction(false);
231-
urlc.addRequestProperty("Accept-Encoding", "gzip");
232-
urlc.addRequestProperty("Accept", "*/*;q=1.0");
233-
urlc.addRequestProperty("Authorization", "Basic " + Base64.getEncoder().encodeToString((SDMX_DATA_USERNAME.getValue() + ":" + SDMX_DATA_PASSWORD.getValue()).getBytes()));
234-
((HttpURLConnection) urlc).setInstanceFollowRedirects(true);
235-
InputStream is = urlc.getInputStream();
236-
rdl = RDL_FACTORY.getReadableDataLocation("gzip".equals(urlc.getContentEncoding()) ? new GZIPInputStream(is) : is);
237-
}
238-
catch (IOException | URISyntaxException e1)
239-
{
240-
throw new VTLException("Error in creating readableDataLocation", e);
241-
}
242-
}
243-
244-
DataReaderEngine dre = DR_MANAGER.getDataReaderEngine(rdl, repo.getBeanRetrievalManager(), new FirstFailureErrorHandler());
245-
246217
return Optional.of(new AbstractDataSet(structure) {
247218
private static final long serialVersionUID = 1L;
248-
219+
249220
@Override
250221
protected Stream<DataPoint> streamDataPoints()
251222
{
252-
return StreamSupport.stream(Spliterators.spliterator(new ObsIterator(alias, dre, structure, dims), 20, IMMUTABLE), false);
223+
synchronized (SDMXEnvironment.this)
224+
{
225+
String path = endpoint + "/data/" + dataflow + resource;
226+
ReadableDataLocation rdl;
227+
try
228+
{
229+
rdl = RDL_FACTORY.getReadableDataLocation(path);
230+
}
231+
catch (SdmxUnauthorisedException e)
232+
{
233+
try
234+
{
235+
URL url = new URI(path).toURL();
236+
URLConnection urlc = url.openConnection();
237+
urlc.setDoOutput(true);
238+
urlc.setAllowUserInteraction(false);
239+
urlc.addRequestProperty("Accept-Encoding", "gzip");
240+
urlc.addRequestProperty("Accept", "*/*;q=1.0");
241+
urlc.addRequestProperty("Authorization", "Basic " + Base64.getEncoder().encodeToString((SDMX_DATA_USERNAME.getValue() + ":" + SDMX_DATA_PASSWORD.getValue()).getBytes()));
242+
((HttpURLConnection) urlc).setInstanceFollowRedirects(true);
243+
InputStream is = urlc.getInputStream();
244+
rdl = RDL_FACTORY.getReadableDataLocation("gzip".equals(urlc.getContentEncoding()) ? new GZIPInputStream(is) : is);
245+
}
246+
catch (IOException | URISyntaxException e1)
247+
{
248+
throw new VTLException("Error in creating readableDataLocation", e);
249+
}
250+
}
251+
252+
DataReaderManager manager = new DataReaderManagerImpl(new DataFormatManagerImpl(null, new InformationFormatManager()));
253+
DataReaderEngine dre = manager.getDataReaderEngine(rdl, repo.getBeanRetrievalManager(), new FirstFailureErrorHandler());
254+
return StreamSupport.stream(Spliterators.spliterator(new ObsIterator(alias, dre, structure, dims), 20, IMMUTABLE), false);
255+
}
253256
}
254257
});
255258
}

0 commit comments

Comments
 (0)