Skip to content

Commit

Permalink
zipkin 2.6.x package compatible. (#4368)
Browse files Browse the repository at this point in the history
fixes #3728
  • Loading branch information
cvictory authored and chickenlj committed Jun 26, 2019
1 parent 8534f48 commit 64aea16
Show file tree
Hide file tree
Showing 6 changed files with 248 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,8 @@ public RemotingException(Channel channel, String message, Throwable cause) {
public RemotingException(InetSocketAddress localAddress, InetSocketAddress remoteAddress, String message, Throwable cause) {
super(localAddress, remoteAddress, message, cause);
}

public RemotingException(Exception e){
super(null, e.getMessage());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.dubbo.remoting.exchange;

/**
* 2019-06-20
*/
@Deprecated
public interface ResponseCallback {
/**
* done.
*
* @param response
*/
void done(Object response);

/**
* caught exception.
*
* @param exception
*/
void caught(Throwable exception);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.dubbo.remoting.exchange;


import com.alibaba.dubbo.remoting.RemotingException;

/**
* 2019-06-20
*/
@Deprecated
public interface ResponseFuture {
/**
* get result.
*
* @return result.
*/
Object get() throws RemotingException;

/**
* get result with the specified timeout.
*
* @param timeoutInMillis timeout.
* @return result.
*/
Object get(int timeoutInMillis) throws RemotingException;

/**
* set callback.
*
* @param callback
*/
void setCallback(ResponseCallback callback);

/**
* check is done.
*
* @return done or not.
*/
boolean isDone();
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@

package com.alibaba.dubbo.rpc;

import org.apache.dubbo.rpc.FutureContext;

import com.alibaba.dubbo.rpc.protocol.dubbo.FutureAdapter;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

@Deprecated
public class RpcContext extends org.apache.dubbo.rpc.RpcContext {

Expand Down Expand Up @@ -48,4 +55,12 @@ private static RpcContext newInstance(org.apache.dubbo.rpc.RpcContext rpcContext

return copy;
}

public <T> Future<T> getFuture() {
CompletableFuture completableFuture = FutureContext.getCompletableFuture();
if (completableFuture == null) {
return null;
}
return new FutureAdapter(completableFuture);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.dubbo.rpc.protocol.dubbo;

import org.apache.dubbo.rpc.AppResponse;

import com.alibaba.dubbo.remoting.RemotingException;
import com.alibaba.dubbo.remoting.exchange.ResponseCallback;
import com.alibaba.dubbo.remoting.exchange.ResponseFuture;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;

/**
* 2019-06-20
*/
@Deprecated
public class FutureAdapter<V> implements Future<V> {
private CompletableFuture<V> future;

public FutureAdapter(CompletableFuture<V> future) {
this.future = future;

}

public ResponseFuture getFuture() {
return new ResponseFuture() {
@Override
public Object get() throws RemotingException {
try {
return FutureAdapter.this.get();
} catch (InterruptedException e) {
throw new RemotingException(e);
} catch (ExecutionException e) {
throw new RemotingException(e);
}
}

@Override
public Object get(int timeoutInMillis) throws RemotingException {
try {
return FutureAdapter.this.get(timeoutInMillis, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
throw new RemotingException(e);
} catch (ExecutionException e) {
throw new RemotingException(e);
} catch (TimeoutException e) {
throw new RemotingException(e);
}
}

@Override
public void setCallback(ResponseCallback callback) {
FutureAdapter.this.setCallback(callback);
}

@Override
public boolean isDone() {
return FutureAdapter.this.isDone();
}
};
}

void setCallback(ResponseCallback callback) {
if (!(future instanceof org.apache.dubbo.rpc.protocol.dubbo.FutureAdapter)) {
return;
}
org.apache.dubbo.rpc.protocol.dubbo.FutureAdapter futureAdapter = (org.apache.dubbo.rpc.protocol.dubbo.FutureAdapter) future;
BiConsumer<AppResponse, ? super Throwable> biConsumer = new BiConsumer<AppResponse, Throwable>() {

@Override
public void accept(AppResponse appResponse, Throwable t) {
if (t != null) {
if (t instanceof CompletionException) {
t = t.getCause();
}
callback.caught(t);
} else {
if (appResponse.hasException()) {
callback.caught(appResponse.getException());
} else {
callback.done((V) appResponse.getValue());
}
}
}
};
futureAdapter.getAppResponseFuture().whenComplete(biConsumer);
}

public boolean cancel(boolean mayInterruptIfRunning) {
return future.cancel(mayInterruptIfRunning);
}

public boolean isCancelled() {
return future.isCancelled();
}

public boolean isDone() {
return future.isDone();
}

@SuppressWarnings("unchecked")
public V get() throws InterruptedException, ExecutionException {
return future.get();
}

@SuppressWarnings("unchecked")
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return future.get(timeout, unit);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,7 @@ public V get(long timeout, TimeUnit unit) throws InterruptedException, Execution
}
}

public CompletableFuture<AppResponse> getAppResponseFuture() {
return appResponseFuture;
}
}

0 comments on commit 64aea16

Please sign in to comment.