62
62
import org .eclipse .microprofile .config .inject .ConfigProperty ;
63
63
import org .jboss .logging .Logger ;
64
64
import org .neo4j .driver .Driver ;
65
+ import org .neo4j .driver .reactive .RxSession ;
65
66
import org .reactivestreams .FlowAdapters ;
66
67
import org .reactivestreams .Processor ;
67
68
@@ -86,6 +87,10 @@ public class ImportService {
86
87
87
88
@ Inject protected SourceService sourceService ;
88
89
90
+ static Uni <Void > sessionFinalizer (RxSession session ) {
91
+ return Uni .createFrom ().publisher (session .close ());
92
+ }
93
+
89
94
public Uni <String > runImport (UUID domainId ) {
90
95
91
96
if (pipelineMap .containsKey (domainId )) {
@@ -155,7 +160,6 @@ private Cancellable startImportTask(UUID domainId, Domain domain, Graph graph) {
155
160
private MultiFlatten <JsonNode , Long > createImportTask (
156
161
UUID domainId , Domain domain , String cypher , Processor <List <String >, JsonNode > publisher ) {
157
162
var counter = new AtomicLong (1L );
158
- var session = driver .asyncSession ();
159
163
return Multi .createFrom ()
160
164
.publisher (publisher )
161
165
.emitOn (Infrastructure .getDefaultWorkerPool ())
@@ -165,19 +169,36 @@ private MultiFlatten<JsonNode, Long> createImportTask(
165
169
var entry =
166
170
new ObjectMapper ()
167
171
.convertValue (node , new TypeReference <HashMap <String , Object >>() {});
168
- var s = driver .asyncSession ();
169
172
return Uni .createFrom ()
170
- .completionStage (
171
- s .runAsync (cypher , Map .of ("model" , entry , "domainId" , domainId .toString ()))
172
- .exceptionally (
173
- e -> {
174
- log .errorf (
175
- " Failed item import in file: %s on lines: %s msg: %s" ,
176
- domain .getFile (), node .get ("lines" ), e .getMessage (), e );
177
- return null ;
178
- })
179
- .thenCompose (x1 -> session .closeAsync ())
180
- .thenApply (v -> counter .getAndIncrement ()));
173
+ .emitter (
174
+ emitter ->
175
+ Multi .createFrom ()
176
+ .resource (
177
+ driver ::rxSession ,
178
+ session ->
179
+ session .writeTransaction (
180
+ tx ->
181
+ tx .run (
182
+ cypher ,
183
+ Map .of (
184
+ "model" ,
185
+ entry ,
186
+ "domainId" ,
187
+ domainId .toString ()))
188
+ .records ()))
189
+ .withFinalizer (ImportService ::sessionFinalizer )
190
+ .map (v -> counter .getAndIncrement ())
191
+ .onFailure ()
192
+ .invoke (
193
+ (throwable ) ->
194
+ log .errorf (
195
+ " Failed item import in file: %s on lines: %s msg: %s" ,
196
+ domain .getFile (),
197
+ node .get ("lines" ),
198
+ throwable .getMessage (),
199
+ throwable ))
200
+ .subscribe ()
201
+ .with (emitter ::complete ));
181
202
});
182
203
}
183
204
0 commit comments