Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][Connector-V2][Slack] Add Slack sink connector #3226

Merged
merged 40 commits into from
Nov 21, 2022

Conversation

lianghuan-xatu
Copy link
Contributor

Purpose of this pull request

Slack sink connector #3018

Check list


## Changelog

### 2.3.0-beta 2022-10-20
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

### 2.3.0-beta 2022-10-20 need replace with new version, new version will be update to the real version number when we release a version.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have done it!

xz-1.5.jar
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why update this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This update has been reverted.Thx

@ic4y
Copy link
Contributor

ic4y commented Nov 4, 2022

If no e2e test is added, you need to comment your local test process and results in the form of pictures, etc. Convenient for later use.

@@ -1,108 +0,0 @@
aircompressor-0.10.jar
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

revert

Why delete this file?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Delete this file carelessly, which has been reverted.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good

Comment on lines 50 to 54
for (Object field : fields) {
stringBuffer.append(field.toString() + ",");
}
stringBuffer.deleteCharAt(fields.length - 1);
stringBuffer.append("\n");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion

        StringJoiner stringJoiner = new StringJoiner(",", "", "\n");
        for (Object field : fields) {
            stringJoiner.add(field.toString());
        }
        String message = stringJoiner.toString();

If you reuse StringBuffer objects, you need to clean up the previous content

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your advice. it's a great idea!

stringBuffer.deleteCharAt(fields.length - 1);
stringBuffer.append("\n");
try {
String conversationId = slackClient.findConversation();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does findConversation need to be executed before each message is sent?

Copy link
Contributor Author

@lianghuan-xatu lianghuan-xatu Nov 4, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have fixed it! PTAL

# Conflicts:
#	plugin-mapping.properties
#	seatunnel-connectors-v2/pom.xml
#	seatunnel-dist/pom.xml
@lianghuan-xatu
Copy link
Contributor Author

If no e2e test is added, you need to comment your local test process and results in the form of pictures, etc. Convenient for later use.

@lianghuan-xatu
Copy link
Contributor Author

If no e2e test is added, you need to comment your local test process and results in the form of pictures, etc. Convenient for later use.

Spark Local Test

);
publishMessageSuccess = chatPostMessageResponse.isOk();
} catch (IOException | SlackApiException e) {
log.error("error: {}", e.getMessage(), e);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"error: {}" only can add one argu, e.getMessage(), e have two.

You can update reference this:

log.error("error: {}", ExceptionUtils.getMessage(e));

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have fixed it! Thx

private String slackChannel;

public SlackConfig(@NonNull Config pluginConfig) {
if (pluginConfig.hasPath(WEBHOOKS_URL)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All of webhooks_url and oauth_token and slack_channel is required. So you use CheckConfigUtil.checkAllExists to check them.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your advice, I have done it!

// One message can be sent as soon as one second
Thread.sleep(POST_MSG_WAITING_TIME);
} catch (Exception e) {
log.warn("Write to Slack Fail.", e);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

log.warn("Write to Slack Fail: {}",  ExceptionUtils.getMessage(e));

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have fixed it! Thx

…nto connector-slack-branch

                          # Conflicts:
                          #	plugin-mapping.properties
                          #	seatunnel-dist/pom.xml
…nto connector-slack-branch

# Conflicts:
#	plugin-mapping.properties
Copy link
Member

@TyrantLucifer TyrantLucifer left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, please implement Factory for connectors, you can refer to #3337

Comment on lines 36 to 37
private SlackConfig slackConfig;
private MethodsClient methodsClient;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private SlackConfig slackConfig;
private MethodsClient methodsClient;
private final SlackConfig slackConfig;
private final MethodsClient methodsClient;

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done! PTAL

}
} catch (IOException | SlackApiException e) {
log.warn("Find Slack Conversion Fail.", e);
throw new RuntimeException("Find Slack Conversion Fail.", e);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should create a SlackConnectorException that extended SeaTunnelRuntimeException instead of it. You can refer to https://github.com/apache/incubator-seatunnel/tree/dev/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/exception

* Close Conversion
*/
public void closeMethodClient() {
methodsClient = null;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a close() method in MethodsClient?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no close() method in Slack API.

Comment on lines 31 to 33
private String webHooksUrl;
private String oauthToken;
private String slackChannel;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private String webHooksUrl;
private String oauthToken;
private String slackChannel;
private final String webHooksUrl;
private final String oauthToken;
private final String slackChannel;

public void prepare(Config pluginConfig) throws PrepareFailException {
CheckResult checkResult = CheckConfigUtil.checkAllExists(pluginConfig, WEBHOOKS_URL, OAUTH_TOKEN, SLACK_CHANNEL);
if (!checkResult.isSuccess()) {
throw new PrepareFailException("Slack", PluginType.SINK, checkResult.getMsg());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


@Slf4j
public class SlackWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
private SlackConfig slackConfig;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private SlackConfig slackConfig;
private final SlackConfig slackConfig;

Thread.sleep(POST_MSG_WAITING_TIME);
} catch (Exception e) {
log.warn("Write to Slack Fail.", ExceptionUtils.getMessage(e));
throw new RuntimeException("Write to Slack Fail.", e);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your advice. I have fixed it.

@lianghuan-xatu lianghuan-xatu requested review from TyrantLucifer and removed request for ic4y November 19, 2022 02:47
Copy link
Member

@TyrantLucifer TyrantLucifer left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


@Override
public String getCode() {
return null;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return code


@Override
public String getDescription() {
return null;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return description

// One message can be sent as soon as one second
Thread.sleep(POST_MSG_WAITING_TIME);
} catch (Exception e) {
log.warn("Write to Slack Fail.", ExceptionUtils.getMessage(e));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
log.warn("Write to Slack Fail.", ExceptionUtils.getMessage(e));
log.error("Write to Slack Fail.", ExceptionUtils.getMessage(e));

public class SlackSink extends AbstractSimpleSink<SeaTunnelRow, Void> {

private Config pluginConfig;
private SeaTunnelRowType seaTunnelRowType;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private SeaTunnelRowType seaTunnelRowType;
private final SeaTunnelRowType seaTunnelRowType;

TyrantLucifer
TyrantLucifer previously approved these changes Nov 19, 2022
Copy link
Member

@TyrantLucifer TyrantLucifer left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

@EricJoy2048 EricJoy2048 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, Please wait CI complete. @TyrantLucifer

Copy link
Member

@TyrantLucifer TyrantLucifer left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, looking forward your next contribution.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants