
JsonStreamWriter Support ISO TIMESTAMP String #1764

Open
ismailsimsek opened this issue Aug 28, 2022 · 8 comments
Labels
api: bigquerystorage Issues related to the googleapis/java-bigquerystorage API. priority: p2 Moderately-important priority. Fix may not be included in next release. type: feature request ‘Nice-to-have’ improvement, new feature or different behavior or design.

Comments

@ismailsimsek

What the problem is:

Currently, uploading ISO-formatted TIMESTAMP values fails with `JSONObject does not have a int64 field at root.c_timestamptz.`.

Currently it only accepts int64 values.

Is it possible to add support?

What you want to happen:
Be able to upload ISO-formatted TIMESTAMP values to a TIMESTAMP field.

2022-08-28 12:56:50,656 INFO  [io.deb.ser.ConnectorLifecycle] (pool-11-thread-1) Connector completed: success = 'false', message = 'Stopping connector after error in the application's handler method: JSONObject does not have a int64 field at root.c_timestamptz.', error = '{}': java.lang.IllegalArgumentException: JSONObject does not have a int64 field at root.c_timestamptz.
	at com.google.cloud.bigquery.storage.v1.JsonToProtoMessage.fillField(JsonToProtoMessage.java:351)
	at com.google.cloud.bigquery.storage.v1.JsonToProtoMessage.convertJsonToProtoMessageImpl(JsonToProtoMessage.java:176)
	at com.google.cloud.bigquery.storage.v1.JsonToProtoMessage.convertJsonToProtoMessage(JsonToProtoMessage.java:115)
	at com.google.cloud.bigquery.storage.v1.JsonStreamWriter.append(JsonStreamWriter.java:147)
	at com.google.cloud.bigquery.storage.v1.JsonStreamWriter.append(JsonStreamWriter.java:106)
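Until string support is available, one possible client-side workaround is to convert the ISO-8601 string to epoch microseconds before putting it into the `JSONObject`. This is only a sketch: `IsoToEpochMicros` and `toEpochMicros` are hypothetical helper names, and the int64-microseconds expectation is inferred from the error message above.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;

public class IsoToEpochMicros {

    // The Storage Write API appears to expect TIMESTAMP values as int64
    // epoch microseconds, which is why the ISO string is rejected.
    static long toEpochMicros(String iso) {
        // Instant.parse handles ISO-8601 strings such as
        // "2019-11-14T00:55:31.820Z".
        Instant instant = Instant.parse(iso);
        return ChronoUnit.MICROS.between(Instant.EPOCH, instant);
    }

    public static void main(String[] args) {
        // The converted value can then be put into the record, e.g.
        // record.put("c_ts", toEpochMicros("2019-11-14T00:55:31.820Z"));
        System.out.println(toEpochMicros("2019-11-14T00:55:31.820Z"));
    }
}
```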
@product-auto-label product-auto-label bot added the api: bigquerystorage Issues related to the googleapis/java-bigquerystorage API. label Aug 28, 2022
@ismailsimsek
Author

It might be that the behavior changed between versions 2.20.0 and 2.20.1.

With the following dependency it works:

            <dependency>
                <groupId>com.google.cloud</groupId>
                <artifactId>google-cloud-bigquerystorage</artifactId>
                <version>2.20.0</version>
            </dependency>

@prash-mi prash-mi added type: feature request ‘Nice-to-have’ improvement, new feature or different behavior or design. priority: p2 Moderately-important priority. Fix may not be included in next release. labels Aug 29, 2022
@Neenu1995
Contributor

Hi @ismailsimsek ,
When you say it might be that behavior changed between Version 2.20.0 and 2.20.1, does that mean 2.20.1 does not support the feature, but 2.20.0 does?

Can you also provide a code snippet to reproduce the issue?

@ismailsimsek
Author

@Neenu1995 Correct, it was working before. After upgrading the dependency to 2.20.1 it started giving the error.

@ismailsimsek
Author

Here is a code snippet:

package io.debezium.server.bigquery;

import java.io.IOException;
import java.util.Arrays;

import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.bigquery.*;
import com.google.cloud.bigquery.storage.v1.*;
import com.google.protobuf.Descriptors;
import org.json.JSONArray;
import org.json.JSONObject;

import com.google.common.collect.ImmutableMap;

public class testStreamLoading {

  public static void main(String[] args) throws IOException, InterruptedException, Descriptors.DescriptorValidationException {
    // BigQuery client setup
    BigQuery bigquery = BigQueryOptions.newBuilder()
        .setCredentials(GoogleCredentials.getApplicationDefault())
        .setLocation("EU")
        .build()
        .getService();
    Field[] fields =
        new Field[] {
            Field.of("c_id", LegacySQLTypeName.INTEGER),
            Field.of("c_ts", LegacySQLTypeName.TIMESTAMP)
        };
    // Table schema definition
    Schema schema = Schema.of(fields);

    BigQueryWriteSettings bigQueryWriteSettings = BigQueryWriteSettings
        .newBuilder()
        .setCredentialsProvider(FixedCredentialsProvider.create(GoogleCredentials.getApplicationDefault()))
        .build();
    BigQueryWriteClient bigQueryWriteClient = BigQueryWriteClient.create(bigQueryWriteSettings);
    TableName tn = TableName.of(bigquery.getOptions().getProjectId(), "stage", "test_json_loading");
    
    JsonStreamWriter streamWriter = JsonStreamWriter
        .newBuilder(tn.toString(), BqToBqStorageSchemaConverter.convertTableSchema(schema), bigQueryWriteClient)
        .build();

    JSONArray jsonArr = new JSONArray();
    JSONObject record = new JSONObject();
    record.put("c_id", 1);
    record.put("c_ts", "2019-11-14T00:55:31.820Z");
    jsonArr.put(record);

    streamWriter.append(jsonArr);
    System.out.println("DONE");
  }
}

/** Converts structure from BigQuery client to BigQueryStorage client */
class BqToBqStorageSchemaConverter {
  private static ImmutableMap<Field.Mode, TableFieldSchema.Mode> BQTableSchemaModeMap =
      ImmutableMap.of(
          Field.Mode.NULLABLE, TableFieldSchema.Mode.NULLABLE,
          Field.Mode.REPEATED, TableFieldSchema.Mode.REPEATED,
          Field.Mode.REQUIRED, TableFieldSchema.Mode.REQUIRED);

  private static ImmutableMap<StandardSQLTypeName, TableFieldSchema.Type> BQTableSchemaTypeMap =
      new ImmutableMap.Builder<StandardSQLTypeName, TableFieldSchema.Type>()
          .put(StandardSQLTypeName.BOOL, TableFieldSchema.Type.BOOL)
          .put(StandardSQLTypeName.BYTES, TableFieldSchema.Type.BYTES)
          .put(StandardSQLTypeName.DATE, TableFieldSchema.Type.DATE)
          .put(StandardSQLTypeName.DATETIME, TableFieldSchema.Type.DATETIME)
          .put(StandardSQLTypeName.FLOAT64, TableFieldSchema.Type.DOUBLE)
          .put(StandardSQLTypeName.GEOGRAPHY, TableFieldSchema.Type.GEOGRAPHY)
          .put(StandardSQLTypeName.INT64, TableFieldSchema.Type.INT64)
          .put(StandardSQLTypeName.NUMERIC, TableFieldSchema.Type.NUMERIC)
          .put(StandardSQLTypeName.STRING, TableFieldSchema.Type.STRING)
          .put(StandardSQLTypeName.STRUCT, TableFieldSchema.Type.STRUCT)
          .put(StandardSQLTypeName.TIME, TableFieldSchema.Type.TIME)
          .put(StandardSQLTypeName.TIMESTAMP, TableFieldSchema.Type.TIMESTAMP)
          .build();

  /**
   * Converts from BigQuery client Table Schema to bigquery storage API Table Schema.
   *
   * @param schema the BigQuery client Table Schema
   * @return the bigquery storage API Table Schema
   */
  public static TableSchema convertTableSchema(Schema schema) {
    TableSchema.Builder result = TableSchema.newBuilder();
    for (int i = 0; i < schema.getFields().size(); i++) {
      result.addFields(i, convertFieldSchema(schema.getFields().get(i)));
    }
    return result.build();
  }

  /**
   * Converts from bigquery v2 Field Schema to bigquery storage API Field Schema.
   *
   * @param field the BigQuery client Field Schema
   * @return the bigquery storage API Field Schema
   */
  public static TableFieldSchema convertFieldSchema(Field field) {
    TableFieldSchema.Builder result = TableFieldSchema.newBuilder();
    if (field.getMode() == null) {
      field = field.toBuilder().setMode(Field.Mode.NULLABLE).build();
    }
    result.setMode(BQTableSchemaModeMap.get(field.getMode()));
    result.setName(field.getName());
    result.setType(BQTableSchemaTypeMap.get(field.getType().getStandardType()));
    if (field.getDescription() != null) {
      result.setDescription(field.getDescription());
    }
    if (field.getSubFields() != null) {
      for (int i = 0; i < field.getSubFields().size(); i++) {
        result.addFields(i, convertFieldSchema(field.getSubFields().get(i)));
      }
    }
    return result.build();
  }
}

@ismailsimsek
Author

ismailsimsek commented Aug 29, 2022

Library versions with which it works without error:


[INFO] +- com.google.cloud:google-cloud-bigquery:jar:2.14.6:compile
[INFO] |  +- com.google.apis:google-api-services-bigquery:jar:v2-rev20220806-2.0.0:compile
[INFO] |  +- com.google.cloud:google-cloud-bigquerystorage:jar:2.20.0:compile
[INFO] |  +- com.google.api.grpc:proto-google-cloud-bigquerystorage-v1beta1:jar:0.134.1:compile
[INFO] |  +- com.google.api.grpc:proto-google-cloud-bigquerystorage-v1beta2:jar:0.134.1:compile
[INFO] |  +- com.google.api.grpc:grpc-google-cloud-bigquerystorage-v1beta1:jar:0.134.1:compile
[INFO] |  +- com.google.api.grpc:grpc-google-cloud-bigquerystorage-v1beta2:jar:0.134.1:compile
[INFO] |  +- com.google.api.grpc:grpc-google-cloud-bigquerystorage-v1:jar:2.10.1:compile
[INFO] |  +- com.google.api.grpc:proto-google-cloud-bigquerystorage-v1:jar:2.10.1:compile

@dark0dave
Contributor

Hmm, this is interesting. I wonder what broke this, as we do have tests for it.

@dark0dave
Copy link
Contributor

I'll have a look when I have some more time.
