Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize : Eliminate RpcMessage and Encoder/Decoder dependencies #6209

Merged
merged 29 commits into from
Jun 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
f1bacdd
Unwinding RpcMessage and Encoder/Decoder dependencies
Bughue Dec 25, 2023
ee3c3aa
license
Bughue Dec 26, 2023
94cb0cf
v0
Bughue Dec 28, 2023
289ed97
Merge branch '2.x' of https://github.com/seata/seata into dev-mlv-rpc…
Bughue Dec 29, 2023
8c197af
v0
Bughue Dec 29, 2023
7d1a6b3
test
Bughue Jan 2, 2024
33e6e4e
test
Bughue Jan 2, 2024
379ba28
test
Bughue Jan 2, 2024
a9b4d5d
test
Bughue Jan 3, 2024
95573d3
fix test
Bughue Jan 4, 2024
8da65ff
Merge branch '2.x' into dev-mlv-rpc-msg
xingfudeshi Jan 18, 2024
91c171d
style
Bughue Jan 22, 2024
72d47a1
Merge branch '2.x' of https://github.com/seata/seata into dev-mlv-rpc…
Bughue Jan 22, 2024
c1c7f3f
Merge branch '2.x' of https://github.com/seata/seata into dev-mlv-rpc…
Bughue Feb 4, 2024
cf4fe41
resolve conflict
Bughue Feb 4, 2024
9607bcd
resolve conflict
Bughue Feb 4, 2024
5b433cd
import
Bughue Feb 7, 2024
91418a1
Merge branch '2.x' of https://github.com/seata/seata into dev-mlv-rpc…
Bughue Mar 4, 2024
00a4e8e
style
Bughue Mar 4, 2024
c728968
Merge branch '2.x' of https://github.com/seata/seata into dev-mlv-rpc…
Bughue Mar 4, 2024
df9ea50
optimize version
Bughue Mar 7, 2024
3841379
optimize version
Bughue Mar 8, 2024
d876d10
optimize version
Bughue Mar 8, 2024
3f0abf0
Merge branch '2.x' of https://github.com/seata/seata into dev-mlv-rpc…
Bughue Mar 8, 2024
c0dfb51
optimize version
Bughue Mar 8, 2024
8e70690
optimize version
Bughue Mar 8, 2024
fa23e23
Merge branch '2.x' of https://github.com/seata/seata into dev-mlv-rpc…
Bughue Apr 30, 2024
fc1b602
Merge branch '2.x' of https://github.com/seata/seata into dev-mlv-rpc…
Bughue Jun 24, 2024
ade6ff7
style
Bughue Jun 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/en-us/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Add changes here for all PR submitted to the 2.x branch.
### optimize:
- [[#6499](https://github.com/apache/incubator-seata/pull/6499)] split the task thread pool for committing and rollbacking statuses
- [[#6208](https://github.com/apache/incubator-seata/pull/6208)] optimize : load SeataSerializer by version
- [[#6209](https://github.com/apache/incubator-seata/pull/6209)] Eliminate RpcMessage and Encoder/Decoder dependencies

### refactor:
- [[#6534](https://github.com/apache/incubator-seata/pull/6534)] optimize: send async response
Expand Down
2 changes: 1 addition & 1 deletion changes/zh-cn/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
### optimize:
- [[#6499](https://github.com/apache/incubator-seata/pull/6499)] 拆分 committing 和 rollbacking 状态的任务线程池
- [[#6208](https://github.com/apache/incubator-seata/pull/6208)] 支持多版本的Seata序列化

- [[#6209](https://github.com/apache/incubator-seata/pull/6209)] 解开 RpcMessage 和 Encoder/Decoder 的互相依赖

### refactor:
- [[#6534](https://github.com/apache/incubator-seata/pull/6534)] 优化: 发送异步响应
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -409,10 +409,13 @@ class ClientHandler extends ChannelDuplexHandler {

@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
if (!(msg instanceof RpcMessage)) {
return;
RpcMessage rpcMessage = null;
if (msg instanceof ProtocolRpcMessage) {
rpcMessage = ((ProtocolRpcMessage) msg).protocolMsg2RpcMsg();
processMessage(ctx, rpcMessage);
} else {
LOGGER.error("rpcMessage type error");
}
processMessage(ctx, (RpcMessage) msg);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,13 @@ class ServerHandler extends ChannelDuplexHandler {
*/
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
if (!(msg instanceof RpcMessage)) {
return;
RpcMessage rpcMessage = null;
if (msg instanceof ProtocolRpcMessage) {
rpcMessage = ((ProtocolRpcMessage) msg).protocolMsg2RpcMsg();
processMessage(ctx, rpcMessage);
} else {
LOGGER.error("rpcMessage type error");
}
processMessage(ctx, (RpcMessage) msg);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seata.core.rpc.netty;

import org.apache.seata.core.protocol.RpcMessage;

/**
* The protocol RPC message.
*/
public interface ProtocolRpcMessage {

/**
* The protocol message to rpc message.
* @return
*/
RpcMessage protocolMsg2RpcMsg();

/**
* The rpc message to protocol message.
* @param rpcMessage
*/
void rpcMsg2ProtocolMsg(RpcMessage rpcMessage);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seata.core.rpc.netty.v0;

import org.apache.seata.core.compressor.CompressorType;
import org.apache.seata.core.protocol.ProtocolConstants;
import org.apache.seata.core.protocol.RpcMessage;
import org.apache.seata.core.rpc.netty.ProtocolRpcMessage;
import org.apache.seata.core.serializer.SerializerType;

import java.util.concurrent.atomic.AtomicLong;

/**
* the protocol v0 rpc message
**/
public class ProtocolRpcMessageV0 implements ProtocolRpcMessage {

private static AtomicLong NEXT_ID = new AtomicLong(0);

/**
* Gets next message id.
*
* @return the next message id
*/
public static long getNextMessageId() {
return NEXT_ID.incrementAndGet();
}

private long id;
private boolean isAsync;
private boolean isRequest;
private boolean isHeartbeat;
private Object body;
private byte messageType;
private boolean isSeataCodec;

/**
* Gets id.
*
* @return the id
*/
public long getId() {
return id;
}

/**
* Sets id.
*
* @param id the id
*/
public void setId(long id) {
this.id = id;
}

/**
* Is async boolean.
*
* @return the boolean
*/
public boolean isAsync() {
return isAsync;
}

/**
* Sets async.
*
* @param async the async
*/
public void setAsync(boolean async) {
isAsync = async;
}

/**
* Is request boolean.
*
* @return the boolean
*/
public boolean isRequest() {
return isRequest;
}

/**
* Sets request.
*
* @param request the request
*/
public void setRequest(boolean request) {
isRequest = request;
}

/**
* Is heartbeat boolean.
*
* @return the boolean
*/
public boolean isHeartbeat() {
return isHeartbeat;
}

/**
* Sets heartbeat.
*
* @param heartbeat the heartbeat
*/
public void setHeartbeat(boolean heartbeat) {
isHeartbeat = heartbeat;
}

/**
* Gets body.
*
* @return the body
*/
public Object getBody() {
return body;
}

/**
* Sets body.
*
* @param body the body
*/
public void setBody(Object body) {
this.body = body;
}

public boolean isSeataCodec() {
return isSeataCodec;
}

public void setSeataCodec(boolean seataCodec) {
isSeataCodec = seataCodec;
}

public byte getMessageType() {
return messageType;
}

public void setMessageType(byte messageType) {
this.messageType = messageType;
}

@Override
public RpcMessage protocolMsg2RpcMsg() {
RpcMessage rpcMessage = new RpcMessage();
rpcMessage.setMessageType(this.messageType);
rpcMessage.setCompressor(CompressorType.NONE.getCode());

byte codecType = this.isSeataCodec ? SerializerType.SEATA.getCode() : SerializerType.HESSIAN.getCode();
rpcMessage.setCodec(codecType);

if (this.isHeartbeat) {
if (this.isRequest) {
rpcMessage.setMessageType(ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST);
} else {
rpcMessage.setMessageType(ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE);
}
} else {
if (this.isRequest) {
rpcMessage.setMessageType(ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY);
} else {
rpcMessage.setMessageType(ProtocolConstants.MSGTYPE_RESPONSE);
}
}
rpcMessage.setBody(this.body);
rpcMessage.setId((int) this.id);
return rpcMessage;
}

@Override
public void rpcMsg2ProtocolMsg(RpcMessage rpcMessage) {
this.body = rpcMessage.getBody();
this.id = rpcMessage.getId();
this.isRequest = isRequest(rpcMessage.getMessageType());
this.isHeartbeat = isHeartbeat(rpcMessage.getMessageType());
this.isSeataCodec = rpcMessage.getCodec() == SerializerType.SEATA.getCode();
this.messageType = rpcMessage.getMessageType();
}

private boolean isHeartbeat(byte msgType) {
return msgType == ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST
|| msgType == ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE;
}

private boolean isRequest(byte msgType) {
return msgType == ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY
|| msgType == ProtocolConstants.MSGTYPE_RESQUEST_SYNC;
}
}
Loading
Loading