From 6b1b5b4de9c7684bbd409ad63a07385d7f83e89f Mon Sep 17 00:00:00 2001 From: karsonto Date: Tue, 23 Apr 2024 15:48:24 +0800 Subject: [PATCH 1/7] init raft function --- .../eventmesh-meta-raft/build.gradle | 60 +++++ .../eventmesh-meta-raft/gradle.properties | 19 ++ .../eventmesh/meta/raft/EventClosure.java | 73 ++++++ .../eventmesh/meta/raft/EventOperation.java | 63 +++++ .../eventmesh/meta/raft/JraftServer.java | 95 ++++++++ .../eventmesh/meta/raft/MetaService.java | 29 +++ .../eventmesh/meta/raft/MetaServiceImpl.java | 79 +++++++ .../eventmesh/meta/raft/MetaStateMachine.java | 127 ++++++++++ .../eventmesh/meta/raft/RaftMetaService.java | 221 ++++++++++++++++++ .../config/RaftMetaStorageConfiguration.java | 50 ++++ .../meta/raft/consts/MetaRaftConstants.java | 45 ++++ .../meta/raft/rpc/MetaServerHelper.java | 97 ++++++++ .../meta/raft/rpc/RequestProcessor.java | 52 +++++ .../src/main/proto/request.proto | 32 +++ .../org.apache.eventmesh.api.meta.MetaService | 16 ++ settings.gradle | 4 + 16 files changed, 1062 insertions(+) create mode 100644 eventmesh-meta/eventmesh-meta-raft/build.gradle create mode 100644 eventmesh-meta/eventmesh-meta-raft/gradle.properties create mode 100644 eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/EventClosure.java create mode 100644 eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/EventOperation.java create mode 100644 eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/JraftServer.java create mode 100644 eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/MetaService.java create mode 100644 eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/MetaServiceImpl.java create mode 100644 eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/MetaStateMachine.java create mode 100644 eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/RaftMetaService.java create mode 100644 eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/config/RaftMetaStorageConfiguration.java create mode 100644 eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/consts/MetaRaftConstants.java create mode 100644 eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/rpc/MetaServerHelper.java create mode 100644 eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/rpc/RequestProcessor.java create mode 100644 eventmesh-meta/eventmesh-meta-raft/src/main/proto/request.proto create mode 100644 eventmesh-meta/eventmesh-meta-raft/src/main/resources/eventmesh/org.apache.eventmesh.api.meta.MetaService diff --git a/eventmesh-meta/eventmesh-meta-raft/build.gradle b/eventmesh-meta/eventmesh-meta-raft/build.gradle new file mode 100644 index 0000000000..99af858e10 --- /dev/null +++ b/eventmesh-meta/eventmesh-meta-raft/build.gradle @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +plugins { + id 'java' + id 'com.google.protobuf' version '0.8.17' +} + +repositories { + mavenCentral() +} + +def grpcVersion = '1.43.2' // CURRENT_GRPC_VERSION +def protobufVersion = '3.21.5' +def protocVersion = protobufVersion + +dependencies { + implementation ("io.grpc:grpc-protobuf:${grpcVersion}") { + exclude group: "com.google.protobuf", module: "protobuf-java" + } + implementation("com.google.protobuf:protobuf-java:${protobufVersion}") + implementation "io.grpc:grpc-stub:${grpcVersion}" + implementation "com.google.protobuf:protobuf-java-util:${protobufVersion}" + implementation "javax.annotation:javax.annotation-api:1.3.2" + compileOnly 'org.projectlombok:lombok' + annotationProcessor 'org.projectlombok:lombok' + + implementation project(":eventmesh-meta:eventmesh-meta-api") + implementation project(":eventmesh-common") + + implementation "com.alipay.sofa:jraft-core:1.3.14" + implementation "com.alipay.sofa:rpc-grpc-impl:1.3.14" + testImplementation 'org.junit.jupiter:junit-jupiter:5.6.0' +} + +protobuf { + protoc { artifact = "com.google.protobuf:protoc:${protocVersion}" } + plugins { + grpc { artifact = "io.grpc:protoc-gen-grpc-java:${grpcVersion}" } + } + generateProtoTasks { + all()*.plugins { + grpc {} + } + } +} diff --git a/eventmesh-meta/eventmesh-meta-raft/gradle.properties b/eventmesh-meta/eventmesh-meta-raft/gradle.properties new file mode 100644 index 0000000000..0010b2d014 --- /dev/null +++ b/eventmesh-meta/eventmesh-meta-raft/gradle.properties @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +pluginType=metaStorage +pluginName=raft \ No newline at end of file diff --git a/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/EventClosure.java b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/EventClosure.java new file mode 100644 index 0000000000..0b0411db80 --- /dev/null +++ b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/EventClosure.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.meta.raft; + +import org.apache.eventmesh.meta.raft.consts.MetaRaftConstants; +import org.apache.eventmesh.meta.raft.rpc.RequestResponse; + +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +import com.alipay.sofa.jraft.Closure; + + +public abstract class EventClosure implements Closure { + + private CompletableFuture future; + + private RequestResponse requestResponse; + + private EventOperation eventOperation; + + + public void setFuture(CompletableFuture future) { + this.future = future; + } + + public void setRequestResponse(RequestResponse requestResponse) { + this.requestResponse = requestResponse; + if (future != null) { + future.complete(getRequestResponse()); + } + } + + public RequestResponse getRequestResponse() { + return requestResponse; + } + + public EventOperation getEventOperation() { + return eventOperation; + } + + protected void failure(final String errorMsg, final String redirect) { + final RequestResponse response = RequestResponse.newBuilder().setSuccess(false).setErrorMsg(errorMsg) + .setRedirect(redirect).build(); + setRequestResponse(response); + } + + public void setEventOperation(EventOperation opreation) { + this.eventOperation = opreation; + } + + protected void success(final Map map) { + + final RequestResponse response = RequestResponse.newBuilder().setValue(MetaRaftConstants.RESPONSE) + .setSuccess(true).putAllInfo(map).build(); + setRequestResponse(response); + } +} diff --git a/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/EventOperation.java b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/EventOperation.java new file mode 100644 index 0000000000..1208900e8b --- /dev/null +++ b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/EventOperation.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.meta.raft; + +import org.apache.eventmesh.meta.raft.consts.MetaRaftConstants; +import org.apache.eventmesh.meta.raft.rpc.RequestResponse; + +import java.io.Serializable; +import java.util.Map; + + +public class EventOperation implements Serializable { + + private static final long serialVersionUID = -6597003954824547294L; + + public static final byte PUT = 0x01; + + public static final byte GET = 0x02; + + private byte op; + private Map data; + + public static EventOperation createOpreation(RequestResponse response) { + if (response.getValue() == MetaRaftConstants.PUT) { + return new EventOperation(PUT, response.getInfoMap()); + } else if (response.getValue() == MetaRaftConstants.GET) { + return new EventOperation(GET, response.getInfoMap()); + } + return null; + } + + public EventOperation(byte op, Map data) { + this.op = op; + this.data = data; + } + + public byte getOp() { + return op; + } + + public Map getData() { + return data; + } + + public boolean isReadOp() { + return GET == this.op; + } +} diff --git a/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/JraftServer.java b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/JraftServer.java new file mode 100644 index 0000000000..3f65841ec7 --- /dev/null +++ b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/JraftServer.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.meta.raft; + +import org.apache.eventmesh.meta.raft.rpc.MetaServerHelper; +import org.apache.eventmesh.meta.raft.rpc.RequestProcessor; +import org.apache.eventmesh.meta.raft.rpc.RequestResponse; + +import org.apache.commons.io.FileUtils; + +import java.io.File; +import java.io.IOException; + +import com.alipay.sofa.jraft.Node; +import com.alipay.sofa.jraft.RaftGroupService; +import com.alipay.sofa.jraft.entity.PeerId; +import com.alipay.sofa.jraft.option.NodeOptions; +import com.alipay.sofa.jraft.rpc.RaftRpcServerFactory; +import com.alipay.sofa.jraft.rpc.RpcServer; + + +public class JraftServer { + + private RaftGroupService raftGroupService; + + private Node node; + + private MetaStateMachine fsm = new MetaStateMachine(); + + public MetaStateMachine getFsm() { + return fsm; + } + + private MetaServiceImpl metaImpl; + + public JraftServer(final String dataPath, final String groupId, final PeerId serverId, + final NodeOptions nodeOptions) throws IOException { + // init raft data path, it contains log,meta,snapshot + FileUtils.forceMkdir(new File(dataPath)); + // here use same RPC server for raft and business. It also can be seperated generally + final RpcServer rpcServer = RaftRpcServerFactory.createRaftRpcServer(serverId.getEndpoint()); + MetaServerHelper.initGRpc(); + MetaServerHelper.setRpcServer(rpcServer); + // register business processor + metaImpl = new MetaServiceImpl(this); + rpcServer.registerProcessor(new RequestProcessor(metaImpl)); + nodeOptions.setFsm(this.fsm); + // set storage path (log,meta,snapshot) + // log, must + nodeOptions.setLogUri(dataPath + File.separator + "log"); + // meta, must + nodeOptions.setRaftMetaUri(dataPath + File.separator + "raft_meta"); + // snapshot, optional, generally recommended + nodeOptions.setSnapshotUri(dataPath + File.separator + "snapshot"); + // init raft group service framework + this.raftGroupService = new RaftGroupService(groupId, serverId, nodeOptions, rpcServer); + // start raft node + this.node = this.raftGroupService.start(); + + } + + public RequestResponse redirect() { + final RequestResponse.Builder builder = RequestResponse.newBuilder().setSuccess(false); + if (this.node != null) { + final PeerId leader = this.node.getLeaderId(); + if (leader != null) { + builder.setRedirect(leader.toString()); + } + } + return builder.build(); + } + + public MetaServiceImpl getMetaImpl() { + return metaImpl; + } + + public Node getNode() { + return node; + } +} diff --git a/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/MetaService.java b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/MetaService.java new file mode 100644 index 0000000000..87c77b5589 --- /dev/null +++ b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/MetaService.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.meta.raft; + + +import org.apache.eventmesh.meta.raft.rpc.RequestResponse; + +/** + * MetaService. + */ +public interface MetaService { + + void handle(RequestResponse request, EventClosure closure); +} diff --git a/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/MetaServiceImpl.java b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/MetaServiceImpl.java new file mode 100644 index 0000000000..a2be9c831f --- /dev/null +++ b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/MetaServiceImpl.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.meta.raft; + +import org.apache.eventmesh.meta.raft.rpc.RequestResponse; + +import org.apache.commons.lang.StringUtils; + +import java.nio.ByteBuffer; + +import com.alipay.remoting.exception.CodecException; +import com.alipay.remoting.serialization.SerializerManager; +import com.alipay.sofa.jraft.Status; +import com.alipay.sofa.jraft.entity.Task; +import com.alipay.sofa.jraft.error.RaftError; + +public class MetaServiceImpl implements MetaService { + + JraftServer server; + + + public MetaServiceImpl(JraftServer server) { + this.server = server; + } + + @Override + public void handle(RequestResponse request, EventClosure closure) { + applyOperation(EventOperation.createOpreation(request), closure); + } + + public void applyOperation(EventOperation opreation, EventClosure closure) { + if (!isLeader()) { + handlerNotLeaderError(closure); + return; + } + try { + closure.setEventOperation(opreation); + final Task task = new Task(); + task.setData(ByteBuffer.wrap(SerializerManager.getSerializer(SerializerManager.Hessian2).serialize(opreation))); + task.setDone(closure); + this.server.getNode().apply(task); + } catch (CodecException e) { + String errorMsg = "Fail to encode EventOperation"; + e.printStackTrace(System.err); + closure.failure(errorMsg, StringUtils.EMPTY); + closure.run(new Status(RaftError.EINTERNAL, errorMsg)); + } + } + + + private String getRedirect() { + return this.server.redirect().getRedirect(); + } + + private boolean isLeader() { + return this.server.getFsm().isLeader(); + } + + + private void handlerNotLeaderError(final EventClosure closure) { + closure.failure("Not leader.", getRedirect()); + closure.run(new Status(RaftError.EPERM, "Not leader")); + } +} diff --git a/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/MetaStateMachine.java b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/MetaStateMachine.java new file mode 100644 index 0000000000..0803534177 --- /dev/null +++ b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/MetaStateMachine.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.meta.raft; + +import static org.apache.eventmesh.meta.raft.EventOperation.GET; +import static org.apache.eventmesh.meta.raft.EventOperation.PUT; + +import org.apache.commons.lang.StringUtils; + +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; + +import com.alipay.remoting.exception.CodecException; +import com.alipay.remoting.serialization.SerializerManager; +import com.alipay.sofa.jraft.Iterator; +import com.alipay.sofa.jraft.Status; +import com.alipay.sofa.jraft.core.StateMachineAdapter; +import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader; + +public class MetaStateMachine extends StateMachineAdapter { + + private final AtomicLong leaderTerm = new AtomicLong(-1); + + private Map contentTable = new ConcurrentHashMap<>(); + + public boolean isLeader() { + return this.leaderTerm.get() > 0; + } + + @Override + public boolean onSnapshotLoad(final SnapshotReader reader) { + if (isLeader()) { + return false; + } + + return true; + + } + + @Override + public void onApply(Iterator iter) { + while (iter.hasNext()) { + Exception e1 = null; + EventOperation eventOperation = null; + EventClosure closure = null; + if (iter.done() != null) { + // This task is applied by this node, get value from closure to avoid additional parsing. + closure = (EventClosure) iter.done(); + eventOperation = closure.getEventOperation(); + } else { + // Have to parse FetchAddRequest from this user log. + final ByteBuffer data = iter.getData(); + try { + eventOperation = SerializerManager.getSerializer(SerializerManager.Hessian2) + .deserialize(data.array(), EventOperation.class.getName()); + } catch (final CodecException e) { + e.printStackTrace(System.err); + e1 = e; + + } + // follower ignore read operation + if (eventOperation != null && eventOperation.isReadOp()) { + iter.next(); + continue; + } + } + if (eventOperation != null) { + switch (eventOperation.getOp()) { + case GET: + break; + case PUT: + Map tempTable = eventOperation.getData(); + contentTable.putAll(tempTable); + break; + default: + break; + } + + if (closure != null) { + if (e1 != null) { + closure.failure(e1.getMessage(), StringUtils.EMPTY); + } else { + if (eventOperation.getOp() == PUT) { + closure.success(Collections.EMPTY_MAP); + } else { + closure.success(Collections.unmodifiableMap(contentTable)); + } + + } + closure.run(Status.OK()); + } + } + iter.next(); + } + } + + @Override + public void onLeaderStart(final long term) { + this.leaderTerm.set(term); + super.onLeaderStart(term); + + } + + @Override + public void onLeaderStop(final Status status) { + this.leaderTerm.set(-1); + super.onLeaderStop(status); + } +} diff --git a/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/RaftMetaService.java b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/RaftMetaService.java new file mode 100644 index 0000000000..4f25162b81 --- /dev/null +++ b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/RaftMetaService.java @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.meta.raft; + +import org.apache.eventmesh.api.exception.MetaException; +import org.apache.eventmesh.api.meta.MetaServiceListener; +import org.apache.eventmesh.api.meta.dto.EventMeshDataInfo; +import org.apache.eventmesh.api.meta.dto.EventMeshRegisterInfo; +import org.apache.eventmesh.api.meta.dto.EventMeshUnRegisterInfo; +import org.apache.eventmesh.common.config.ConfigService; +import org.apache.eventmesh.meta.raft.config.RaftMetaStorageConfiguration; +import org.apache.eventmesh.meta.raft.consts.MetaRaftConstants; +import org.apache.eventmesh.meta.raft.rpc.MetaServerHelper; +import org.apache.eventmesh.meta.raft.rpc.RequestResponse; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; + +import com.alipay.sofa.jraft.CliService; +import com.alipay.sofa.jraft.RaftServiceFactory; +import com.alipay.sofa.jraft.RouteTable; +import com.alipay.sofa.jraft.conf.Configuration; +import com.alipay.sofa.jraft.core.CliServiceImpl; +import com.alipay.sofa.jraft.entity.PeerId; +import com.alipay.sofa.jraft.error.RemotingException; +import com.alipay.sofa.jraft.option.CliOptions; +import com.alipay.sofa.jraft.option.NodeOptions; +import com.alipay.sofa.jraft.rpc.impl.cli.CliClientServiceImpl; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class RaftMetaService implements org.apache.eventmesh.api.meta.MetaService { + + private final AtomicBoolean initStatus = new AtomicBoolean(false); + + private final AtomicBoolean startStatus = new AtomicBoolean(false); + + RaftMetaStorageConfiguration configuration; + + private JraftServer jraftServer; + + private CliService cliService; + + private CliClientServiceImpl cliClientService; + + private PeerId leader; + + private Thread refreshThread; + + @Override + public void init() throws MetaException { + if (!initStatus.compareAndSet(false, true)) { + return; + } + ConfigService configService = ConfigService.getInstance(); + configuration = configService.buildConfigInstance(RaftMetaStorageConfiguration.class); + } + + @Override + public void start() throws MetaException { + final String dataPath = configuration.getDataPath(); + final String groupId = MetaRaftConstants.GROUP; + final String serverIdStr = configuration.getSelfIpAndPort(); + final String initConfStr = configuration.getMembersIpAndPort(); + final NodeOptions nodeOptions = new NodeOptions(); + nodeOptions.setElectionTimeoutMs(configuration.getElectionTimeoutMs()); + nodeOptions.setDisableCli(false); + nodeOptions.setSnapshotIntervalSecs(configuration.getSnapshotIntervalSecs()); + final PeerId serverId = new PeerId(); + if (!serverId.parse(serverIdStr)) { + throw new MetaException("Fail to parse serverId:" + serverIdStr); + } + final Configuration initConf = new Configuration(); + if (!initConf.parse(initConfStr)) { + throw new MetaException("Fail to parse initConf:" + initConfStr); + } + initConf.addPeer(serverId); + nodeOptions.setInitialConf(initConf); + try { + jraftServer = new JraftServer(dataPath, groupId, serverId, nodeOptions); + } catch (IOException e) { + throw new MetaException("fail to start jraft server", e); + } + log.info("Started jraft server at port: {}", jraftServer.getNode().getNodeId().getPeerId().getPort()); + + final Configuration conf = new Configuration(); + if (!conf.parse(serverIdStr)) { + throw new IllegalArgumentException("Fail to parse conf:" + serverIdStr); + } + RouteTable.getInstance().updateConfiguration(MetaRaftConstants.GROUP, conf); + cliService = RaftServiceFactory.createAndInitCliService(new CliOptions()); + cliClientService = (CliClientServiceImpl) ((CliServiceImpl) this.cliService).getCliClientService(); + refreshThread = new Thread(() -> { + try { + RaftMetaService.this.refreshleader(); + Thread.sleep(configuration.getRefreshLeaderInterval()); + } catch (Exception e) { + log.error("fail to Refresh Leader", e); + } + }); + refreshThread.setName("[Raft-refresh-leader-Thread]"); + refreshThread.setDaemon(true); + refreshThread.start(); + startStatus.compareAndSet(false, true); + } + + private void refreshleader() throws InterruptedException, TimeoutException { + if (!RouteTable.getInstance().refreshLeader(cliClientService, MetaRaftConstants.GROUP, 3000).isOk()) { + throw new IllegalStateException("Refresh leader failed"); + } + this.leader = RouteTable.getInstance().selectLeader(MetaRaftConstants.GROUP); + log.info("raft Leader is {}", leader); + } + + @Override + public void shutdown() throws MetaException { + if (!startStatus.compareAndSet(true, false)) { + return; + } + MetaServerHelper.shutDown(); + if (cliService != null) { + cliService.shutdown(); + } + if (cliClientService != null) { + cliClientService.shutdown(); + } + } + + @Override + public List findEventMeshInfoByCluster(String clusterName) throws MetaException { + return null; + } + + @Override + public List findAllEventMeshInfo() throws MetaException { + return null; + } + + @Override + public void registerMetadata(Map metadataMap) { + + } + + @Override + public Map getMetaData(String key, boolean fuzzyEnabled) { + return null; + } + + @Override + public void getMetaDataWithListener(MetaServiceListener metaServiceListener, String key) { + + } + + @Override + public void updateMetaData(Map metadataMap) { + + } + + @Override + public void removeMetaData(String key) { + + } + + @Override + public boolean register(EventMeshRegisterInfo eventMeshRegisterInfo) throws MetaException { + return false; + } + + @Override + public boolean unRegister(EventMeshUnRegisterInfo eventMeshUnRegisterInfo) throws MetaException { + return false; + } + + public CompletableFuture commit(RequestResponse requestResponse, EventClosure eventClosure) + throws RemotingException, InterruptedException { + CompletableFuture future = new CompletableFuture<>(); + eventClosure.setFuture(future); + if (isLeader()) { + this.jraftServer.getMetaImpl().handle(requestResponse, eventClosure); + } else { + invokeToLeader(requestResponse, future); + } + return future; + } + + private void invokeToLeader(RequestResponse requestResponse, CompletableFuture future) + throws RemotingException, InterruptedException { + cliClientService.getRpcClient().invokeAsync(leader.getEndpoint(), requestResponse, (result, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + future.complete((RequestResponse) result); + }, 3000); + } + + + private boolean isLeader() { + return this.jraftServer.getFsm().isLeader(); + } +} diff --git a/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/config/RaftMetaStorageConfiguration.java b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/config/RaftMetaStorageConfiguration.java new file mode 100644 index 0000000000..d5f4a80191 --- /dev/null +++ b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/config/RaftMetaStorageConfiguration.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.meta.raft.config; + +import org.apache.eventmesh.common.config.Config; +import org.apache.eventmesh.common.config.ConfigField; +import org.apache.eventmesh.meta.raft.consts.MetaRaftConstants; + +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@NoArgsConstructor +@Config(prefix = "eventMesh.metaStorage.raft") +public class RaftMetaStorageConfiguration { + + @ConfigField(field = MetaRaftConstants.DATAPATH) + private String dataPath; + + @ConfigField(field = MetaRaftConstants.SELF) + private String selfIpAndPort; + + @ConfigField(field = MetaRaftConstants.MEMBERS) + private String membersIpAndPort; + + @ConfigField(field = MetaRaftConstants.ELECTIONTIMEOUT) + private Integer electionTimeoutMs; + + @ConfigField(field = MetaRaftConstants.SNAPSHOTINTERVAL) + private Integer snapshotIntervalSecs; + + @ConfigField(field = MetaRaftConstants.REFRESHLEADERINTERVAL) + private Integer refreshLeaderInterval; + +} diff --git a/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/consts/MetaRaftConstants.java b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/consts/MetaRaftConstants.java new file mode 100644 index 0000000000..a7f311234c --- /dev/null +++ b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/consts/MetaRaftConstants.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.meta.raft.consts; + +/** + * MetaRaftConstants. + */ + +public interface MetaRaftConstants { + + String GROUP = "EM_META"; + + String SELF = "self"; + + String MEMBERS = "members"; + + String DATAPATH = "dataPath"; + + String ELECTIONTIMEOUT = "electionTimeout"; + + String SNAPSHOTINTERVAL = "snapshotInterval"; + + String REFRESHLEADERINTERVAL = "refreshLeaderInterval"; + + int PUT = 1; + + int GET = 2; + + int RESPONSE = 3; +} diff --git a/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/rpc/MetaServerHelper.java b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/rpc/MetaServerHelper.java new file mode 100644 index 0000000000..abaeb58d8d --- /dev/null +++ b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/rpc/MetaServerHelper.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.meta.raft.rpc; + +import java.lang.reflect.Method; +import java.util.concurrent.TimeUnit; + +import com.alipay.sofa.jraft.rpc.RpcServer; +import com.alipay.sofa.jraft.util.RpcFactoryHelper; +import com.google.protobuf.Message; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class MetaServerHelper { + + public static RpcServer rpcServer; + + + public static void setRpcServer(RpcServer rpcServer) { + MetaServerHelper.rpcServer = rpcServer; + } + + + public static void initGRpc() { + if ("com.alipay.sofa.jraft.rpc.impl.GrpcRaftRpcFactory".equals( + RpcFactoryHelper.rpcFactory().getClass().getName())) { + RpcFactoryHelper.rpcFactory() + .registerProtobufSerializer(RequestResponse.class.getName(), + RequestResponse.getDefaultInstance()); + try { + Class clazz = Class.forName("com.alipay.sofa.jraft.rpc.impl.MarshallerHelper"); + Method registerRespInstance = clazz.getMethod("registerRespInstance", String.class, Message.class); + registerRespInstance.invoke(null, RequestResponse.class.getName(), + RequestResponse.getDefaultInstance()); + } catch (Exception e) { + log.error(e.getMessage(), e); + } + } + } + + public static void shutDown() { + if (rpcServer == null) { + return; + } + rpcServer.shutdown(); + } + + public static void blockUntilShutdown() { + if (rpcServer == null) { + return; + } + if ("com.alipay.sofa.jraft.rpc.impl.GrpcRaftRpcFactory".equals( + RpcFactoryHelper.rpcFactory().getClass().getName())) { + try { + Method getServer = rpcServer.getClass().getMethod("getServer"); + Object grpcServer = getServer.invoke(rpcServer); + + Method shutdown = grpcServer.getClass().getMethod("shutdown"); + Method awaitTerminationLimit = grpcServer.getClass() + .getMethod("awaitTermination", long.class, TimeUnit.class); + + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + try { + shutdown.invoke(grpcServer); + awaitTerminationLimit.invoke(grpcServer, 30, TimeUnit.SECONDS); + } catch (Exception e) { + // Use stderr here since the logger may have been reset by its JVM shutdown hook. + log.error(e.getMessage(), e); + } + } + }); + Method awaitTermination = grpcServer.getClass().getMethod("awaitTermination"); + awaitTermination.invoke(grpcServer); + } catch (Exception e) { + log.error(e.getMessage(), e); + } + } + } +} diff --git a/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/rpc/RequestProcessor.java b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/rpc/RequestProcessor.java new file mode 100644 index 0000000000..112eeaff69 --- /dev/null +++ b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/rpc/RequestProcessor.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.meta.raft.rpc; + +import org.apache.eventmesh.meta.raft.EventClosure; +import org.apache.eventmesh.meta.raft.MetaService; + +import com.alipay.sofa.jraft.Status; +import com.alipay.sofa.jraft.rpc.RpcContext; +import com.alipay.sofa.jraft.rpc.RpcProcessor; + + +public class RequestProcessor implements RpcProcessor { + + private final MetaService metaService; + + public RequestProcessor(MetaService metaService) { + super(); + this.metaService = metaService; + } + + @Override + public void handleRequest(final RpcContext rpcCtx, final RequestResponse request) { + final EventClosure closure = new EventClosure() { + @Override + public void run(Status status) { + rpcCtx.sendResponse(getRequestResponse()); + } + }; + this.metaService.handle(request, closure); + } + + @Override + public String interest() { + return RequestResponse.class.getName(); + } +} diff --git a/eventmesh-meta/eventmesh-meta-raft/src/main/proto/request.proto b/eventmesh-meta/eventmesh-meta-raft/src/main/proto/request.proto new file mode 100644 index 0000000000..6865495858 --- /dev/null +++ b/eventmesh-meta/eventmesh-meta-raft/src/main/proto/request.proto @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +syntax = "proto3"; + +option java_multiple_files = true; + +option java_package = "org.apache.eventmesh.meta.raft.rpc"; + +message RequestResponse { + int64 value = 1; + bool success = 2; + string redirect = 3; + string errorMsg = 4; + map info = 5; +} + + diff --git a/eventmesh-meta/eventmesh-meta-raft/src/main/resources/eventmesh/org.apache.eventmesh.api.meta.MetaService b/eventmesh-meta/eventmesh-meta-raft/src/main/resources/eventmesh/org.apache.eventmesh.api.meta.MetaService new file mode 100644 index 0000000000..4771d7fc1c --- /dev/null +++ b/eventmesh-meta/eventmesh-meta-raft/src/main/resources/eventmesh/org.apache.eventmesh.api.meta.MetaService @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +raft=org.apache.eventmesh.meta.raft.RaftMetaService \ No newline at end of file diff --git a/settings.gradle b/settings.gradle index 6a8e27bf98..7e223d9a3c 100644 --- a/settings.gradle +++ b/settings.gradle @@ -98,6 +98,7 @@ include 'eventmesh-meta:eventmesh-meta-nacos' include 'eventmesh-meta:eventmesh-meta-etcd' include 'eventmesh-meta:eventmesh-meta-consul' include 'eventmesh-meta:eventmesh-meta-zookeeper' +include 'eventmesh-meta:eventmesh-meta-raft' include 'eventmesh-protocol-plugin' include 'eventmesh-protocol-plugin:eventmesh-protocol-api' @@ -127,3 +128,6 @@ include 'eventmesh-webhook:eventmesh-webhook-receive' include 'eventmesh-retry' include 'eventmesh-retry:eventmesh-retry-api' include 'eventmesh-retry:eventmesh-retry-rocketmq' + + + From 027dc9b5c0281e98914815d0b5e46ff73a8cdb69 Mon Sep 17 00:00:00 2001 From: karsonto Date: Wed, 24 Apr 2024 11:19:23 +0800 Subject: [PATCH 2/7] init function --- .../eventmesh-meta-raft/build.gradle | 2 +- .../eventmesh/meta/raft/EventClosure.java | 10 ++++ .../eventmesh/meta/raft/EventOperation.java | 5 ++ .../eventmesh/meta/raft/MetaServiceImpl.java | 1 - .../eventmesh/meta/raft/MetaStateMachine.java | 7 +++ .../eventmesh/meta/raft/RaftMetaService.java | 49 +++++++++++++++++++ .../meta/raft/consts/MetaRaftConstants.java | 4 +- .../org.apache.eventmesh.api.meta.MetaService | 0 eventmesh-runtime/conf/eventmesh.properties | 12 ++++- 9 files changed, 85 insertions(+), 5 deletions(-) rename eventmesh-meta/eventmesh-meta-raft/src/main/resources/{ => META-INF}/eventmesh/org.apache.eventmesh.api.meta.MetaService (100%) diff --git a/eventmesh-meta/eventmesh-meta-raft/build.gradle b/eventmesh-meta/eventmesh-meta-raft/build.gradle index 99af858e10..4fb3b7bc4a 100644 --- a/eventmesh-meta/eventmesh-meta-raft/build.gradle +++ b/eventmesh-meta/eventmesh-meta-raft/build.gradle @@ -41,7 +41,7 @@ dependencies { implementation project(":eventmesh-meta:eventmesh-meta-api") implementation project(":eventmesh-common") - + implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.0.1' implementation "com.alipay.sofa:jraft-core:1.3.14" implementation "com.alipay.sofa:rpc-grpc-impl:1.3.14" testImplementation 'org.junit.jupiter:junit-jupiter:5.6.0' diff --git a/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/EventClosure.java b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/EventClosure.java index 0b0411db80..a568a801dc 100644 --- a/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/EventClosure.java +++ b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/EventClosure.java @@ -24,6 +24,7 @@ import java.util.concurrent.CompletableFuture; import com.alipay.sofa.jraft.Closure; +import com.alipay.sofa.jraft.Status; public abstract class EventClosure implements Closure { @@ -34,6 +35,15 @@ public abstract class EventClosure implements Closure { private EventOperation eventOperation; + public static EventClosure createDefaultEventClosure() { + return new EventClosure() { + + @Override + public void run(Status status) { + + } + }; + } public void setFuture(CompletableFuture future) { this.future = future; diff --git a/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/EventOperation.java b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/EventOperation.java index 1208900e8b..b8c7a6cd55 100644 --- a/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/EventOperation.java +++ b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/EventOperation.java @@ -32,6 +32,8 @@ public class EventOperation implements Serializable { public static final byte GET = 0x02; + public static final byte DELETE = 0x03; + private byte op; private Map data; @@ -40,6 +42,9 @@ public static EventOperation createOpreation(RequestResponse response) { return new EventOperation(PUT, response.getInfoMap()); } else if (response.getValue() == MetaRaftConstants.GET) { return new EventOperation(GET, response.getInfoMap()); + } else if (response.getValue() == MetaRaftConstants.DELETE) { + return new EventOperation(DELETE, response.getInfoMap()); + } return null; } diff --git a/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/MetaServiceImpl.java b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/MetaServiceImpl.java index a2be9c831f..a203ba0e9e 100644 --- a/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/MetaServiceImpl.java +++ b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/MetaServiceImpl.java @@ -56,7 +56,6 @@ public void applyOperation(EventOperation opreation, EventClosure closure) { this.server.getNode().apply(task); } catch (CodecException e) { String errorMsg = "Fail to encode EventOperation"; - e.printStackTrace(System.err); closure.failure(errorMsg, StringUtils.EMPTY); closure.run(new Status(RaftError.EINTERNAL, errorMsg)); } diff --git a/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/MetaStateMachine.java b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/MetaStateMachine.java index 0803534177..ba9551be6c 100644 --- a/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/MetaStateMachine.java +++ b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/MetaStateMachine.java @@ -17,6 +17,7 @@ package org.apache.eventmesh.meta.raft; +import static org.apache.eventmesh.meta.raft.EventOperation.DELETE; import static org.apache.eventmesh.meta.raft.EventOperation.GET; import static org.apache.eventmesh.meta.raft.EventOperation.PUT; @@ -90,6 +91,12 @@ public void onApply(Iterator iter) { Map tempTable = eventOperation.getData(); contentTable.putAll(tempTable); break; + case DELETE: + Map tempTable2 = eventOperation.getData(); + tempTable2.forEach((key, value) -> { + contentTable.remove(key); + }); + break; default: break; } diff --git a/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/RaftMetaService.java b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/RaftMetaService.java index 4f25162b81..7aa15c1047 100644 --- a/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/RaftMetaService.java +++ b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/RaftMetaService.java @@ -28,10 +28,14 @@ import org.apache.eventmesh.meta.raft.rpc.MetaServerHelper; import org.apache.eventmesh.meta.raft.rpc.RequestResponse; +import org.apache.commons.lang3.StringUtils; + import java.io.IOException; +import java.io.Serializable; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -45,12 +49,16 @@ import com.alipay.sofa.jraft.option.CliOptions; import com.alipay.sofa.jraft.option.NodeOptions; import com.alipay.sofa.jraft.rpc.impl.cli.CliClientServiceImpl; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.Data; import lombok.extern.slf4j.Slf4j; @Slf4j public class RaftMetaService implements org.apache.eventmesh.api.meta.MetaService { + private static ObjectMapper objectMapper = new ObjectMapper(); + private final AtomicBoolean initStatus = new AtomicBoolean(false); private final AtomicBoolean startStatus = new AtomicBoolean(false); @@ -183,14 +191,55 @@ public void removeMetaData(String key) { @Override public boolean register(EventMeshRegisterInfo eventMeshRegisterInfo) throws MetaException { + //key= IP@@PORT@@CLUSTER_NAME + String[] ipAndPort = eventMeshRegisterInfo.getEndPoint().split(":"); + String clusterName = eventMeshRegisterInfo.getEventMeshClusterName(); + String key = ipAndPort[0] + "@@" + ipAndPort[1] + "@@" + clusterName; + InfoInner infoInner = new InfoInner(eventMeshRegisterInfo); + String registerInfo = null; + try { + registerInfo = objectMapper.writeValueAsString(infoInner); + RequestResponse req = RequestResponse.newBuilder().setValue(MetaRaftConstants.PUT).putInfo(key, registerInfo).build(); + CompletableFuture future = commit(req, EventClosure.createDefaultEventClosure()); + RequestResponse requestResponse = future.get(3000, TimeUnit.MILLISECONDS); + if (requestResponse != null) { + return requestResponse.getSuccess(); + } + } catch (Exception e) { + throw new MetaException("fail to serialize ", e); + } return false; } @Override public boolean unRegister(EventMeshUnRegisterInfo eventMeshUnRegisterInfo) throws MetaException { + //key= IP@@PORT@@CLUSTER_NAME + String[] ipAndPort = eventMeshUnRegisterInfo.getEndPoint().split(":"); + String clusterName = eventMeshUnRegisterInfo.getEventMeshClusterName(); + String key = ipAndPort[0] + "@@" + ipAndPort[1] + "@@" + clusterName; + RequestResponse req = RequestResponse.newBuilder().setValue(MetaRaftConstants.DELETE).putInfo(key, StringUtils.EMPTY).build(); + try { + CompletableFuture future = commit(req, EventClosure.createDefaultEventClosure()); + RequestResponse requestResponse = future.get(3000, TimeUnit.MILLISECONDS); + if (requestResponse != null) { + return requestResponse.getSuccess(); + } + } catch (Exception e) { + throw new MetaException(e.getMessage(), e); + } return false; } + @Data + class InfoInner implements Serializable { + + EventMeshRegisterInfo eventMeshRegisterInfo; + + public InfoInner(EventMeshRegisterInfo eventMeshRegisterInfo) { + this.eventMeshRegisterInfo = eventMeshRegisterInfo; + } + } + public CompletableFuture commit(RequestResponse requestResponse, EventClosure eventClosure) throws RemotingException, InterruptedException { CompletableFuture future = new CompletableFuture<>(); diff --git a/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/consts/MetaRaftConstants.java b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/consts/MetaRaftConstants.java index a7f311234c..6be76855e9 100644 --- a/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/consts/MetaRaftConstants.java +++ b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/consts/MetaRaftConstants.java @@ -41,5 +41,7 @@ public interface MetaRaftConstants { int GET = 2; - int RESPONSE = 3; + int DELETE = 3; + + int RESPONSE = 4; } diff --git a/eventmesh-meta/eventmesh-meta-raft/src/main/resources/eventmesh/org.apache.eventmesh.api.meta.MetaService b/eventmesh-meta/eventmesh-meta-raft/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.meta.MetaService similarity index 100% rename from eventmesh-meta/eventmesh-meta-raft/src/main/resources/eventmesh/org.apache.eventmesh.api.meta.MetaService rename to eventmesh-meta/eventmesh-meta-raft/src/main/resources/META-INF/eventmesh/org.apache.eventmesh.api.meta.MetaService diff --git a/eventmesh-runtime/conf/eventmesh.properties b/eventmesh-runtime/conf/eventmesh.properties index 91a422fa4c..3fa2936525 100644 --- a/eventmesh-runtime/conf/eventmesh.properties +++ b/eventmesh-runtime/conf/eventmesh.properties @@ -79,12 +79,20 @@ eventMesh.security.validation.type.token=false eventMesh.security.publickey= # metaStorage plugin -eventMesh.metaStorage.plugin.enabled=false -eventMesh.metaStorage.plugin.type=nacos +eventMesh.metaStorage.plugin.enabled=true +eventMesh.metaStorage.plugin.type=raft eventMesh.metaStorage.plugin.server-addr=127.0.0.1:8848 eventMesh.metaStorage.plugin.username=nacos eventMesh.metaStorage.plugin.password=nacos +# metaStorage plugin: raft +eventMesh.metaStorage.raft.dataPath=/tmp/server1 +eventMesh.metaStorage.raft.self=127.0.0.1:9091 +eventMesh.metaStorage.raft.members=127.0.0.1:9092,127.0.0.1:9093 +eventMesh.metaStorage.raft.electionTimeout=3000 +eventMesh.metaStorage.raft.snapshotInterval=30 +eventMesh.metaStorage.raft.refreshLeaderInterval=30 + # metaStorage plugin: nacos #eventMesh.metaStorage.nacos.endpoint= #eventMesh.metaStorage.nacos.accessKey= From 0f0002d92c4236d624813180c72702c128584aaf Mon Sep 17 00:00:00 2001 From: karsonto Date: Wed, 24 Apr 2024 19:55:32 +0800 Subject: [PATCH 3/7] finish this function. --- .../eventmesh/meta/raft/MetaStateMachine.java | 63 ++++++- .../eventmesh/meta/raft/RaftMetaService.java | 169 +++++++++++++++--- .../meta/raft/snapshot/MetaSnapshotFile.java | 63 +++++++ eventmesh-runtime/conf/eventmesh.properties | 16 +- eventmesh-runtime/conf/log4j2.xml | 9 + 5 files changed, 289 insertions(+), 31 deletions(-) create mode 100644 eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/snapshot/MetaSnapshotFile.java diff --git a/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/MetaStateMachine.java b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/MetaStateMachine.java index ba9551be6c..a0607f5ab4 100644 --- a/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/MetaStateMachine.java +++ b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/MetaStateMachine.java @@ -21,25 +21,43 @@ import static org.apache.eventmesh.meta.raft.EventOperation.GET; import static org.apache.eventmesh.meta.raft.EventOperation.PUT; +import org.apache.eventmesh.meta.raft.snapshot.MetaSnapshotFile; + import org.apache.commons.lang.StringUtils; +import java.io.File; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.Collections; import java.util.Map; +import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicLong; import com.alipay.remoting.exception.CodecException; import com.alipay.remoting.serialization.SerializerManager; +import com.alipay.sofa.jraft.Closure; import com.alipay.sofa.jraft.Iterator; import com.alipay.sofa.jraft.Status; import com.alipay.sofa.jraft.core.StateMachineAdapter; +import com.alipay.sofa.jraft.error.RaftError; import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader; +import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter; +import com.fasterxml.jackson.databind.ObjectMapper; + +import lombok.extern.slf4j.Slf4j; +@Slf4j public class MetaStateMachine extends StateMachineAdapter { private final AtomicLong leaderTerm = new AtomicLong(-1); + private static ObjectMapper objectMapper = new ObjectMapper(); + + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + private Map contentTable = new ConcurrentHashMap<>(); public boolean isLeader() { @@ -49,11 +67,44 @@ public boolean isLeader() { @Override public boolean onSnapshotLoad(final SnapshotReader reader) { if (isLeader()) { + log.warn("Leader is not supposed to load snapshot"); + return false; + } + if (reader.getFileMeta("data") == null) { + log.error("Fail to find data file in {}", reader.getPath()); + return false; + } + final MetaSnapshotFile snapshot = new MetaSnapshotFile(reader.getPath() + File.separator + "data"); + try { + Map snapshotLoaded = objectMapper.readValue(snapshot.load(), Map.class); + contentTable.clear(); + contentTable.putAll(snapshotLoaded); + return true; + } catch (final IOException e) { + log.error("Fail to load snapshot from {}", snapshot.getPath()); return false; } - return true; + } + @Override + public void onSnapshotSave(SnapshotWriter writer, Closure done) { + executor.submit(() -> { + final MetaSnapshotFile snapshot = new MetaSnapshotFile(writer.getPath() + File.separator + "data"); + try { + if (snapshot.save(objectMapper.writeValueAsString(contentTable))) { + if (writer.addFile("data")) { + done.run(Status.OK()); + } else { + done.run(new Status(RaftError.EIO, "Fail to add file to writer")); + } + } else { + done.run(new Status(RaftError.EIO, "Fail to save snapshot %s", snapshot.getPath())); + } + } catch (IOException e) { + done.run(new Status(RaftError.EIO, "Fail to deserialize snapshot %s", snapshot.getPath())); + } + }); } @Override @@ -90,12 +141,20 @@ public void onApply(Iterator iter) { case PUT: Map tempTable = eventOperation.getData(); contentTable.putAll(tempTable); + log.info("update MetaStateMachine successfully {}", contentTable); break; case DELETE: Map tempTable2 = eventOperation.getData(); tempTable2.forEach((key, value) -> { - contentTable.remove(key); + String remove = contentTable.remove(key); + if (Objects.isNull(remove)) { + log.warn("delete MetaStateMachine key: {} fail.", remove); + } else { + log.info("delete MetaStateMachine key: {} successfully.", remove); + } + }); + break; default: break; diff --git a/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/RaftMetaService.java b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/RaftMetaService.java index 7aa15c1047..5e1d27e8d6 100644 --- a/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/RaftMetaService.java +++ b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/RaftMetaService.java @@ -19,10 +19,12 @@ import org.apache.eventmesh.api.exception.MetaException; import org.apache.eventmesh.api.meta.MetaServiceListener; +import org.apache.eventmesh.api.meta.config.EventMeshMetaConfig; import org.apache.eventmesh.api.meta.dto.EventMeshDataInfo; import org.apache.eventmesh.api.meta.dto.EventMeshRegisterInfo; import org.apache.eventmesh.api.meta.dto.EventMeshUnRegisterInfo; import org.apache.eventmesh.common.config.ConfigService; +import org.apache.eventmesh.common.utils.ConfigurationContextUtil; import org.apache.eventmesh.meta.raft.config.RaftMetaStorageConfiguration; import org.apache.eventmesh.meta.raft.consts.MetaRaftConstants; import org.apache.eventmesh.meta.raft.rpc.MetaServerHelper; @@ -32,9 +34,17 @@ import java.io.IOException; import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -59,6 +69,8 @@ public class RaftMetaService implements org.apache.eventmesh.api.meta.MetaServic private static ObjectMapper objectMapper = new ObjectMapper(); + private ConcurrentMap eventMeshRegisterInfoMap; + private final AtomicBoolean initStatus = new AtomicBoolean(false); private final AtomicBoolean startStatus = new AtomicBoolean(false); @@ -73,13 +85,15 @@ public class RaftMetaService implements org.apache.eventmesh.api.meta.MetaServic private PeerId leader; - private Thread refreshThread; + private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); + @Override public void init() throws MetaException { if (!initStatus.compareAndSet(false, true)) { return; } + eventMeshRegisterInfoMap = new ConcurrentHashMap<>(ConfigurationContextUtil.KEYS.size()); ConfigService configService = ConfigService.getInstance(); configuration = configService.buildConfigInstance(RaftMetaStorageConfiguration.class); } @@ -91,7 +105,7 @@ public void start() throws MetaException { final String serverIdStr = configuration.getSelfIpAndPort(); final String initConfStr = configuration.getMembersIpAndPort(); final NodeOptions nodeOptions = new NodeOptions(); - nodeOptions.setElectionTimeoutMs(configuration.getElectionTimeoutMs()); + nodeOptions.setElectionTimeoutMs(configuration.getElectionTimeoutMs() * 1000); nodeOptions.setDisableCli(false); nodeOptions.setSnapshotIntervalSecs(configuration.getSnapshotIntervalSecs()); final PeerId serverId = new PeerId(); @@ -118,17 +132,29 @@ public void start() throws MetaException { RouteTable.getInstance().updateConfiguration(MetaRaftConstants.GROUP, conf); cliService = RaftServiceFactory.createAndInitCliService(new CliOptions()); cliClientService = (CliClientServiceImpl) ((CliServiceImpl) this.cliService).getCliClientService(); - refreshThread = new Thread(() -> { + while (true) { + try { + refreshleader(); + if (this.leader != null) { + break; + } + } catch (Exception e) { + log.warn("fail to get leader node"); + try { + Thread.sleep(3000L); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + } + } + scheduledExecutorService.scheduleAtFixedRate(() -> { try { RaftMetaService.this.refreshleader(); - Thread.sleep(configuration.getRefreshLeaderInterval()); } catch (Exception e) { log.error("fail to Refresh Leader", e); } - }); - refreshThread.setName("[Raft-refresh-leader-Thread]"); - refreshThread.setDaemon(true); - refreshThread.start(); + }, configuration.getRefreshLeaderInterval(), configuration.getRefreshLeaderInterval(), TimeUnit.SECONDS); + startStatus.compareAndSet(false, true); } @@ -145,6 +171,7 @@ public void shutdown() throws MetaException { if (!startStatus.compareAndSet(true, false)) { return; } + scheduledExecutorService.shutdown(); MetaServerHelper.shutDown(); if (cliService != null) { cliService.shutdown(); @@ -156,78 +183,178 @@ public void shutdown() throws MetaException { @Override public List findEventMeshInfoByCluster(String clusterName) throws MetaException { - return null; + List listEventMeshDataInfo = new ArrayList<>(); + RequestResponse req = RequestResponse.newBuilder().setValue(MetaRaftConstants.GET).build(); + boolean result = false; + try { + CompletableFuture future = commit(req, EventClosure.createDefaultEventClosure()); + RequestResponse requestResponse = future.get(3000, TimeUnit.MILLISECONDS); + if (requestResponse != null) { + result = requestResponse.getSuccess(); + if (result) { + Map infoMap = requestResponse.getInfoMap(); + for (Entry entry : infoMap.entrySet()) { + String key = entry.getKey(); + String value = entry.getValue(); + if (key.startsWith("eventMeshInfo@@")) { + if (Objects.isNull(clusterName)) { + if (!key.endsWith("@@" + clusterName)) { + continue; + } + } + EventMeshDataInfo eventMeshDataInfo = objectMapper.readValue(value, EventMeshDataInfo.class); + listEventMeshDataInfo.add(eventMeshDataInfo); + } + } + } + } + } catch (Exception e) { + throw new MetaException("fail to get meta data ", e); + } + return listEventMeshDataInfo; } @Override public List findAllEventMeshInfo() throws MetaException { - return null; + return findEventMeshInfoByCluster(null); } @Override public void registerMetadata(Map metadataMap) { - + for (Map.Entry eventMeshRegisterInfo : eventMeshRegisterInfoMap.entrySet()) { + EventMeshRegisterInfo registerInfo = eventMeshRegisterInfo.getValue(); + registerInfo.setMetadata(metadataMap); + this.register(registerInfo); + } } @Override public Map getMetaData(String key, boolean fuzzyEnabled) { - return null; + Map resultMap = new HashMap<>(); + RequestResponse req = RequestResponse.newBuilder().setValue(MetaRaftConstants.GET).build(); + boolean result = false; + try { + CompletableFuture future = commit(req, EventClosure.createDefaultEventClosure()); + RequestResponse requestResponse = future.get(3000, TimeUnit.MILLISECONDS); + if (requestResponse != null) { + result = requestResponse.getSuccess(); + if (result) { + Map infoMap = requestResponse.getInfoMap(); + resultMap.putAll(infoMap); + } + } + } catch (Exception e) { + throw new MetaException("fail to get meta data ", e); + } + if (fuzzyEnabled) { + // todo + } else { + Map finalResult = new HashMap<>(); + finalResult.put(key, resultMap.get(key)); + return finalResult; + } + + return resultMap; } @Override public void getMetaDataWithListener(MetaServiceListener metaServiceListener, String key) { - + //todo } @Override public void updateMetaData(Map metadataMap) { + String protocol = metadataMap.get(EventMeshMetaConfig.EVENT_MESH_PROTO); + String reftDataId = "Raft" + "@@" + protocol; + boolean result = false; + try { + RequestResponse req = + RequestResponse.newBuilder().setValue(MetaRaftConstants.PUT).putInfo(reftDataId, objectMapper.writeValueAsString(metadataMap)) + .build(); + CompletableFuture future = commit(req, EventClosure.createDefaultEventClosure()); + RequestResponse requestResponse = future.get(3000, TimeUnit.MILLISECONDS); + if (requestResponse != null) { + result = requestResponse.getSuccess(); + } + } catch (Exception e) { + throw new MetaException("fail to serialize ", e); + } + if (!result) { + throw new MetaException("fail to updateMetaData "); + } } @Override public void removeMetaData(String key) { + RequestResponse req = RequestResponse.newBuilder().setValue(MetaRaftConstants.DELETE).putInfo(key, StringUtils.EMPTY).build(); + + try { + CompletableFuture future = commit(req, EventClosure.createDefaultEventClosure()); + RequestResponse requestResponse = future.get(3000, TimeUnit.MILLISECONDS); + if (requestResponse != null) { + boolean result = requestResponse.getSuccess(); + if (result) { + throw new MetaException("fail to remove MetaData"); + } + } + } catch (Exception e) { + throw new MetaException("fail to remove MetaData", e); + } } @Override public boolean register(EventMeshRegisterInfo eventMeshRegisterInfo) throws MetaException { - //key= IP@@PORT@@CLUSTER_NAME + //key= eventMeshInfo@@eventMeshName@@IP@@PORT@@protocolType@@CLUSTER_NAME + String eventMeshName = eventMeshRegisterInfo.getEventMeshName(); + String protocolType = eventMeshRegisterInfo.getProtocolType(); String[] ipAndPort = eventMeshRegisterInfo.getEndPoint().split(":"); String clusterName = eventMeshRegisterInfo.getEventMeshClusterName(); - String key = ipAndPort[0] + "@@" + ipAndPort[1] + "@@" + clusterName; + String key = "eventMeshInfo" + "@@" + eventMeshName + "@@" + ipAndPort[0] + "@@" + ipAndPort[1] + "@@" + protocolType + "@@" + clusterName; InfoInner infoInner = new InfoInner(eventMeshRegisterInfo); String registerInfo = null; + boolean result = false; try { registerInfo = objectMapper.writeValueAsString(infoInner); RequestResponse req = RequestResponse.newBuilder().setValue(MetaRaftConstants.PUT).putInfo(key, registerInfo).build(); CompletableFuture future = commit(req, EventClosure.createDefaultEventClosure()); RequestResponse requestResponse = future.get(3000, TimeUnit.MILLISECONDS); if (requestResponse != null) { - return requestResponse.getSuccess(); + result = requestResponse.getSuccess(); } } catch (Exception e) { throw new MetaException("fail to serialize ", e); } - return false; + if (result) { + eventMeshRegisterInfoMap.put(eventMeshName, eventMeshRegisterInfo); + } + return result; } @Override public boolean unRegister(EventMeshUnRegisterInfo eventMeshUnRegisterInfo) throws MetaException { - //key= IP@@PORT@@CLUSTER_NAME + //key= eventMeshInfo@@eventMeshName@@IP@@PORT@@protocolType@@CLUSTER_NAME + String eventMeshName = eventMeshUnRegisterInfo.getEventMeshName(); + String protocolType = eventMeshUnRegisterInfo.getProtocolType(); String[] ipAndPort = eventMeshUnRegisterInfo.getEndPoint().split(":"); String clusterName = eventMeshUnRegisterInfo.getEventMeshClusterName(); - String key = ipAndPort[0] + "@@" + ipAndPort[1] + "@@" + clusterName; + String key = "eventMeshInfo" + "@@" + eventMeshName + "@@" + ipAndPort[0] + "@@" + ipAndPort[1] + "@@" + protocolType + "@@" + clusterName; RequestResponse req = RequestResponse.newBuilder().setValue(MetaRaftConstants.DELETE).putInfo(key, StringUtils.EMPTY).build(); + boolean result = false; try { CompletableFuture future = commit(req, EventClosure.createDefaultEventClosure()); RequestResponse requestResponse = future.get(3000, TimeUnit.MILLISECONDS); if (requestResponse != null) { - return requestResponse.getSuccess(); + result = requestResponse.getSuccess(); } } catch (Exception e) { throw new MetaException(e.getMessage(), e); } - return false; + if (result) { + eventMeshRegisterInfoMap.remove(eventMeshName); + } + return result; } @Data diff --git a/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/snapshot/MetaSnapshotFile.java b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/snapshot/MetaSnapshotFile.java new file mode 100644 index 0000000000..b185829a81 --- /dev/null +++ b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/snapshot/MetaSnapshotFile.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eventmesh.meta.raft.snapshot; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.StringUtils; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.Charset; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class MetaSnapshotFile { + + private String path; + + public MetaSnapshotFile(String path) { + super(); + this.path = path; + } + + public String getPath() { + return this.path; + } + + /** + * Save value to snapshot file. + */ + public boolean save(final String str) { + try { + FileUtils.writeStringToFile(new File(path), str, Charset.forName("UTF-8")); + return true; + } catch (IOException e) { + log.error("Fail to save snapshot", e); + return false; + } + } + + public String load() throws IOException { + final String s = FileUtils.readFileToString(new File(path), Charset.forName("UTF-8")); + if (!StringUtils.isBlank(s)) { + return s; + } + throw new IOException("Fail to load snapshot from " + path + ",content: " + s); + } +} diff --git a/eventmesh-runtime/conf/eventmesh.properties b/eventmesh-runtime/conf/eventmesh.properties index 3fa2936525..a74485876f 100644 --- a/eventmesh-runtime/conf/eventmesh.properties +++ b/eventmesh-runtime/conf/eventmesh.properties @@ -79,19 +79,19 @@ eventMesh.security.validation.type.token=false eventMesh.security.publickey= # metaStorage plugin -eventMesh.metaStorage.plugin.enabled=true -eventMesh.metaStorage.plugin.type=raft +eventMesh.metaStorage.plugin.enabled=false +eventMesh.metaStorage.plugin.type=nacos eventMesh.metaStorage.plugin.server-addr=127.0.0.1:8848 eventMesh.metaStorage.plugin.username=nacos eventMesh.metaStorage.plugin.password=nacos # metaStorage plugin: raft -eventMesh.metaStorage.raft.dataPath=/tmp/server1 -eventMesh.metaStorage.raft.self=127.0.0.1:9091 -eventMesh.metaStorage.raft.members=127.0.0.1:9092,127.0.0.1:9093 -eventMesh.metaStorage.raft.electionTimeout=3000 -eventMesh.metaStorage.raft.snapshotInterval=30 -eventMesh.metaStorage.raft.refreshLeaderInterval=30 +#eventMesh.metaStorage.raft.dataPath=/tmp/server1 +#eventMesh.metaStorage.raft.self=127.0.0.1:9091 +#eventMesh.metaStorage.raft.members=127.0.0.1:9092,127.0.0.1:9093 +#eventMesh.metaStorage.raft.electionTimeout=5 +#eventMesh.metaStorage.raft.snapshotInterval=30 +#eventMesh.metaStorage.raft.refreshLeaderInterval=3 # metaStorage plugin: nacos #eventMesh.metaStorage.nacos.endpoint= diff --git a/eventmesh-runtime/conf/log4j2.xml b/eventmesh-runtime/conf/log4j2.xml index 3ce5ac985e..6341a0e629 100644 --- a/eventmesh-runtime/conf/log4j2.xml +++ b/eventmesh-runtime/conf/log4j2.xml @@ -95,5 +95,14 @@ + + + + + + + + + \ No newline at end of file From d0154ad2a40427a838f7004830f49d6f78612d50 Mon Sep 17 00:00:00 2001 From: karsonto Date: Thu, 25 Apr 2024 10:41:20 +0800 Subject: [PATCH 4/7] rename file and remove unnecessary plugin. --- .../eventmesh-meta-raft/build.gradle | 7 +----- ...MetaService.java => JraftMetaService.java} | 2 +- ...iceImpl.java => JraftMetaServiceImpl.java} | 24 +++++++++---------- .../eventmesh/meta/raft/JraftServer.java | 6 ++--- .../eventmesh/meta/raft/RaftMetaService.java | 3 ++- .../meta/raft/rpc/RequestProcessor.java | 6 ++--- 6 files changed, 22 insertions(+), 26 deletions(-) rename eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/{MetaService.java => JraftMetaService.java} (96%) rename eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/{MetaServiceImpl.java => JraftMetaServiceImpl.java} (94%) diff --git a/eventmesh-meta/eventmesh-meta-raft/build.gradle b/eventmesh-meta/eventmesh-meta-raft/build.gradle index 4fb3b7bc4a..386b5c6e72 100644 --- a/eventmesh-meta/eventmesh-meta-raft/build.gradle +++ b/eventmesh-meta/eventmesh-meta-raft/build.gradle @@ -16,15 +16,10 @@ */ plugins { - id 'java' id 'com.google.protobuf' version '0.8.17' } -repositories { - mavenCentral() -} - -def grpcVersion = '1.43.2' // CURRENT_GRPC_VERSION +def grpcVersion = '1.50.2' // CURRENT_GRPC_VERSION def protobufVersion = '3.21.5' def protocVersion = protobufVersion diff --git a/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/MetaService.java b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/JraftMetaService.java similarity index 96% rename from eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/MetaService.java rename to eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/JraftMetaService.java index 87c77b5589..daf636ccd4 100644 --- a/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/MetaService.java +++ b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/JraftMetaService.java @@ -23,7 +23,7 @@ /** * MetaService. */ -public interface MetaService { +public interface JraftMetaService { void handle(RequestResponse request, EventClosure closure); } diff --git a/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/MetaServiceImpl.java b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/JraftMetaServiceImpl.java similarity index 94% rename from eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/MetaServiceImpl.java rename to eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/JraftMetaServiceImpl.java index a203ba0e9e..1af6d5c963 100644 --- a/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/MetaServiceImpl.java +++ b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/JraftMetaServiceImpl.java @@ -29,20 +29,20 @@ import com.alipay.sofa.jraft.entity.Task; import com.alipay.sofa.jraft.error.RaftError; -public class MetaServiceImpl implements MetaService { - +public class JraftMetaServiceImpl implements JraftMetaService { + JraftServer server; - - - public MetaServiceImpl(JraftServer server) { + + + public JraftMetaServiceImpl(JraftServer server) { this.server = server; } - + @Override public void handle(RequestResponse request, EventClosure closure) { applyOperation(EventOperation.createOpreation(request), closure); } - + public void applyOperation(EventOperation opreation, EventClosure closure) { if (!isLeader()) { handlerNotLeaderError(closure); @@ -60,17 +60,17 @@ public void applyOperation(EventOperation opreation, EventClosure closure) { closure.run(new Status(RaftError.EINTERNAL, errorMsg)); } } - - + + private String getRedirect() { return this.server.redirect().getRedirect(); } - + private boolean isLeader() { return this.server.getFsm().isLeader(); } - - + + private void handlerNotLeaderError(final EventClosure closure) { closure.failure("Not leader.", getRedirect()); closure.run(new Status(RaftError.EPERM, "Not leader")); diff --git a/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/JraftServer.java b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/JraftServer.java index 3f65841ec7..ad8cb14d96 100644 --- a/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/JraftServer.java +++ b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/JraftServer.java @@ -46,7 +46,7 @@ public MetaStateMachine getFsm() { return fsm; } - private MetaServiceImpl metaImpl; + private JraftMetaServiceImpl metaImpl; public JraftServer(final String dataPath, final String groupId, final PeerId serverId, final NodeOptions nodeOptions) throws IOException { @@ -57,7 +57,7 @@ public JraftServer(final String dataPath, final String groupId, final PeerId ser MetaServerHelper.initGRpc(); MetaServerHelper.setRpcServer(rpcServer); // register business processor - metaImpl = new MetaServiceImpl(this); + metaImpl = new JraftMetaServiceImpl(this); rpcServer.registerProcessor(new RequestProcessor(metaImpl)); nodeOptions.setFsm(this.fsm); // set storage path (log,meta,snapshot) @@ -85,7 +85,7 @@ public RequestResponse redirect() { return builder.build(); } - public MetaServiceImpl getMetaImpl() { + public JraftMetaServiceImpl getMetaImpl() { return metaImpl; } diff --git a/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/RaftMetaService.java b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/RaftMetaService.java index 5e1d27e8d6..d9cf371630 100644 --- a/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/RaftMetaService.java +++ b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/RaftMetaService.java @@ -18,6 +18,7 @@ package org.apache.eventmesh.meta.raft; import org.apache.eventmesh.api.exception.MetaException; +import org.apache.eventmesh.api.meta.MetaService; import org.apache.eventmesh.api.meta.MetaServiceListener; import org.apache.eventmesh.api.meta.config.EventMeshMetaConfig; import org.apache.eventmesh.api.meta.dto.EventMeshDataInfo; @@ -65,7 +66,7 @@ import lombok.extern.slf4j.Slf4j; @Slf4j -public class RaftMetaService implements org.apache.eventmesh.api.meta.MetaService { +public class RaftMetaService implements MetaService { private static ObjectMapper objectMapper = new ObjectMapper(); diff --git a/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/rpc/RequestProcessor.java b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/rpc/RequestProcessor.java index 112eeaff69..2fd77ec5b2 100644 --- a/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/rpc/RequestProcessor.java +++ b/eventmesh-meta/eventmesh-meta-raft/src/main/java/org/apache/eventmesh/meta/raft/rpc/RequestProcessor.java @@ -18,7 +18,7 @@ package org.apache.eventmesh.meta.raft.rpc; import org.apache.eventmesh.meta.raft.EventClosure; -import org.apache.eventmesh.meta.raft.MetaService; +import org.apache.eventmesh.meta.raft.JraftMetaService; import com.alipay.sofa.jraft.Status; import com.alipay.sofa.jraft.rpc.RpcContext; @@ -27,9 +27,9 @@ public class RequestProcessor implements RpcProcessor { - private final MetaService metaService; + private final JraftMetaService metaService; - public RequestProcessor(MetaService metaService) { + public RequestProcessor(JraftMetaService metaService) { super(); this.metaService = metaService; } From 7414f5c41db5361d5540b12a0a63dd609d70b832 Mon Sep 17 00:00:00 2001 From: karsonto Date: Thu, 25 Apr 2024 14:48:49 +0800 Subject: [PATCH 5/7] properties modify --- eventmesh-runtime/conf/eventmesh.properties | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/eventmesh-runtime/conf/eventmesh.properties b/eventmesh-runtime/conf/eventmesh.properties index a74485876f..97bed9a860 100644 --- a/eventmesh-runtime/conf/eventmesh.properties +++ b/eventmesh-runtime/conf/eventmesh.properties @@ -86,11 +86,17 @@ eventMesh.metaStorage.plugin.username=nacos eventMesh.metaStorage.plugin.password=nacos # metaStorage plugin: raft -#eventMesh.metaStorage.raft.dataPath=/tmp/server1 +#Path to local snapshot storage for raft data# +#eventMesh.metaStorage.raft.dataPath=/tmp/eventmesh-meta-raft +###########Raft local ip and port######### #eventMesh.metaStorage.raft.self=127.0.0.1:9091 -#eventMesh.metaStorage.raft.members=127.0.0.1:9092,127.0.0.1:9093 +###########Members ip and port######### +#eventMesh.metaStorage.raft.members=127.0.0.2:9091,127.0.0.3:9091 +###########Raft leader election timeout second######### #eventMesh.metaStorage.raft.electionTimeout=5 +###########Raft snapshot interval second######### #eventMesh.metaStorage.raft.snapshotInterval=30 +###########Raft refresh leader interval second######### #eventMesh.metaStorage.raft.refreshLeaderInterval=3 # metaStorage plugin: nacos From 0d08d71102ccbfa13d0d2ede59af6da484fae6bb Mon Sep 17 00:00:00 2001 From: karsonto Date: Thu, 25 Apr 2024 15:17:22 +0800 Subject: [PATCH 6/7] properties modify --- eventmesh-runtime/conf/eventmesh.properties | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/eventmesh-runtime/conf/eventmesh.properties b/eventmesh-runtime/conf/eventmesh.properties index 97bed9a860..f358fb1045 100644 --- a/eventmesh-runtime/conf/eventmesh.properties +++ b/eventmesh-runtime/conf/eventmesh.properties @@ -88,15 +88,15 @@ eventMesh.metaStorage.plugin.password=nacos # metaStorage plugin: raft #Path to local snapshot storage for raft data# #eventMesh.metaStorage.raft.dataPath=/tmp/eventmesh-meta-raft -###########Raft local ip and port######### +#Raft local ip and port #eventMesh.metaStorage.raft.self=127.0.0.1:9091 -###########Members ip and port######### -#eventMesh.metaStorage.raft.members=127.0.0.2:9091,127.0.0.3:9091 -###########Raft leader election timeout second######### +#Members ip and port +#eventMesh.metaStorage.raft.members=192.168.1.2:9091,192.168.1.3:9091 +#Raft leader election timeout second #eventMesh.metaStorage.raft.electionTimeout=5 -###########Raft snapshot interval second######### +#Raft snapshot interval second #eventMesh.metaStorage.raft.snapshotInterval=30 -###########Raft refresh leader interval second######### +#Raft refresh leader interval second #eventMesh.metaStorage.raft.refreshLeaderInterval=3 # metaStorage plugin: nacos From e9b300c7a056a79219580114a7cf162c80d13798 Mon Sep 17 00:00:00 2001 From: karsonto Date: Thu, 25 Apr 2024 15:19:50 +0800 Subject: [PATCH 7/7] properties modify --- eventmesh-runtime/conf/eventmesh.properties | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/eventmesh-runtime/conf/eventmesh.properties b/eventmesh-runtime/conf/eventmesh.properties index f358fb1045..4984181ca8 100644 --- a/eventmesh-runtime/conf/eventmesh.properties +++ b/eventmesh-runtime/conf/eventmesh.properties @@ -86,17 +86,17 @@ eventMesh.metaStorage.plugin.username=nacos eventMesh.metaStorage.plugin.password=nacos # metaStorage plugin: raft -#Path to local snapshot storage for raft data# +# Path to local snapshot storage for raft data #eventMesh.metaStorage.raft.dataPath=/tmp/eventmesh-meta-raft -#Raft local ip and port +# Raft local ip and port #eventMesh.metaStorage.raft.self=127.0.0.1:9091 -#Members ip and port +# Members ip and port #eventMesh.metaStorage.raft.members=192.168.1.2:9091,192.168.1.3:9091 -#Raft leader election timeout second +# Raft leader election timeout second #eventMesh.metaStorage.raft.electionTimeout=5 -#Raft snapshot interval second +# Raft snapshot interval second #eventMesh.metaStorage.raft.snapshotInterval=30 -#Raft refresh leader interval second +# Raft refresh leader interval second #eventMesh.metaStorage.raft.refreshLeaderInterval=3 # metaStorage plugin: nacos