diff --git a/.travis.yml b/.travis.yml index 05c26fa53735e..00de3217624ed 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,14 +1,14 @@ language: android android: components: - - build-tools-23.0.1 - - android-23 - - platform-tools - - extra-android-support - - extra-google-m2repository - - extra-android-m2repository + - build-tools-23.0.1 + - android-23 + - platform-tools + - extra-android-support + - extra-google-m2repository + - extra-android-m2repository sudo: false script: - - mvn clean install - - mvn checkstyle:check - - cd ./azure-android-client-authentication && ./gradlew check +- mvn clean install +- mvn checkstyle:check +- cd ./azure-android-client-authentication && ./gradlew check diff --git a/azure-client-runtime/src/main/java/com/microsoft/azure/AzureClient.java b/azure-client-runtime/src/main/java/com/microsoft/azure/AzureClient.java index edc06ef80c6e6..4bc9cd9e55d14 100644 --- a/azure-client-runtime/src/main/java/com/microsoft/azure/AzureClient.java +++ b/azure-client-runtime/src/main/java/com/microsoft/azure/AzureClient.java @@ -923,7 +923,6 @@ public void run() { } else { ServiceException serviceException = new ServiceException("No async header in response"); pollingCallback.failure(serviceException); - serviceCall.failure(serviceException); } } else { // Check if operation failed diff --git a/azure-client-runtime/src/main/java/com/microsoft/azure/DAGNode.java b/azure-client-runtime/src/main/java/com/microsoft/azure/DAGNode.java index 112130413fd87..4e1848fd8c159 100644 --- a/azure-client-runtime/src/main/java/com/microsoft/azure/DAGNode.java +++ b/azure-client-runtime/src/main/java/com/microsoft/azure/DAGNode.java @@ -10,6 +10,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.locks.ReentrantLock; /** * The type representing node in a {@link DAGraph}. @@ -20,6 +21,7 @@ public class DAGNode extends Node { private List dependentKeys; private int toBeResolved; private boolean isPreparer; + private ReentrantLock lock; /** * Creates a DAG node. @@ -30,6 +32,14 @@ public class DAGNode extends Node { public DAGNode(String key, T data) { super(key, data); dependentKeys = new ArrayList<>(); + lock = new ReentrantLock(); + } + + /** + * @return the lock to be used while performing thread safe operation on this node. + */ + public ReentrantLock lock() { + return this.lock; } /** diff --git a/azure-client-runtime/src/main/java/com/microsoft/azure/DAGraph.java b/azure-client-runtime/src/main/java/com/microsoft/azure/DAGraph.java index 58179e0152ed4..153c38f2592a3 100644 --- a/azure-client-runtime/src/main/java/com/microsoft/azure/DAGraph.java +++ b/azure-client-runtime/src/main/java/com/microsoft/azure/DAGraph.java @@ -7,9 +7,8 @@ package com.microsoft.azure; -import java.util.ArrayDeque; import java.util.Map; -import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; /** * Type representing a DAG (directed acyclic graph). @@ -20,7 +19,7 @@ * @param the type of the nodes in the graph */ public class DAGraph> extends Graph { - private Queue queue; + private ConcurrentLinkedQueue queue; private boolean hasParent; private U rootNode; @@ -31,7 +30,7 @@ public class DAGraph> extends Graph { */ public DAGraph(U rootNode) { this.rootNode = rootNode; - this.queue = new ArrayDeque<>(); + this.queue = new ConcurrentLinkedQueue<>(); this.rootNode.setPreparer(true); this.addNode(rootNode); } @@ -103,10 +102,14 @@ public void prepare() { * Gets next node in the DAG which has no dependency or all of it's dependencies are resolved and * ready to be consumed. * - * @return next node or null if all the nodes have been explored + * @return next node or null if all the nodes have been explored or no node is available at this moment. */ public U getNext() { - return graph.get(queue.poll()); + String nextItemKey = queue.poll(); + if (nextItemKey == null) { + return null; + } + return graph.get(nextItemKey); } /** @@ -129,9 +132,14 @@ public void reportedCompleted(U completed) { String dependency = completed.key(); for (String dependentKey : graph.get(dependency).dependentKeys()) { DAGNode dependent = graph.get(dependentKey); - dependent.reportResolved(dependency); - if (dependent.hasAllResolved()) { - queue.add(dependent.key()); + dependent.lock().lock(); + try { + dependent.reportResolved(dependency); + if (dependent.hasAllResolved()) { + queue.add(dependent.key()); + } + } finally { + dependent.lock().unlock(); } } } @@ -145,9 +153,8 @@ public void reportedCompleted(U completed) { */ private void initializeDependentKeys() { visit(new Visitor() { - // This 'visit' will be called only once per each node. @Override - public void visit(U node) { + public void visitNode(U node) { if (node.dependencyKeys().isEmpty()) { return; } @@ -158,6 +165,13 @@ public void visit(U node) { .addDependent(dependentKey); } } + + @Override + public void visitEdge(String fromKey, String toKey, EdgeType edgeType) { + if (edgeType == EdgeType.BACK) { + throw new IllegalStateException("Detected circular dependency: " + findPath(fromKey, toKey)); + } + } }); } diff --git a/azure-client-runtime/src/main/java/com/microsoft/azure/Graph.java b/azure-client-runtime/src/main/java/com/microsoft/azure/Graph.java index 40ceebaa50b2b..fccc893d714a5 100644 --- a/azure-client-runtime/src/main/java/com/microsoft/azure/Graph.java +++ b/azure-client-runtime/src/main/java/com/microsoft/azure/Graph.java @@ -7,6 +7,7 @@ package com.microsoft.azure; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -23,6 +24,11 @@ public class Graph> { protected Map graph; private Set visited; + private Integer time; + private Map entryTime; + private Map exitTime; + private Map parent; + private Set processed; /** * Creates a directed graph. @@ -30,6 +36,11 @@ public class Graph> { public Graph() { this.graph = new HashMap<>(); this.visited = new HashSet<>(); + this.time = 0; + this.entryTime = new HashMap<>(); + this.exitTime = new HashMap<>(); + this.parent = new HashMap<>(); + this.processed = new HashSet<>(); } /** @@ -42,25 +53,17 @@ public void addNode(U node) { } /** - * Represents a visitor to be implemented by the consumer who want to visit the - * graph's nodes in DFS order. - * - * @param the type of the node + * @return all nodes in the graph. */ - interface Visitor { - /** - * visit a node. - * - * @param node the node to visited - */ - void visit(U node); + public Collection getNodes() { + return graph.values(); } /** * Perform DFS visit in this graph. *

* The directed graph will be traversed in DFS order and the visitor will be notified as - * search explores each node + * search explores each node and edge. * * @param visitor the graph visitor */ @@ -71,15 +74,107 @@ public void visit(Visitor visitor) { } } visited.clear(); + time = 0; + entryTime.clear(); + exitTime.clear(); + parent.clear(); + processed.clear(); } private void dfs(Visitor visitor, Node node) { - visitor.visit(node); - visited.add(node.key()); - for (String childKey : node.children()) { - if (!visited.contains(childKey)) { - this.dfs(visitor, this.graph.get(childKey)); + visitor.visitNode(node); + + String fromKey = node.key(); + visited.add(fromKey); + time++; + entryTime.put(fromKey, time); + for (String toKey : node.children()) { + if (!visited.contains(toKey)) { + parent.put(toKey, fromKey); + visitor.visitEdge(fromKey, toKey, edgeType(fromKey, toKey)); + this.dfs(visitor, this.graph.get(toKey)); + } else { + visitor.visitEdge(fromKey, toKey, edgeType(fromKey, toKey)); } } + time++; + exitTime.put(fromKey, time); + processed.add(fromKey); + } + + private EdgeType edgeType(String fromKey, String toKey) { + if (parent.containsKey(toKey) && parent.get(toKey).equals(fromKey)) { + return EdgeType.TREE; + } + + if (visited.contains(toKey) && !processed.contains(toKey)) { + return EdgeType.BACK; + } + + if (processed.contains(toKey) && entryTime.containsKey(toKey) && entryTime.containsKey(fromKey)) { + if (entryTime.get(toKey) > entryTime.get(fromKey)) { + return EdgeType.FORWARD; + } + + if (entryTime.get(toKey) < entryTime.get(fromKey)) { + return EdgeType.CROSS; + } + } + + throw new IllegalStateException("Internal Error: Unable to locate the edge type {" + fromKey + ", " + toKey + "}"); + } + + protected String findPath(String start, String end) { + if (start.equals(end)) { + return start; + } else { + return findPath(start, parent.get(end)) + " -> " + end; + } + } + + /** + * The edge types in a graph. + */ + enum EdgeType { + /** + * An edge (u, v) is a tree edge if v is visited the first time. + */ + TREE, + /** + * An edge (u, v) is a forward edge if v is descendant of u. + */ + FORWARD, + /** + * An edge (u, v) is a back edge if v is ancestor of u. + */ + BACK, + /** + * An edge (u, v) is a cross edge if v is neither ancestor or descendant of u. + */ + CROSS + } + + /** + * Represents a visitor to be implemented by the consumer who want to visit the + * graph's nodes in DFS order by calling visit method. + * + * @param the type of the node + */ + interface Visitor { + /** + * visit a node. + * + * @param node the node to visited + */ + void visitNode(U node); + + /** + * visit an edge. + * + * @param fromKey key of the from node + * @param toKey key of the to node + * @param edgeType the edge type + */ + void visitEdge(String fromKey, String toKey, EdgeType edgeType); } } diff --git a/azure-client-runtime/src/main/java/com/microsoft/azure/TaskGroup.java b/azure-client-runtime/src/main/java/com/microsoft/azure/TaskGroup.java index 26bbbece2606b..3ad514d93874a 100644 --- a/azure-client-runtime/src/main/java/com/microsoft/azure/TaskGroup.java +++ b/azure-client-runtime/src/main/java/com/microsoft/azure/TaskGroup.java @@ -63,7 +63,7 @@ public interface TaskGroup> { * @param callback the callback to call on failure or success * @return the handle to the REST call */ - ServiceCall executeAsync(ServiceCallback callback); + ServiceCall executeAsync(ServiceCallback callback); /** * Gets the result of execution of a task in the group. diff --git a/azure-client-runtime/src/main/java/com/microsoft/azure/TaskGroupBase.java b/azure-client-runtime/src/main/java/com/microsoft/azure/TaskGroupBase.java index efc8d30e491bc..f3a4b231c8e2c 100644 --- a/azure-client-runtime/src/main/java/com/microsoft/azure/TaskGroupBase.java +++ b/azure-client-runtime/src/main/java/com/microsoft/azure/TaskGroupBase.java @@ -9,15 +9,20 @@ import com.microsoft.rest.ServiceCall; import com.microsoft.rest.ServiceCallback; +import com.microsoft.rest.ServiceResponse; + +import java.util.concurrent.ConcurrentLinkedQueue; /** * The base implementation of TaskGroup interface. * * @param the result type of the tasks in the group + * @param the task item */ -public abstract class TaskGroupBase - implements TaskGroup> { - private DAGraph, DAGNode>> dag; +public abstract class TaskGroupBase> + implements TaskGroup { + private DAGraph> dag; + private ParallelServiceCall parallelServiceCall; /** * Creates TaskGroupBase. @@ -25,12 +30,13 @@ public abstract class TaskGroupBase * @param rootTaskItemId the id of the root task in this task group * @param rootTaskItem the root task */ - public TaskGroupBase(String rootTaskItemId, TaskItem rootTaskItem) { + public TaskGroupBase(String rootTaskItemId, U rootTaskItem) { this.dag = new DAGraph<>(new DAGNode<>(rootTaskItemId, rootTaskItem)); + this.parallelServiceCall = new ParallelServiceCall(); } @Override - public DAGraph, DAGNode>> dag() { + public DAGraph> dag() { return dag; } @@ -40,7 +46,7 @@ public boolean isPreparer() { } @Override - public void merge(TaskGroup> parentTaskGroup) { + public void merge(TaskGroup parentTaskGroup) { dag.merge(parentTaskGroup.dag()); } @@ -53,30 +59,18 @@ public void prepare() { @Override public void execute() throws Exception { - DAGNode> nextNode = dag.getNext(); - if (nextNode == null) { - return; - } - - if (dag.isRootNode(nextNode)) { - executeRootTask(nextNode.data()); - } else { - nextNode.data().execute(this, nextNode); + DAGNode nextNode = dag.getNext(); + while (nextNode != null) { + nextNode.data().execute(); + this.dag().reportedCompleted(nextNode); + nextNode = dag.getNext(); } } @Override - public ServiceCall executeAsync(final ServiceCallback callback) { - final DAGNode> nextNode = dag.getNext(); - if (nextNode == null) { - return null; - } - - if (dag.isRootNode(nextNode)) { - return executeRootTaskAsync(nextNode.data(), callback); - } else { - return nextNode.data().executeAsync(this, nextNode, callback); - } + public ServiceCall executeAsync(final ServiceCallback callback) { + executeReadyTasksAsync(callback); + return parallelServiceCall; } @Override @@ -85,27 +79,94 @@ public T taskResult(String taskId) { } /** - * Executes the root task in this group. - *

- * This method will be invoked when all the task dependencies of the root task are finished - * executing, at this point root task can be executed by consuming the result of tasks it - * depends on. + * Executes all runnable tasks, a task is runnable when all the tasks its depends + * on are finished running. * - * @param task the root task in this group - * @throws Exception the exception + * @param callback the callback */ - public abstract void executeRootTask(TaskItem task) throws Exception; + private void executeReadyTasksAsync(final ServiceCallback callback) { + DAGNode nextNode = dag.getNext(); + while (nextNode != null) { + ServiceCall serviceCall = nextNode.data().executeAsync(taskCallback(nextNode, callback)); + this.parallelServiceCall.addCall(serviceCall); + nextNode = dag.getNext(); + } + } /** - * Executes the root task in this group asynchronously. - *

- * This method will be invoked when all the task dependencies of the root task are finished - * executing, at this point root task can be executed by consuming the result of tasks it - * depends on. + * This method create and return a callback for the runnable task stored in the given node. + * This callback wraps the given callback. * - * @param task the root task in this group - * @param callback the callback when the task fails or succeeds - * @return the handle to the REST call + * @param taskNode the node containing runnable task + * @param callback the callback to wrap + * @return the task callback + */ + private ServiceCallback taskCallback(final DAGNode taskNode, final ServiceCallback callback) { + final TaskGroupBase self = this; + return new ServiceCallback() { + @Override + public void failure(Throwable t) { + callback.failure(t); + parallelServiceCall.failure(t); + } + + @Override + public void success(ServiceResponse result) { + self.dag().reportedCompleted(taskNode); + if (self.dag().isRootNode(taskNode)) { + if (callback != null) { + callback.success(result); + } + parallelServiceCall.success(result); + } else { + self.executeReadyTasksAsync(callback); + } + } + }; + } + + /** + * Type represents a set of REST calls running possibly in parallel. */ - public abstract ServiceCall executeRootTaskAsync(TaskItem task, ServiceCallback callback); + private class ParallelServiceCall extends ServiceCall { + private ConcurrentLinkedQueue> serviceCalls; + + /** + * Creates a ParallelServiceCall. + */ + ParallelServiceCall() { + super(null); + this.serviceCalls = new ConcurrentLinkedQueue<>(); + } + + /** + * Cancels all the service calls currently executing. + */ + public void cancel() { + for (ServiceCall call : this.serviceCalls) { + call.cancel(); + } + } + + /** + * @return true if the call has been canceled; false otherwise. + */ + public boolean isCancelled() { + for (ServiceCall call : this.serviceCalls) { + if (!call.isCanceled()) { + return false; + } + } + return true; + } + + /** + * Add a call to the list of parallel calls. + * + * @param call the call + */ + private void addCall(ServiceCall call) { + this.serviceCalls.add(call); + } + } } diff --git a/azure-client-runtime/src/main/java/com/microsoft/azure/TaskItem.java b/azure-client-runtime/src/main/java/com/microsoft/azure/TaskItem.java index fb74c23845c3b..8f0a3459a2e92 100644 --- a/azure-client-runtime/src/main/java/com/microsoft/azure/TaskItem.java +++ b/azure-client-runtime/src/main/java/com/microsoft/azure/TaskItem.java @@ -26,21 +26,17 @@ public interface TaskItem { *

* once executed the result will be available through result getter * - * @param taskGroup the task group dispatching tasks - * @param node the node the task item is associated with * @throws Exception exception */ - void execute(TaskGroup> taskGroup, DAGNode> node) throws Exception; + void execute() throws Exception; /** * Executes the task asynchronously. *

* once executed the result will be available through result getter - - * @param taskGroup the task group dispatching tasks - * @param node the node the task item is associated with + * * @param callback callback to call on success or failure * @return the handle of the REST call */ - ServiceCall executeAsync(TaskGroup> taskGroup, DAGNode> node, ServiceCallback callback); + ServiceCall executeAsync(ServiceCallback callback); }