22
33import java .io .File ;
44import java .util .ArrayList ;
5+ import java .util .HashMap ;
56import java .util .List ;
7+ import java .util .Map ;
68
79import jakarta .annotation .PreDestroy ;
810import jakarta .xml .bind .JAXB ;
@@ -55,18 +57,7 @@ public void run(String... args) {
5557 warnings .add ("Can't read file " + arg );
5658 } else {
5759 try {
58- XmlConfig xml = JAXB .unmarshal (config , XmlConfig .class );
59- for (XmlConfig .Cdc cdc : xml .getCdcs ()) {
60- Result <YqlWriter > writer = YqlWriter .parse (ydb , cdc );
61- if (!writer .isSuccess ()) {
62- logger .error ("can't create reader {} with problem {}" ,
63- cdc .getConsumer (), writer .getStatus ());
64- warnings .add ("can't create reader " + cdc .getConsumer ()
65- + " with problem: " + writer .getStatus ());
66- } else {
67- readers .add (new CdcReader (ydb , writer .getValue (), cdc .getConsumer (), cdc .getChangefeed ()));
68- }
69- }
60+ loadXmlConfig (config .toURI ().toASCIIString ());
7061 } catch (RuntimeException ex ) {
7162 logger .warn ("can't parse file {}" , arg , ex );
7263 warnings .add ("Parse exception: " + ex .getMessage ());
@@ -85,6 +76,25 @@ public void run(String... args) {
8576 logger .info ("app has started" );
8677 }
8778
79+ private void loadXmlConfig (String content ) {
80+ XmlConfig xml = JAXB .unmarshal (content , XmlConfig .class );
81+ Map <String , XmlConfig .Query > queries = new HashMap ();
82+ for (XmlConfig .Query query : xml .getQueries ()) {
83+ queries .put (query .getId (), query );
84+ }
85+
86+ for (XmlConfig .Cdc cdc : xml .getCdcs ()) {
87+ Result <CdcMsgParser > batcher = CdcMsgParser .parse (ydb , queries , cdc );
88+ if (!batcher .isSuccess ()) {
89+ logger .error ("can't create reader {} with problem {}" , cdc .getConsumer (), batcher .getStatus ());
90+ warnings .add ("can't create reader " + cdc .getConsumer () + " with problem: " + batcher .getStatus ());
91+ } else {
92+ YqlWriter writer = new YqlWriter (ydb , batcher .getValue (), cdc );
93+ readers .add (new CdcReader (ydb , writer , cdc .getConsumer (), cdc .getChangefeed ()));
94+ }
95+ }
96+ }
97+
8898 @ PreDestroy
8999 public void preDestroy () {
90100 logger .info ("app has closed" );
0 commit comments