diff --git a/pom.xml b/pom.xml index eeef3c7e..35632cfd 100644 --- a/pom.xml +++ b/pom.xml @@ -67,6 +67,7 @@ 2.6 3.2.1 3.4.4 + 0.6.0 3.5.3 1.8 4.13.1 @@ -96,6 +97,12 @@ slf4j-api ${slf4j.version} + + org.apache.fury + fury-core + ${fury.version} + provided + io.netty netty-all diff --git a/src/main/java/com/alipay/remoting/serialization/HessianSerializer.java b/src/main/java/com/alipay/remoting/serialization/HessianSerializer.java index 87fcec1e..9dc32282 100644 --- a/src/main/java/com/alipay/remoting/serialization/HessianSerializer.java +++ b/src/main/java/com/alipay/remoting/serialization/HessianSerializer.java @@ -34,12 +34,8 @@ public class HessianSerializer implements Serializer { private SerializerFactory serializerFactory = new SerializerFactory(); - private static ThreadLocal localOutputByteArray = new ThreadLocal() { - @Override - protected ByteArrayOutputStream initialValue() { - return new ByteArrayOutputStream(); - } - }; + private static ThreadLocal localOutputByteArray = ThreadLocal.withInitial( + ByteArrayOutputStream::new); /** * @see com.alipay.remoting.serialization.Serializer#serialize(java.lang.Object) diff --git a/src/main/java/com/alipay/remoting/serialization/SerializerManager.java b/src/main/java/com/alipay/remoting/serialization/SerializerManager.java index b30b36e3..1da42cda 100644 --- a/src/main/java/com/alipay/remoting/serialization/SerializerManager.java +++ b/src/main/java/com/alipay/remoting/serialization/SerializerManager.java @@ -17,6 +17,7 @@ package com.alipay.remoting.serialization; import java.util.concurrent.locks.ReentrantLock; +import com.alipay.remoting.serialization.fury.FurySerializer; /** * Manage all serializers. @@ -32,19 +33,27 @@ public class SerializerManager { private static Serializer[] serializers = new Serializer[5]; public static final byte Hessian2 = 1; + //public static final byte Json = 2; + public static final byte Fury = 3; + private static final ReentrantLock REENTRANT_LOCK = new ReentrantLock(); public static Serializer getSerializer(int idx) { Serializer currentSerializer = serializers[idx]; - if (currentSerializer == null && idx == Hessian2) { + if (currentSerializer == null) { REENTRANT_LOCK.lock(); try { currentSerializer = serializers[idx]; if (currentSerializer == null) { - currentSerializer = new HessianSerializer(); - addSerializer(Hessian2, currentSerializer); + if (idx == Hessian2) { + currentSerializer = new HessianSerializer(); + addSerializer(Hessian2, currentSerializer); + } else if (idx == Fury) { + currentSerializer = new FurySerializer(); + addSerializer(Fury, currentSerializer); + } } } finally { REENTRANT_LOCK.unlock(); diff --git a/src/main/java/com/alipay/remoting/serialization/fury/FurySerializer.java b/src/main/java/com/alipay/remoting/serialization/fury/FurySerializer.java new file mode 100644 index 00000000..799128db --- /dev/null +++ b/src/main/java/com/alipay/remoting/serialization/fury/FurySerializer.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.remoting.serialization.fury; + +import java.util.ArrayList; +import java.util.List; +import com.alipay.remoting.exception.CodecException; +import com.alipay.remoting.serialization.Serializer; +import org.apache.fury.Fury; +import org.apache.fury.ThreadLocalFury; +import org.apache.fury.ThreadSafeFury; + +/** + * @author jianbin@apache.org + */ +public class FurySerializer implements Serializer { + + private static final List> REGISTRY_LIST = new ArrayList<>(); + + private final ThreadSafeFury fury = new ThreadLocalFury(classLoader -> { + Fury fury = Fury.builder().withRefTracking(true) + .requireClassRegistration(true).withClassLoader(classLoader).build(); + REGISTRY_LIST.forEach(fury::register); + return fury; + }); + + @Override + public byte[] serialize(Object obj) throws CodecException { + try { + return fury.serialize(obj); + } catch (Exception e) { + throw new CodecException("Fury serialization failed", e); + } + } + + @Override + public T deserialize(byte[] data, String classOfT) throws CodecException { + return (T)fury.deserialize(data); + } + + public static void registry(Class clazz) { + REGISTRY_LIST.add(clazz); + } + +} diff --git a/src/test/java/com/alipay/remoting/rpc/protocol/RpcCommandHandlerTest.java b/src/test/java/com/alipay/remoting/rpc/protocol/RpcCommandHandlerTest.java index 41cf21ca..bb709f9d 100644 --- a/src/test/java/com/alipay/remoting/rpc/protocol/RpcCommandHandlerTest.java +++ b/src/test/java/com/alipay/remoting/rpc/protocol/RpcCommandHandlerTest.java @@ -27,6 +27,7 @@ import org.junit.Test; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; @@ -41,7 +42,7 @@ public class RpcCommandHandlerTest { private static RemotingContext remotingContext = null; - private static final List remotingContextList = new ArrayList<>(); + private static final List remotingContextList = Collections.synchronizedList(new ArrayList<>()); private static final CountDownLatch countDownLatch = new CountDownLatch(2); @@ -65,8 +66,8 @@ public void testHandleCommand() throws Exception { msg.add(rpcRequestCommand2); RpcCommandHandler rpcCommandHandler = new RpcCommandHandler(new RpcCommandFactory()); rpcCommandHandler.handleCommand(remotingContext, msg); - countDownLatch.await(10, TimeUnit.SECONDS); - Assert.assertTrue(remotingContextList.size() == 2); + countDownLatch.await(); + Assert.assertEquals(2, remotingContextList.size()); Assert.assertTrue(remotingContextList.get(0).getTimeout() != remotingContextList.get(1).getTimeout()); } @@ -89,7 +90,7 @@ public boolean isStarted() { @Override public BizContext preHandleRequest(RemotingContext remotingCtx, Object request) { - Assert.assertTrue(remotingCtx != remotingContext); + Assert.assertNotSame(remotingCtx, remotingContext); remotingContextList.add(remotingCtx); countDownLatch.countDown(); return null; diff --git a/src/test/java/com/alipay/remoting/serialization/fury/FurySerializerTest.java b/src/test/java/com/alipay/remoting/serialization/fury/FurySerializerTest.java new file mode 100644 index 00000000..7b89e3a3 --- /dev/null +++ b/src/test/java/com/alipay/remoting/serialization/fury/FurySerializerTest.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.remoting.serialization.fury; + +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import com.alipay.remoting.serialization.HessianSerializerTest; + +import com.alipay.remoting.exception.CodecException; +import org.apache.fury.exception.InsecureException; +import org.junit.Assert; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +/** + * @author jianbin@apache.org + */ +public class FurySerializerTest { + + public static FurySerializer serializer; + + static { + FurySerializer.registry(HessianSerializerTest.class); + serializer = new FurySerializer(); + } + + @Test + public void concurrentSerializeTest() throws InterruptedException { + int concurrentNum = 10; + CountDownLatch countDownLatch = new CountDownLatch(concurrentNum); + for (int i = 0; i < concurrentNum; ++i) { + FurySerializerTest.MyThread thread = new FurySerializerTest.MyThread(countDownLatch); + new Thread(thread).start(); + } + countDownLatch.await(2, TimeUnit.SECONDS); + + } + + @Test + public void testSerializeError() { + FurySerializerTest furySerializerTest = new FurySerializerTest(); + try { + serializer.serialize(furySerializerTest); + } catch (CodecException e) { + Assert.assertEquals(e.getCause().getClass(), InsecureException.class); + } + } + + @Test + public void testSerialize() { + HessianSerializerTest furySerializerTest = new HessianSerializerTest(); + try { + Assert.assertNotNull(serializer.serialize(furySerializerTest)); + } catch (CodecException e) { + fail(); + } + } + + static class MyThread implements Runnable { + CountDownLatch countDownLatch; + + public MyThread(CountDownLatch countDownLatch) { + this.countDownLatch = countDownLatch; + } + + @Override + public void run() { + try { + for (int i = 0; i < 100; i++) { + String randomStr = UUID.randomUUID().toString(); + byte[] bytes = serializer.serialize(randomStr); + String o = serializer.deserialize(bytes, String.class.getName()); + assertEquals(o, randomStr); + } + } catch (Exception e) { + fail(); + } finally { + countDownLatch.countDown(); + } + } + } + +}