From b94d342e15393e5343a0cc6225504240e4eab186 Mon Sep 17 00:00:00 2001 From: TaoZex <45089228+TaoZex@users.noreply.github.com> Date: Mon, 2 Jan 2023 19:38:41 +0800 Subject: [PATCH 1/6] [Feature][Connector V2] Add GoogleSheets Sink --- .../Error-Quick-Reference-Manual.md | 6 ++ docs/en/connector-v2/sink/GoogleSheets.md | 55 +++++++++++ plugin-mapping.properties | 1 + .../google/sheets/config/RangePosition.java | 57 +++++++++++ .../google/sheets/config/SheetsConfig.java | 7 +- .../sheets/config/SheetsParameters.java | 40 ++++++++ .../GoogleSheetsConnectorErrorCode.java | 48 ++++++++++ .../serialize/GoogleSheetsSerializer.java | 40 ++++++++ .../serialize/SeaTunnelRowSerializer.java | 27 ++++++ .../google/sheets/sink/SheetsSink.java | 81 ++++++++++++++++ .../google/sheets/sink/SheetsSinkFactory.java | 43 +++++++++ .../google/sheets/sink/SheetsSinkWriter.java | 95 +++++++++++++++++++ .../sheets/source/SheetsSourceReader.java | 30 +----- 13 files changed, 500 insertions(+), 30 deletions(-) create mode 100644 docs/en/connector-v2/sink/GoogleSheets.md create mode 100644 seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/config/RangePosition.java create mode 100644 seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/exception/GoogleSheetsConnectorErrorCode.java create mode 100644 seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/serialize/GoogleSheetsSerializer.java create mode 100644 seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/serialize/SeaTunnelRowSerializer.java create mode 100644 seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/sink/SheetsSink.java create mode 100644 seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/sink/SheetsSinkFactory.java create mode 100644 seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/sink/SheetsSinkWriter.java diff --git a/docs/en/connector-v2/Error-Quick-Reference-Manual.md b/docs/en/connector-v2/Error-Quick-Reference-Manual.md index cff518050d7..31244459c54 100644 --- a/docs/en/connector-v2/Error-Quick-Reference-Manual.md +++ b/docs/en/connector-v2/Error-Quick-Reference-Manual.md @@ -212,3 +212,9 @@ problems encountered by users. |-------------|-----------------------------------------|----------------------------------------------------------------------------------------------------------------------| | DINGTALK-01 | Send response to DinkTalk server failed | When users encounter this error code, it means that send response message to DinkTalk server failed, please check it | | DINGTALK-02 | Get sign from DinkTalk server failed | When users encounter this error code, it means that get signature from DinkTalk server failed , please check it | + +## GoogleSheets Connector Error Codes + +| code | description | solution | +|-----------------|--------------------------------------------|----------------------------------------------------------------------------------------------------------------------| +| GOOGLESHEETS-01 | Build google sheets http request exception | When users encounter this error code, it means that send http request to build google sheets failed, please check it | diff --git a/docs/en/connector-v2/sink/GoogleSheets.md b/docs/en/connector-v2/sink/GoogleSheets.md new file mode 100644 index 00000000000..2c25445ad5c --- /dev/null +++ b/docs/en/connector-v2/sink/GoogleSheets.md @@ -0,0 +1,55 @@ +# GoogleSheets + +> GoogleSheets sink connector +## Description + +Used to write data to GoogleSheets. + +## Key features + +- [ ] [exactly-once](../../concept/connector-v2-features.md) +- [ ] [schema projection](../../concept/connector-v2-features.md) + +## Options + +| name | type | required | default value | +|------------------- |--------------|----------|---------------| +| service_account_key | string | yes | - | +| sheet_id | string | yes | - | +| sheet_name | string | yes | - | +| range | string | yes | - | + +### service_account_key [string] + +google cloud service account, base64 required + +### sheet_id [string] + +sheet id in a Google Sheets URL + +### sheet_name [string] + +the name of the sheet you want to output + +### range [string] + +the range of the sheet you want to output + +## Example + +simple: + +```hocon + GoogleSheets { + service_account_key = "seatunnel-test" + sheet_id = "1VI0DvyZK-NIdssSdsDSsSSSC-_-rYMi7ppJiI_jhE" + sheet_name = "sheets01" + range = "A1:C3" + } +``` + +## Changelog + +### next version + +- Add GoogleSheets Sink Connector \ No newline at end of file diff --git a/plugin-mapping.properties b/plugin-mapping.properties index ea4d4e29757..8989f7b9242 100644 --- a/plugin-mapping.properties +++ b/plugin-mapping.properties @@ -149,6 +149,7 @@ seatunnel.sink.StarRocks = connector-starrocks seatunnel.source.MyHours = connector-http-myhours seatunnel.sink.InfluxDB = connector-influxdb seatunnel.source.GoogleSheets = connector-google-sheets +seatunnel.sink.GoogleSheets = connector-google-sheets seatunnel.sink.Tablestore = connector-tablestore seatunnel.source.Lemlist = connector-http-lemlist seatunnel.source.Klaviyo = connector-http-klaviyo diff --git a/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/config/RangePosition.java b/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/config/RangePosition.java new file mode 100644 index 00000000000..465150a4232 --- /dev/null +++ b/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/config/RangePosition.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.google.sheets.config; + +import lombok.Data; + +import java.io.Serializable; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +@Data +public class RangePosition implements Serializable { + + private String startX; + private Integer startY; + private String endX; + private Integer endY; + + public RangePosition buildWithRange(String range) { + RangePosition rangePosition = new RangePosition(); + String[] ranges = range.split(":"); + Pattern xPattern = Pattern.compile("[A-Z]+"); + Pattern yPattern = Pattern.compile("[0-9]+"); + Matcher startXMatch = xPattern.matcher(ranges[0]); + if (startXMatch.find()) { + rangePosition.setStartX(startXMatch.group()); + } + Matcher startYMatch = yPattern.matcher(ranges[0]); + if (startYMatch.find()) { + rangePosition.setStartY(Integer.parseInt(startYMatch.group())); + } + Matcher endXMatch = xPattern.matcher(ranges[1]); + if (endXMatch.find()) { + rangePosition.setEndX(endXMatch.group()); + } + Matcher endYMatch = yPattern.matcher(ranges[1]); + if (endYMatch.find()) { + rangePosition.setEndY(Integer.parseInt(endYMatch.group())); + } + return rangePosition; + } +} diff --git a/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/config/SheetsConfig.java b/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/config/SheetsConfig.java index c9cf01fa9d0..49f22037f77 100644 --- a/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/config/SheetsConfig.java +++ b/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/config/SheetsConfig.java @@ -26,16 +26,19 @@ public class SheetsConfig { .stringType() .noDefaultValue() .withDescription("Google Sheets login service account key"); + public static final Option SHEET_ID = Options.key("sheet_id") .stringType() .noDefaultValue() .withDescription("Google Sheets sheet id"); + public static final Option SHEET_NAME = Options.key("sheet_name") .stringType() .noDefaultValue() - .withDescription("Google Sheets sheet name that you want to import"); + .withDescription("Google Sheets sheet name that you want to input/output"); + public static final Option RANGE = Options.key("range") .stringType() .noDefaultValue() - .withDescription("Google Sheets sheet range that you want to import"); + .withDescription("Google Sheets sheet range that you want to input/output"); } diff --git a/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/config/SheetsParameters.java b/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/config/SheetsParameters.java index 612b8a53fad..6a0672c261b 100644 --- a/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/config/SheetsParameters.java +++ b/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/config/SheetsParameters.java @@ -17,15 +17,36 @@ package org.apache.seatunnel.connectors.seatunnel.google.sheets.config; +import org.apache.seatunnel.connectors.seatunnel.google.sheets.exception.GoogleSheetsConnectorErrorCode; +import org.apache.seatunnel.connectors.seatunnel.google.sheets.exception.GoogleSheetsConnectorException; + import org.apache.seatunnel.shade.com.typesafe.config.Config; +import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport; +import com.google.api.client.http.HttpRequestInitializer; +import com.google.api.client.http.javanet.NetHttpTransport; +import com.google.api.client.json.JsonFactory; +import com.google.api.client.json.gson.GsonFactory; +import com.google.api.services.sheets.v4.Sheets; +import com.google.api.services.sheets.v4.SheetsScopes; +import com.google.auth.http.HttpCredentialsAdapter; +import com.google.auth.oauth2.ServiceAccountCredentials; import lombok.Data; +import java.io.ByteArrayInputStream; +import java.io.IOException; import java.io.Serializable; +import java.security.GeneralSecurityException; +import java.util.Base64; +import java.util.Collections; @Data public class SheetsParameters implements Serializable { + private static final String APPLICATION_NAME = "SeaTunnel Google Sheets"; + + private static final JsonFactory JSON_FACTORY = GsonFactory.getDefaultInstance(); + private byte[] serviceAccountKey; private String sheetId; @@ -42,4 +63,23 @@ public SheetsParameters buildWithConfig(Config config) { return this; } + public Sheets buildSheets() throws IOException { + byte[] keyBytes = Base64.getDecoder().decode(this.serviceAccountKey); + ServiceAccountCredentials sourceCredentials = ServiceAccountCredentials + .fromStream(new ByteArrayInputStream(keyBytes)); + sourceCredentials = (ServiceAccountCredentials) sourceCredentials + .createScoped(Collections.singletonList(SheetsScopes.SPREADSHEETS)); + HttpRequestInitializer requestInitializer = new HttpCredentialsAdapter(sourceCredentials); + NetHttpTransport httpTransport = null; + try { + httpTransport = GoogleNetHttpTransport.newTrustedTransport(); + } catch (GeneralSecurityException e) { + throw new GoogleSheetsConnectorException(GoogleSheetsConnectorErrorCode.BUILD_SHEETS_REQUEST_EXCEPTION, + "Build google sheets http request exception", e); + } + return new Sheets.Builder(httpTransport, JSON_FACTORY, requestInitializer) + .setApplicationName(APPLICATION_NAME) + .build(); + + } } diff --git a/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/exception/GoogleSheetsConnectorErrorCode.java b/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/exception/GoogleSheetsConnectorErrorCode.java new file mode 100644 index 00000000000..57449f1abfa --- /dev/null +++ b/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/exception/GoogleSheetsConnectorErrorCode.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.google.sheets.exception; + +import org.apache.seatunnel.common.exception.SeaTunnelErrorCode; + +public enum GoogleSheetsConnectorErrorCode implements SeaTunnelErrorCode { + BUILD_SHEETS_REQUEST_EXCEPTION("GOOGLESHEETS-01", "Build google sheets http request exception"); + + private final String code; + + private final String description; + + GoogleSheetsConnectorErrorCode(String code, String description) { + this.code = code; + this.description = description; + } + + @Override + public String getCode() { + return this.code; + } + + @Override + public String getDescription() { + return this.description; + } + + @Override + public String getErrorMessage() { + return SeaTunnelErrorCode.super.getErrorMessage(); + } +} diff --git a/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/serialize/GoogleSheetsSerializer.java b/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/serialize/GoogleSheetsSerializer.java new file mode 100644 index 00000000000..73c0b7bf40b --- /dev/null +++ b/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/serialize/GoogleSheetsSerializer.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.google.sheets.serialize; + +import org.apache.seatunnel.api.table.type.SeaTunnelRow; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class GoogleSheetsSerializer implements SeaTunnelRowSerializer { + + public GoogleSheetsSerializer() { + } + + @Override + public List> deserializeRow(List input) { + List> result = new ArrayList<>(); + for (SeaTunnelRow seaTunnelRow : input) { + List row = new ArrayList<>(Arrays.asList(seaTunnelRow.getFields())); + result.add(row); + } + return result; + } +} diff --git a/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/serialize/SeaTunnelRowSerializer.java b/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/serialize/SeaTunnelRowSerializer.java new file mode 100644 index 00000000000..702904c82da --- /dev/null +++ b/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/serialize/SeaTunnelRowSerializer.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.google.sheets.serialize; + +import org.apache.seatunnel.api.table.type.SeaTunnelRow; + +import java.util.List; + +public interface SeaTunnelRowSerializer { + + List> deserializeRow(List input); +} diff --git a/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/sink/SheetsSink.java b/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/sink/SheetsSink.java new file mode 100644 index 00000000000..704f7942331 --- /dev/null +++ b/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/sink/SheetsSink.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.google.sheets.sink; + +import org.apache.seatunnel.api.common.PrepareFailException; +import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; +import org.apache.seatunnel.api.sink.SeaTunnelSink; +import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.config.CheckConfigUtil; +import org.apache.seatunnel.common.config.CheckResult; +import org.apache.seatunnel.common.constants.PluginType; +import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink; +import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; +import org.apache.seatunnel.connectors.seatunnel.google.sheets.config.RangePosition; +import org.apache.seatunnel.connectors.seatunnel.google.sheets.config.SheetsConfig; +import org.apache.seatunnel.connectors.seatunnel.google.sheets.config.SheetsParameters; +import org.apache.seatunnel.connectors.seatunnel.google.sheets.exception.GoogleSheetsConnectorException; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import com.google.auto.service.AutoService; + +import java.io.IOException; + +@AutoService(SeaTunnelSink.class) +public class SheetsSink extends AbstractSimpleSink { + + private SheetsParameters sheetsParameters; + private SeaTunnelRowType seaTunnelRowType; + private RangePosition rangePosition; + + @Override + public String getPluginName() { + return "GoogleSheets"; + } + + @Override + public void prepare(Config pluginConfig) throws PrepareFailException { + CheckResult checkResult = CheckConfigUtil.checkAllExists(pluginConfig, SheetsConfig.SERVICE_ACCOUNT_KEY.key(), SheetsConfig.SHEET_ID.key(), SheetsConfig.SHEET_NAME.key(), SheetsConfig.RANGE.key()); + if (!checkResult.isSuccess()) { + throw new GoogleSheetsConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, + String.format("PluginName: %s, PluginType: %s, Message: %s", + getPluginName(), PluginType.SINK, checkResult.getMsg())); + } + this.sheetsParameters = new SheetsParameters().buildWithConfig(pluginConfig); + this.rangePosition = new RangePosition().buildWithRange(this.sheetsParameters.getRange()); + } + + @Override + public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) { + this.seaTunnelRowType = seaTunnelRowType; + } + + @Override + public SeaTunnelDataType getConsumedType() { + return seaTunnelRowType; + } + + @Override + public AbstractSinkWriter createWriter(SinkWriter.Context context) throws IOException { + return new SheetsSinkWriter(this.sheetsParameters, this.rangePosition); + } +} diff --git a/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/sink/SheetsSinkFactory.java b/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/sink/SheetsSinkFactory.java new file mode 100644 index 00000000000..25fecbdf7bc --- /dev/null +++ b/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/sink/SheetsSinkFactory.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.google.sheets.sink; + +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.table.factory.Factory; +import org.apache.seatunnel.api.table.factory.TableSourceFactory; +import org.apache.seatunnel.connectors.seatunnel.google.sheets.config.SheetsConfig; + +import com.google.auto.service.AutoService; + +@AutoService(Factory.class) +public class SheetsSinkFactory implements TableSourceFactory { + @Override + public String factoryIdentifier() { + return "GoogleSheets"; + } + + @Override + public OptionRule optionRule() { + return OptionRule.builder() + .required(SheetsConfig.SERVICE_ACCOUNT_KEY) + .required(SheetsConfig.SHEET_ID) + .required(SheetsConfig.SHEET_NAME) + .required(SheetsConfig.RANGE) + .build(); + } +} diff --git a/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/sink/SheetsSinkWriter.java b/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/sink/SheetsSinkWriter.java new file mode 100644 index 00000000000..c9e6dd60417 --- /dev/null +++ b/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/sink/SheetsSinkWriter.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.google.sheets.sink; + +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; +import org.apache.seatunnel.connectors.seatunnel.google.sheets.config.RangePosition; +import org.apache.seatunnel.connectors.seatunnel.google.sheets.config.SheetsParameters; +import org.apache.seatunnel.connectors.seatunnel.google.sheets.serialize.GoogleSheetsSerializer; +import org.apache.seatunnel.connectors.seatunnel.google.sheets.serialize.SeaTunnelRowSerializer; + +import com.google.api.services.sheets.v4.Sheets; +import com.google.api.services.sheets.v4.model.BatchUpdateValuesRequest; +import com.google.api.services.sheets.v4.model.ValueRange; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +public class SheetsSinkWriter extends AbstractSinkWriter { + + private final SheetsParameters sheetsParameters; + private final SeaTunnelRowSerializer seaTunnelRowSerializer; + private final Sheets service; + private final List seaTunnelRowList; + private final RangePosition rangePosition; + private final Integer targetRowCount; + private final Integer batchSize = 100; + private Integer totalCount = 0; + + public SheetsSinkWriter(SheetsParameters sheetsParameters, RangePosition rangePosition) throws IOException { + this.sheetsParameters = sheetsParameters; + this.seaTunnelRowSerializer = new GoogleSheetsSerializer(); + this.service = sheetsParameters.buildSheets(); + this.rangePosition = rangePosition; + this.targetRowCount = rangePosition.getEndY() - rangePosition.getStartY() + 1; + this.seaTunnelRowList = new ArrayList<>(this.targetRowCount); + } + + @Override + public void write(SeaTunnelRow element) throws IOException { + seaTunnelRowList.add(element); + totalCount++; + if (totalCount % batchSize == 0 || totalCount >= targetRowCount) { + flush(); + } + } + + public void flush() throws IOException { + List> values = seaTunnelRowSerializer.deserializeRow(seaTunnelRowList); + List data = new ArrayList<>(); + + String start = rangePosition.getStartX(); + String end = rangePosition.getEndX() + (rangePosition.getStartY() + totalCount - 1); + if (targetRowCount >= batchSize) { + // If it is the last batch + if (totalCount >= targetRowCount) { + start += rangePosition.getEndY() - (totalCount % batchSize) + 1; + } else { + start += rangePosition.getStartY() + totalCount - batchSize; + } + } else { + start += rangePosition.getStartY(); + } + + data.add(new ValueRange() + .setRange(start + ":" + end) + .setValues(values)); + BatchUpdateValuesRequest body = new BatchUpdateValuesRequest() + .setValueInputOption("RAW") + .setData(data); + service.spreadsheets().values().batchUpdate(sheetsParameters.getSheetId(), body).execute(); + seaTunnelRowList.clear(); + } + + @Override + public void close() throws IOException { + // not need close + } +} diff --git a/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/source/SheetsSourceReader.java b/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/source/SheetsSourceReader.java index 47e96e95521..2938f85dd68 100644 --- a/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/source/SheetsSourceReader.java +++ b/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/source/SheetsSourceReader.java @@ -27,21 +27,10 @@ import org.apache.seatunnel.connectors.seatunnel.google.sheets.deserialize.GoogleSheetsDeserializer; import org.apache.seatunnel.connectors.seatunnel.google.sheets.deserialize.SeaTunnelRowDeserializer; -import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport; -import com.google.api.client.http.HttpRequestInitializer; -import com.google.api.client.http.javanet.NetHttpTransport; -import com.google.api.client.json.JsonFactory; -import com.google.api.client.json.gson.GsonFactory; import com.google.api.services.sheets.v4.Sheets; -import com.google.api.services.sheets.v4.SheetsScopes; import com.google.api.services.sheets.v4.model.ValueRange; -import com.google.auth.http.HttpCredentialsAdapter; -import com.google.auth.oauth2.ServiceAccountCredentials; -import java.io.ByteArrayInputStream; import java.io.IOException; -import java.util.Base64; -import java.util.Collections; import java.util.List; public class SheetsSourceReader extends AbstractSingleSplitReader { @@ -50,14 +39,9 @@ public class SheetsSourceReader extends AbstractSingleSplitReader private SeaTunnelRowType seaTunnelRowType; - private HttpRequestInitializer requestInitializer; - - private static final String APPLICATION_NAME = "SeaTunnel Google Sheets"; - - private static final JsonFactory JSON_FACTORY = GsonFactory.getDefaultInstance(); - private final SingleSplitReaderContext context; + private Sheets service; private final SeaTunnelRowDeserializer seaTunnelRowDeserializer; @@ -70,13 +54,7 @@ public SheetsSourceReader(SheetsParameters sheetsParameters, SingleSplitReaderCo @Override public void open() throws Exception { - byte[] keyBytes = Base64.getDecoder().decode(sheetsParameters.getServiceAccountKey()); - ServiceAccountCredentials sourceCredentials = ServiceAccountCredentials - .fromStream(new ByteArrayInputStream(keyBytes)); - sourceCredentials = (ServiceAccountCredentials) sourceCredentials - .createScoped(Collections.singletonList(SheetsScopes.SPREADSHEETS)); - requestInitializer = new HttpCredentialsAdapter(sourceCredentials); - + this.service = sheetsParameters.buildSheets(); } @Override @@ -86,10 +64,6 @@ public void close() throws IOException { @Override public void pollNext(Collector output) throws Exception { - final NetHttpTransport httpTransport = GoogleNetHttpTransport.newTrustedTransport(); - Sheets service = new Sheets.Builder(httpTransport, JSON_FACTORY, requestInitializer) - .setApplicationName(APPLICATION_NAME) - .build(); ValueRange response = service.spreadsheets().values() .get(sheetsParameters.getSheetId(), sheetsParameters.getSheetName() + "!" + sheetsParameters.getRange()) .execute(); From e4e5cc876c69a98932d83d6537f838895608edea Mon Sep 17 00:00:00 2001 From: TaoZex <45089228+TaoZex@users.noreply.github.com> Date: Mon, 2 Jan 2023 19:44:01 +0800 Subject: [PATCH 2/6] [Feature][Connector V2] fix doc --- docs/en/connector-v2/sink/GoogleSheets.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/en/connector-v2/sink/GoogleSheets.md b/docs/en/connector-v2/sink/GoogleSheets.md index 2c25445ad5c..70d6127a616 100644 --- a/docs/en/connector-v2/sink/GoogleSheets.md +++ b/docs/en/connector-v2/sink/GoogleSheets.md @@ -52,4 +52,4 @@ simple: ### next version -- Add GoogleSheets Sink Connector \ No newline at end of file +- Add GoogleSheets Sink Connector [3848](https://github.com/apache/incubator-seatunnel/pull/3848) \ No newline at end of file From 1e98c1c3a78bb2e1f2c22e318d27bd33f779b955 Mon Sep 17 00:00:00 2001 From: TaoZex <45089228+TaoZex@users.noreply.github.com> Date: Thu, 5 Jan 2023 17:44:10 +0800 Subject: [PATCH 3/6] [Feature][Connector-V2][GoogleSheets]Add e2e test --- docs/en/connector-v2/sink/GoogleSheets.md | 2 +- .../connector-googlesheets-e2e/pom.xml | 43 +++++++++++++++ .../googlesheets/GoogleSheetsIT.java | 53 +++++++++++++++++++ .../resources/fakesource_to_googlesheets.conf | 43 +++++++++++++++ .../seatunnel-connector-v2-e2e/pom.xml | 1 + 5 files changed, 141 insertions(+), 1 deletion(-) create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-googlesheets-e2e/pom.xml create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-googlesheets-e2e/src/test/java/org/apache/seatunnel/e2e/connector/googlesheets/GoogleSheetsIT.java create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-googlesheets-e2e/src/test/resources/fakesource_to_googlesheets.conf diff --git a/docs/en/connector-v2/sink/GoogleSheets.md b/docs/en/connector-v2/sink/GoogleSheets.md index 70d6127a616..9caa452b72a 100644 --- a/docs/en/connector-v2/sink/GoogleSheets.md +++ b/docs/en/connector-v2/sink/GoogleSheets.md @@ -41,7 +41,7 @@ simple: ```hocon GoogleSheets { - service_account_key = "seatunnel-test" + service_account_key = "Your account key" sheet_id = "1VI0DvyZK-NIdssSdsDSsSSSC-_-rYMi7ppJiI_jhE" sheet_name = "sheets01" range = "A1:C3" diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-googlesheets-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-googlesheets-e2e/pom.xml new file mode 100644 index 00000000000..9a55f99d12e --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-googlesheets-e2e/pom.xml @@ -0,0 +1,43 @@ + + + + + org.apache.seatunnel + seatunnel-connector-v2-e2e + ${revision} + + 4.0.0 + + connector-googlesheets-e2e + + + + + org.apache.seatunnel + connector-fake + ${project.version} + test + + + org.apache.seatunnel + connector-google-sheets + ${project.version} + test + + + \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-googlesheets-e2e/src/test/java/org/apache/seatunnel/e2e/connector/googlesheets/GoogleSheetsIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-googlesheets-e2e/src/test/java/org/apache/seatunnel/e2e/connector/googlesheets/GoogleSheetsIT.java new file mode 100644 index 00000000000..4ba310534b5 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-googlesheets-e2e/src/test/java/org/apache/seatunnel/e2e/connector/googlesheets/GoogleSheetsIT.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.e2e.connector.googlesheets; + +import org.apache.seatunnel.e2e.common.TestResource; +import org.apache.seatunnel.e2e.common.TestSuiteBase; +import org.apache.seatunnel.e2e.common.container.TestContainer; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.TestTemplate; +import org.testcontainers.containers.Container; + +import java.io.IOException; + +@Disabled("Disabled because it needs user's personal google account to run this test") +public class GoogleSheetsIT extends TestSuiteBase implements TestResource { + + @BeforeEach + @Override + public void startUp() throws Exception { + + } + + @AfterEach + @Override + public void tearDown() throws Exception { + + } + + @TestTemplate + public void testDatahub(TestContainer container) throws IOException, InterruptedException { + Container.ExecResult execResult = container.executeJob("/fakesource_to_googlesheets.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-googlesheets-e2e/src/test/resources/fakesource_to_googlesheets.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-googlesheets-e2e/src/test/resources/fakesource_to_googlesheets.conf new file mode 100644 index 00000000000..6128732cda1 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-googlesheets-e2e/src/test/resources/fakesource_to_googlesheets.conf @@ -0,0 +1,43 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + execution.parallelism = 1 + job.mode = "BATCH" +} + +source { + FakeSource { + row.num = 3 + schema = { + fields { + name = "string" + address = "string" + age = "int" + } + } + } +} + +sink { + GoogleSheets { + service_account_key = "Your account key" + sheet_id = "1VI0DvyZK-NIdssSdsDSsSSSC-_-rYMi7ppJiI_jhE" + sheet_name = "sheets01" + range = "A1:C3" + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml index 07f21f36d9d..95350984ad1 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml @@ -43,6 +43,7 @@ connector-elasticsearch-e2e connector-iotdb-e2e connector-cdc-mysql-e2e + connector-googlesheets-e2e seatunnel-connector-v2-e2e From 9e5812b8da7eec1ab02cb4d4b744e99927f1ea03 Mon Sep 17 00:00:00 2001 From: TaoZex <45089228+TaoZex@users.noreply.github.com> Date: Thu, 5 Jan 2023 21:15:47 +0800 Subject: [PATCH 4/6] [Feature][Connector-V2][GoogleSheets] change sink factory --- .../seatunnel/google/sheets/sink/SheetsSinkFactory.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/sink/SheetsSinkFactory.java b/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/sink/SheetsSinkFactory.java index 25fecbdf7bc..689ed1cc20b 100644 --- a/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/sink/SheetsSinkFactory.java +++ b/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/sink/SheetsSinkFactory.java @@ -19,13 +19,13 @@ import org.apache.seatunnel.api.configuration.util.OptionRule; import org.apache.seatunnel.api.table.factory.Factory; -import org.apache.seatunnel.api.table.factory.TableSourceFactory; +import org.apache.seatunnel.api.table.factory.TableSinkFactory; import org.apache.seatunnel.connectors.seatunnel.google.sheets.config.SheetsConfig; import com.google.auto.service.AutoService; @AutoService(Factory.class) -public class SheetsSinkFactory implements TableSourceFactory { +public class SheetsSinkFactory implements TableSinkFactory { @Override public String factoryIdentifier() { return "GoogleSheets"; From 09f86040643b8aed1e38d1649b5c84b5b84acab6 Mon Sep 17 00:00:00 2001 From: TaoZex <45089228+TaoZex@users.noreply.github.com> Date: Wed, 8 Feb 2023 23:25:25 +0800 Subject: [PATCH 5/6] fix doc --- docs/en/connector-v2/sink/GoogleSheets.md | 1 - .../seatunnel/google/sheets/sink/SheetsSinkWriter.java | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/docs/en/connector-v2/sink/GoogleSheets.md b/docs/en/connector-v2/sink/GoogleSheets.md index 9caa452b72a..f1c72723513 100644 --- a/docs/en/connector-v2/sink/GoogleSheets.md +++ b/docs/en/connector-v2/sink/GoogleSheets.md @@ -8,7 +8,6 @@ Used to write data to GoogleSheets. ## Key features - [ ] [exactly-once](../../concept/connector-v2-features.md) -- [ ] [schema projection](../../concept/connector-v2-features.md) ## Options diff --git a/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/sink/SheetsSinkWriter.java b/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/sink/SheetsSinkWriter.java index c9e6dd60417..4eceef84647 100644 --- a/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/sink/SheetsSinkWriter.java +++ b/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/sink/SheetsSinkWriter.java @@ -90,6 +90,6 @@ public void flush() throws IOException { @Override public void close() throws IOException { - // not need close + flush(); } } From 23f4efd8933ce05803058f91f168eb3a8df39ceb Mon Sep 17 00:00:00 2001 From: TaoZex <45089228+TaoZex@users.noreply.github.com> Date: Tue, 14 Feb 2023 14:35:53 +0800 Subject: [PATCH 6/6] fix --- .../Error-Quick-Reference-Manual.md | 3 +- docs/en/connector-v2/sink/GoogleSheets.md | 30 ++++++++++--------- .../sheets/config/SheetsParameters.java | 21 +++++++------ .../GoogleSheetsConnectorErrorCode.java | 2 +- .../serialize/GoogleSheetsSerializer.java | 3 +- .../google/sheets/sink/SheetsSink.java | 21 +++++++++---- .../google/sheets/sink/SheetsSinkWriter.java | 12 ++++---- .../sheets/source/SheetsSourceReader.java | 10 +++++-- .../connector-googlesheets-e2e/pom.xml | 7 ++--- .../googlesheets/GoogleSheetsIT.java | 8 ++--- 10 files changed, 64 insertions(+), 53 deletions(-) diff --git a/docs/en/connector-v2/Error-Quick-Reference-Manual.md b/docs/en/connector-v2/Error-Quick-Reference-Manual.md index b2a8464632e..463df2735a5 100644 --- a/docs/en/connector-v2/Error-Quick-Reference-Manual.md +++ b/docs/en/connector-v2/Error-Quick-Reference-Manual.md @@ -224,7 +224,7 @@ problems encountered by users. ## GoogleSheets Connector Error Codes -| code | description | solution | +| code | description | solution | |-----------------|--------------------------------------------|----------------------------------------------------------------------------------------------------------------------| | GOOGLESHEETS-01 | Build google sheets http request exception | When users encounter this error code, it means that send http request to build google sheets failed, please check it | @@ -240,3 +240,4 @@ problems encountered by users. | code | description | solution | |----------|-------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------| | EMAIL-01 | Send email failed | When users encounter this error code, it means that send email to target server failed, please adjust the network environment according to the abnormal information | + diff --git a/docs/en/connector-v2/sink/GoogleSheets.md b/docs/en/connector-v2/sink/GoogleSheets.md index f1c72723513..26a10286b3d 100644 --- a/docs/en/connector-v2/sink/GoogleSheets.md +++ b/docs/en/connector-v2/sink/GoogleSheets.md @@ -1,7 +1,8 @@ # GoogleSheets > GoogleSheets sink connector -## Description +> + ## Description Used to write data to GoogleSheets. @@ -11,12 +12,12 @@ Used to write data to GoogleSheets. ## Options -| name | type | required | default value | -|------------------- |--------------|----------|---------------| -| service_account_key | string | yes | - | -| sheet_id | string | yes | - | -| sheet_name | string | yes | - | -| range | string | yes | - | +| name | type | required | default value | +|---------------------|--------|----------|---------------| +| service_account_key | string | yes | - | +| sheet_id | string | yes | - | +| sheet_name | string | yes | - | +| range | string | yes | - | ### service_account_key [string] @@ -39,16 +40,17 @@ the range of the sheet you want to output simple: ```hocon - GoogleSheets { - service_account_key = "Your account key" - sheet_id = "1VI0DvyZK-NIdssSdsDSsSSSC-_-rYMi7ppJiI_jhE" - sheet_name = "sheets01" - range = "A1:C3" - } +GoogleSheets { + service_account_key = "Your account key" + sheet_id = "1VI0DvyZK-NIdssSdsDSsSSSC-_-rYMi7ppJiI_jhE" + sheet_name = "sheets01" + range = "A1:C3" +} ``` ## Changelog ### next version -- Add GoogleSheets Sink Connector [3848](https://github.com/apache/incubator-seatunnel/pull/3848) \ No newline at end of file +- Add GoogleSheets Sink Connector [3848](https://github.com/apache/incubator-seatunnel/pull/3848) + diff --git a/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/config/SheetsParameters.java b/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/config/SheetsParameters.java index 2ab54be0dc1..3f70f6416e2 100644 --- a/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/config/SheetsParameters.java +++ b/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/config/SheetsParameters.java @@ -17,11 +17,11 @@ package org.apache.seatunnel.connectors.seatunnel.google.sheets.config; +import org.apache.seatunnel.shade.com.typesafe.config.Config; + import org.apache.seatunnel.connectors.seatunnel.google.sheets.exception.GoogleSheetsConnectorErrorCode; import org.apache.seatunnel.connectors.seatunnel.google.sheets.exception.GoogleSheetsConnectorException; -import org.apache.seatunnel.shade.com.typesafe.config.Config; - import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport; import com.google.api.client.http.HttpRequestInitializer; import com.google.api.client.http.javanet.NetHttpTransport; @@ -66,21 +66,24 @@ public SheetsParameters buildWithConfig(Config config) { public Sheets buildSheets() throws IOException { byte[] keyBytes = Base64.getDecoder().decode(this.serviceAccountKey); - ServiceAccountCredentials sourceCredentials = ServiceAccountCredentials - .fromStream(new ByteArrayInputStream(keyBytes)); - sourceCredentials = (ServiceAccountCredentials) sourceCredentials - .createScoped(Collections.singletonList(SheetsScopes.SPREADSHEETS)); + ServiceAccountCredentials sourceCredentials = + ServiceAccountCredentials.fromStream(new ByteArrayInputStream(keyBytes)); + sourceCredentials = + (ServiceAccountCredentials) + sourceCredentials.createScoped( + Collections.singletonList(SheetsScopes.SPREADSHEETS)); HttpRequestInitializer requestInitializer = new HttpCredentialsAdapter(sourceCredentials); NetHttpTransport httpTransport = null; try { httpTransport = GoogleNetHttpTransport.newTrustedTransport(); } catch (GeneralSecurityException e) { - throw new GoogleSheetsConnectorException(GoogleSheetsConnectorErrorCode.BUILD_SHEETS_REQUEST_EXCEPTION, - "Build google sheets http request exception", e); + throw new GoogleSheetsConnectorException( + GoogleSheetsConnectorErrorCode.BUILD_SHEETS_REQUEST_EXCEPTION, + "Build google sheets http request exception", + e); } return new Sheets.Builder(httpTransport, JSON_FACTORY, requestInitializer) .setApplicationName(APPLICATION_NAME) .build(); - } } diff --git a/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/exception/GoogleSheetsConnectorErrorCode.java b/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/exception/GoogleSheetsConnectorErrorCode.java index 57449f1abfa..1d4a489015e 100644 --- a/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/exception/GoogleSheetsConnectorErrorCode.java +++ b/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/exception/GoogleSheetsConnectorErrorCode.java @@ -20,7 +20,7 @@ import org.apache.seatunnel.common.exception.SeaTunnelErrorCode; public enum GoogleSheetsConnectorErrorCode implements SeaTunnelErrorCode { - BUILD_SHEETS_REQUEST_EXCEPTION("GOOGLESHEETS-01", "Build google sheets http request exception"); + BUILD_SHEETS_REQUEST_EXCEPTION("GOOGLESHEETS-01", "Build google sheets http request exception"); private final String code; diff --git a/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/serialize/GoogleSheetsSerializer.java b/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/serialize/GoogleSheetsSerializer.java index 73c0b7bf40b..9775bd2a992 100644 --- a/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/serialize/GoogleSheetsSerializer.java +++ b/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/serialize/GoogleSheetsSerializer.java @@ -25,8 +25,7 @@ public class GoogleSheetsSerializer implements SeaTunnelRowSerializer { - public GoogleSheetsSerializer() { - } + public GoogleSheetsSerializer() {} @Override public List> deserializeRow(List input) { diff --git a/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/sink/SheetsSink.java b/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/sink/SheetsSink.java index 704f7942331..418b7646674 100644 --- a/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/sink/SheetsSink.java +++ b/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/sink/SheetsSink.java @@ -17,6 +17,8 @@ package org.apache.seatunnel.connectors.seatunnel.google.sheets.sink; +import org.apache.seatunnel.shade.com.typesafe.config.Config; + import org.apache.seatunnel.api.common.PrepareFailException; import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; import org.apache.seatunnel.api.sink.SeaTunnelSink; @@ -34,8 +36,6 @@ import org.apache.seatunnel.connectors.seatunnel.google.sheets.config.SheetsParameters; import org.apache.seatunnel.connectors.seatunnel.google.sheets.exception.GoogleSheetsConnectorException; -import org.apache.seatunnel.shade.com.typesafe.config.Config; - import com.google.auto.service.AutoService; import java.io.IOException; @@ -54,10 +54,18 @@ public String getPluginName() { @Override public void prepare(Config pluginConfig) throws PrepareFailException { - CheckResult checkResult = CheckConfigUtil.checkAllExists(pluginConfig, SheetsConfig.SERVICE_ACCOUNT_KEY.key(), SheetsConfig.SHEET_ID.key(), SheetsConfig.SHEET_NAME.key(), SheetsConfig.RANGE.key()); + CheckResult checkResult = + CheckConfigUtil.checkAllExists( + pluginConfig, + SheetsConfig.SERVICE_ACCOUNT_KEY.key(), + SheetsConfig.SHEET_ID.key(), + SheetsConfig.SHEET_NAME.key(), + SheetsConfig.RANGE.key()); if (!checkResult.isSuccess()) { - throw new GoogleSheetsConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, - String.format("PluginName: %s, PluginType: %s, Message: %s", + throw new GoogleSheetsConnectorException( + SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, + String.format( + "PluginName: %s, PluginType: %s, Message: %s", getPluginName(), PluginType.SINK, checkResult.getMsg())); } this.sheetsParameters = new SheetsParameters().buildWithConfig(pluginConfig); @@ -75,7 +83,8 @@ public SeaTunnelDataType getConsumedType() { } @Override - public AbstractSinkWriter createWriter(SinkWriter.Context context) throws IOException { + public AbstractSinkWriter createWriter(SinkWriter.Context context) + throws IOException { return new SheetsSinkWriter(this.sheetsParameters, this.rangePosition); } } diff --git a/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/sink/SheetsSinkWriter.java b/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/sink/SheetsSinkWriter.java index 4eceef84647..940899a8bc5 100644 --- a/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/sink/SheetsSinkWriter.java +++ b/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/sink/SheetsSinkWriter.java @@ -43,7 +43,8 @@ public class SheetsSinkWriter extends AbstractSinkWriter { private final Integer batchSize = 100; private Integer totalCount = 0; - public SheetsSinkWriter(SheetsParameters sheetsParameters, RangePosition rangePosition) throws IOException { + public SheetsSinkWriter(SheetsParameters sheetsParameters, RangePosition rangePosition) + throws IOException { this.sheetsParameters = sheetsParameters; this.seaTunnelRowSerializer = new GoogleSheetsSerializer(); this.service = sheetsParameters.buildSheets(); @@ -78,12 +79,9 @@ public void flush() throws IOException { start += rangePosition.getStartY(); } - data.add(new ValueRange() - .setRange(start + ":" + end) - .setValues(values)); - BatchUpdateValuesRequest body = new BatchUpdateValuesRequest() - .setValueInputOption("RAW") - .setData(data); + data.add(new ValueRange().setRange(start + ":" + end).setValues(values)); + BatchUpdateValuesRequest body = + new BatchUpdateValuesRequest().setValueInputOption("RAW").setData(data); service.spreadsheets().values().batchUpdate(sheetsParameters.getSheetId(), body).execute(); seaTunnelRowList.clear(); } diff --git a/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/source/SheetsSourceReader.java b/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/source/SheetsSourceReader.java index 820a407e524..6602a781c31 100644 --- a/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/source/SheetsSourceReader.java +++ b/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/source/SheetsSourceReader.java @@ -71,9 +71,13 @@ public void close() throws IOException { @Override public void pollNext(Collector output) throws Exception { - ValueRange response = service.spreadsheets().values() - .get(sheetsParameters.getSheetId(), sheetsParameters.getSheetName() + "!" + sheetsParameters.getRange()) - .execute(); + ValueRange response = + service.spreadsheets() + .values() + .get( + sheetsParameters.getSheetId(), + sheetsParameters.getSheetName() + "!" + sheetsParameters.getRange()) + .execute(); List> values = response.getValues(); if (values != null) { for (List row : values) { diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-googlesheets-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-googlesheets-e2e/pom.xml index 9a55f99d12e..1d5181f5029 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-googlesheets-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-googlesheets-e2e/pom.xml @@ -13,15 +13,14 @@ See the License for the specific language governing permissions and limitations under the License. --> - + 4.0.0 org.apache.seatunnel seatunnel-connector-v2-e2e ${revision} - 4.0.0 connector-googlesheets-e2e @@ -40,4 +39,4 @@ test - \ No newline at end of file + diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-googlesheets-e2e/src/test/java/org/apache/seatunnel/e2e/connector/googlesheets/GoogleSheetsIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-googlesheets-e2e/src/test/java/org/apache/seatunnel/e2e/connector/googlesheets/GoogleSheetsIT.java index 4ba310534b5..0a413403f1b 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-googlesheets-e2e/src/test/java/org/apache/seatunnel/e2e/connector/googlesheets/GoogleSheetsIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-googlesheets-e2e/src/test/java/org/apache/seatunnel/e2e/connector/googlesheets/GoogleSheetsIT.java @@ -35,15 +35,11 @@ public class GoogleSheetsIT extends TestSuiteBase implements TestResource { @BeforeEach @Override - public void startUp() throws Exception { - - } + public void startUp() throws Exception {} @AfterEach @Override - public void tearDown() throws Exception { - - } + public void tearDown() throws Exception {} @TestTemplate public void testDatahub(TestContainer container) throws IOException, InterruptedException {