Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bugfix: fix the issue of Raft registry exception without waiting for retries #6049

Merged
merged 6 commits into from
Nov 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/en-us/2.0.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ The version is updated as follows:
- [[#6018](https://github.com/seata/seata/pull/6018)] fix incorrect metric report
- [[#6024](https://github.com/seata/seata/pull/6024)] fix the white screen after click the "View Global Lock" button on the transaction info page in the console
- [[#6015](https://github.com/seata/seata/pull/6015)] fix can't integrate dubbo with spring
- [[#6049](https://github.com/seata/seata/pull/6049)] fix registry type for raft under the network interruption did not carry out the sleep 1s
- [[#6050](https://github.com/seata/seata/pull/6050)] change RaftServer#destroy to wait all shutdown procedures done

### optimize:
Expand Down
1 change: 1 addition & 0 deletions changes/zh-cn/2.0.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ Seata 是一款开源的分布式事务解决方案,提供高性能和简单
- [[#6018](https://github.com/seata/seata/pull/6018)] 修复错误的 metric 上报
- [[#6024](https://github.com/seata/seata/pull/6024)] 修复控制台点击事务信息页面中的"查看全局锁"按钮之后白屏的问题
- [[#6015](https://github.com/seata/seata/pull/6015)] 修复在spring环境下无法集成dubbo
- [[#6049](https://github.com/seata/seata/pull/6049)] 修复客户端在raft注册中心类型下,网络中断时,watch线程未暂停一秒等待重试的问题
- [[#6050](https://github.com/seata/seata/pull/6050)] 修改 RaftServer#destroy 为等待所有关闭流程结束


Expand Down
5 changes: 5 additions & 0 deletions common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,10 @@
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
127 changes: 127 additions & 0 deletions common/src/main/java/io/seata/common/util/HttpClientUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.common.util;


import org.apache.http.NameValuePair;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.client.utils.URLEncodedUtils;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.message.BasicNameValuePair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* @author funkye
*/
public class HttpClientUtil {

private static final Logger LOGGER = LoggerFactory.getLogger(HttpClientUtil.class);

private static final Map<Integer/*timeout*/, CloseableHttpClient> HTTP_CLIENT_MAP = new ConcurrentHashMap<>();

private static final PoolingHttpClientConnectionManager POOLING_HTTP_CLIENT_CONNECTION_MANAGER =
new PoolingHttpClientConnectionManager();

static {
POOLING_HTTP_CLIENT_CONNECTION_MANAGER.setMaxTotal(10);
POOLING_HTTP_CLIENT_CONNECTION_MANAGER.setDefaultMaxPerRoute(10);
Runtime.getRuntime().addShutdownHook(new Thread(() -> HTTP_CLIENT_MAP.values().parallelStream().forEach(client -> {
try {
client.close();
} catch (IOException e) {
LOGGER.error(e.getMessage(), e);
}
})));
}

// post request
public static CloseableHttpResponse doPost(String url, Map<String, String> params, Map<String, String> header,
int timeout) throws IOException {
try {
URIBuilder builder = new URIBuilder(url);
URI uri = builder.build();
HttpPost httpPost = new HttpPost(uri);
if (header != null) {
header.forEach(httpPost::addHeader);
}
List<NameValuePair> nameValuePairs = new ArrayList<>();
params.forEach((k, v) -> {
nameValuePairs.add(new BasicNameValuePair(k, v));
});
String requestBody = URLEncodedUtils.format(nameValuePairs, StandardCharsets.UTF_8);

StringEntity stringEntity = new StringEntity(requestBody, ContentType.APPLICATION_FORM_URLENCODED);
httpPost.setEntity(stringEntity);
httpPost.setHeader("Content-Type", "application/x-www-form-urlencoded");
CloseableHttpClient client = HTTP_CLIENT_MAP.computeIfAbsent(timeout,
k -> HttpClients.custom().setConnectionManager(POOLING_HTTP_CLIENT_CONNECTION_MANAGER)
.setDefaultRequestConfig(RequestConfig.custom().setConnectionRequestTimeout(timeout)
.setSocketTimeout(timeout).setConnectTimeout(timeout).build())
.build());
return client.execute(httpPost);
} catch (URISyntaxException | ClientProtocolException e) {
LOGGER.error(e.getMessage(), e);
}
return null;
}

// get request
public static CloseableHttpResponse doGet(String url, Map<String, String> param, Map<String, String> header,
int timeout) throws IOException {
try {
URIBuilder builder = new URIBuilder(url);
if (param != null) {
for (String key : param.keySet()) {
builder.addParameter(key, param.get(key));
}
}
URI uri = builder.build();
HttpGet httpGet = new HttpGet(uri);
if (header != null) {
header.forEach(httpGet::addHeader);
}
CloseableHttpClient client = HTTP_CLIENT_MAP.computeIfAbsent(timeout,
k -> HttpClients.custom().setConnectionManager(POOLING_HTTP_CLIENT_CONNECTION_MANAGER)
.setDefaultRequestConfig(RequestConfig.custom().setConnectionRequestTimeout(timeout)
.setSocketTimeout(timeout).setConnectTimeout(timeout).build())
.build());
return client.execute(httpGet);
} catch (URISyntaxException | ClientProtocolException e) {
LOGGER.error(e.getMessage(), e);
}
return null;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -41,26 +40,15 @@
import io.seata.common.metadata.Node;
import io.seata.common.thread.NamedThreadFactory;
import io.seata.common.util.CollectionUtils;
import io.seata.common.util.HttpClientUtil;
import io.seata.common.util.StringUtils;
import io.seata.config.ConfigChangeListener;
import io.seata.config.Configuration;
import io.seata.config.ConfigurationFactory;
import io.seata.discovery.registry.RegistryService;
import org.apache.http.HttpStatus;
import org.apache.http.NameValuePair;
import org.apache.http.StatusLine;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.client.utils.URLEncodedUtils;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -90,11 +78,6 @@ public class RaftRegistryServiceImpl implements RegistryService<ConfigChangeList

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

private static final PoolingHttpClientConnectionManager POOLING_HTTP_CLIENT_CONNECTION_MANAGER =
new PoolingHttpClientConnectionManager();

private static final Map<Integer/*timeout*/, CloseableHttpClient> HTTP_CLIENT_MAP = new ConcurrentHashMap<>();

private static volatile String CURRENT_TRANSACTION_SERVICE_GROUP;

private static volatile String CURRENT_TRANSACTION_CLUSTER_NAME;
Expand All @@ -108,11 +91,6 @@ public class RaftRegistryServiceImpl implements RegistryService<ConfigChangeList
*/
private static final Map<String, List<InetSocketAddress>> ALIVE_NODES = new ConcurrentHashMap<>();

static {
POOLING_HTTP_CLIENT_CONNECTION_MANAGER.setMaxTotal(10);
POOLING_HTTP_CLIENT_CONNECTION_MANAGER.setDefaultMaxPerRoute(10);
}

private RaftRegistryServiceImpl() {}

/**
Expand Down Expand Up @@ -191,13 +169,6 @@ protected static void startQueryMetadata() {
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
CLOSED.compareAndSet(false, true);
REFRESH_METADATA_EXECUTOR.shutdown();
HTTP_CLIENT_MAP.values().parallelStream().forEach(client -> {
try {
client.close();
} catch (IOException e) {
LOGGER.error(e.getMessage(), e);
}
});
}));
}
}
Expand Down Expand Up @@ -274,13 +245,13 @@ private static boolean watch() {
for (String group : groupTerms.keySet()) {
String tcAddress = queryHttpAddress(clusterName, group);
try (CloseableHttpResponse response =
doPost("http://" + tcAddress + "/metadata/v1/watch", param, null, 30000)) {
HttpClientUtil.doPost("http://" + tcAddress + "/metadata/v1/watch", param, null, 30000)) {
if (response != null) {
StatusLine statusLine = response.getStatusLine();
return statusLine != null && statusLine.getStatusCode() == HttpStatus.SC_OK;
}
} catch (Exception e) {
LOGGER.error("watch cluster fail: {}", e.getMessage());
} catch (IOException e) {
LOGGER.error("watch cluster node: {}, fail: {}", tcAddress, e.getMessage());
try {
Thread.sleep(1000);
} catch (InterruptedException ignored) {
Expand Down Expand Up @@ -320,7 +291,7 @@ private static void acquireClusterMetaData(String clusterName, String group) {
param.put("group", group);
String response = null;
try (CloseableHttpResponse httpResponse =
doGet("http://" + tcAddress + "/metadata/v1/cluster", param, null, 1000)) {
HttpClientUtil.doGet("http://" + tcAddress + "/metadata/v1/cluster", param, null, 1000)) {
if (httpResponse != null && httpResponse.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
response = EntityUtils.toString(httpResponse.getEntity(), StandardCharsets.UTF_8);
}
Expand All @@ -339,64 +310,6 @@ private static void acquireClusterMetaData(String clusterName, String group) {
}
}

public static CloseableHttpResponse doGet(String url, Map<String, String> param, Map<String, String> header,
int timeout) {
CloseableHttpClient client;
try {
URIBuilder builder = new URIBuilder(url);
if (param != null) {
for (String key : param.keySet()) {
builder.addParameter(key, param.get(key));
}
}
URI uri = builder.build();
HttpGet httpGet = new HttpGet(uri);
if (header != null) {
header.forEach(httpGet::addHeader);
}
client = HTTP_CLIENT_MAP.computeIfAbsent(timeout,
k -> HttpClients.custom().setConnectionManager(POOLING_HTTP_CLIENT_CONNECTION_MANAGER)
.setDefaultRequestConfig(RequestConfig.custom().setConnectionRequestTimeout(timeout)
.setSocketTimeout(timeout).setConnectTimeout(timeout).build())
.build());
return client.execute(httpGet);
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
return null;
}

public static CloseableHttpResponse doPost(String url, Map<String, String> params, Map<String, String> header,
int timeout) {
CloseableHttpClient client = null;
try {
URIBuilder builder = new URIBuilder(url);
URI uri = builder.build();
HttpPost httpPost = new HttpPost(uri);
if (header != null) {
header.forEach(httpPost::addHeader);
}
List<NameValuePair> nameValuePairs = new ArrayList<>();
params.forEach((k, v) -> {
nameValuePairs.add(new BasicNameValuePair(k, v));
});
String requestBody = URLEncodedUtils.format(nameValuePairs, StandardCharsets.UTF_8);

StringEntity stringEntity = new StringEntity(requestBody, ContentType.APPLICATION_FORM_URLENCODED);
httpPost.setEntity(stringEntity);
httpPost.setHeader("Content-Type", "application/x-www-form-urlencoded");
client = HTTP_CLIENT_MAP.computeIfAbsent(timeout,
k -> HttpClients.custom().setConnectionManager(POOLING_HTTP_CLIENT_CONNECTION_MANAGER)
.setDefaultRequestConfig(RequestConfig.custom().setConnectionRequestTimeout(timeout)
.setSocketTimeout(timeout).setConnectTimeout(timeout).build())
.build());
return client.execute(httpPost);
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
return null;
}

@Override
public List<InetSocketAddress> lookup(String key) throws Exception {
String clusterName = getServiceGroup(key);
Expand Down
Loading