24
24
import java .io .Serializable ;
25
25
import java .time .LocalDate ;
26
26
import java .util .ArrayList ;
27
- import java .util .Iterator ;
27
+ import java .util .concurrent . ArrayBlockingQueue ;
28
28
import java .util .stream .Stream ;
29
29
30
30
import org .slf4j .Logger ;
41
41
public class Paginator
42
42
{
43
43
private static final Logger LOGGER = LoggerFactory .getLogger (Paginator .class );
44
+ private static final LocalDate R_EPOCH_DATE = LocalDate .of (1970 , 1 , 1 );
44
45
private static final double R_DOUBLE_NA = Double .longBitsToDouble (0x7ff00000000007a2L );
45
46
private static final int R_INT_NA = Integer .MIN_VALUE ;
46
47
47
48
private final DataSetMetadata dataStructure ;
48
- private final Iterator <DataPoint > iterator ;
49
+ private final ArrayBlockingQueue <DataPoint > queue ;
49
50
private final DataStructureComponent <?, ?, ?>[] comps ;
50
51
private final int [] types ;
51
52
private final int size ;
52
53
private final Object [] result ;
53
-
54
+
55
+ private boolean closed = false ;
54
56
55
57
public Paginator (DataSet dataset , int size )
56
58
{
@@ -59,6 +61,8 @@ public Paginator(DataSet dataset, int size)
59
61
comps = dataStructure .stream ().toArray (DataStructureComponent <?, ?, ?>[]::new );
60
62
result = new Object [comps .length ];
61
63
types = new int [comps .length ];
64
+ queue = new ArrayBlockingQueue <>(size );
65
+
62
66
for (int i = 0 ; i < comps .length ; i ++)
63
67
{
64
68
if (comps [i ].getVariable ().getDomain () instanceof NumberDomain )
@@ -71,10 +75,28 @@ else if (comps[i].getVariable().getDomain() instanceof DateDomain)
71
75
types [i ] = 4 ;
72
76
}
73
77
74
- try (Stream <DataPoint > stream = dataset .stream ())
75
- {
76
- iterator = stream .iterator ();
77
- }
78
+ Thread t = new Thread (() -> {
79
+ try (Stream <DataPoint > stream = dataset .stream ().onClose (() -> closed = true ))
80
+ {
81
+ stream .forEach (dp -> {
82
+ try
83
+ {
84
+ if (!closed )
85
+ queue .put (dp );
86
+ }
87
+ catch (InterruptedException e )
88
+ {
89
+ closed = true ;
90
+ }
91
+ });
92
+ }
93
+ finally
94
+ {
95
+ closed = true ;
96
+ }
97
+ }, "Paginator@" + hashCode ());
98
+ t .setDaemon (true );
99
+ t .start ();
78
100
}
79
101
80
102
public DataSetMetadata getDataStructure ()
@@ -110,8 +132,19 @@ public String[] getStringColumn(int i)
110
132
public void prepareMore ()
111
133
{
112
134
ArrayList <DataPoint > dps = new ArrayList <>(size );
113
- for (int i = 0 ; i < size && iterator .hasNext (); i ++)
114
- dps .add (iterator .next ());
135
+ queue .drainTo (dps );
136
+ while (!closed && dps .size () == 0 )
137
+ {
138
+ try
139
+ {
140
+ Thread .sleep (500 );
141
+ queue .drainTo (dps );
142
+ }
143
+ catch (InterruptedException e )
144
+ {
145
+ break ;
146
+ }
147
+ }
115
148
116
149
int newSize = dps .size ();
117
150
@@ -147,7 +180,7 @@ public void prepareMore()
147
180
{
148
181
case 1 : ((double []) array )[j ] = value == null ? R_DOUBLE_NA : ((Number ) value ).doubleValue (); break ;
149
182
case 2 : ((int []) array )[j ] = value == null ? R_INT_NA : value == Boolean .TRUE ? 1 : 0 ; break ;
150
- case 3 : ((int []) array )[j ] = value == null ? R_INT_NA : (int ) DAYS .between (LocalDate . of ( 1970 , 1 , 1 ) , (LocalDate ) value ); break ;
183
+ case 3 : ((int []) array )[j ] = value == null ? R_INT_NA : (int ) DAYS .between (R_EPOCH_DATE , (LocalDate ) value ); break ;
151
184
case 4 : ((String []) array )[j ] = value == null ? null : value .toString (); break ;
152
185
}
153
186
}
0 commit comments