Skip to content

Commit

Permalink
Merge commit '2da17cd075822328b42931f8bc04f8e3b01eee7b' into pagingfix
Browse files Browse the repository at this point in the history
  • Loading branch information
jianghaolu committed Aug 10, 2016
2 parents 012d715 + e32f486 commit 5e6d8d2
Show file tree
Hide file tree
Showing 8 changed files with 264 additions and 89 deletions.
18 changes: 9 additions & 9 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
language: android
android:
components:
- build-tools-23.0.1
- android-23
- platform-tools
- extra-android-support
- extra-google-m2repository
- extra-android-m2repository
- build-tools-23.0.1
- android-23
- platform-tools
- extra-android-support
- extra-google-m2repository
- extra-android-m2repository
sudo: false
script:
- mvn clean install
- mvn checkstyle:check
- cd ./azure-android-client-authentication && ./gradlew check
- mvn clean install
- mvn checkstyle:check
- cd ./azure-android-client-authentication && ./gradlew check
Original file line number Diff line number Diff line change
Expand Up @@ -923,7 +923,6 @@ public void run() {
} else {
ServiceException serviceException = new ServiceException("No async header in response");
pollingCallback.failure(serviceException);
serviceCall.failure(serviceException);
}
} else {
// Check if operation failed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

/**
* The type representing node in a {@link DAGraph}.
Expand All @@ -20,6 +21,7 @@ public class DAGNode<T> extends Node<T> {
private List<String> dependentKeys;
private int toBeResolved;
private boolean isPreparer;
private ReentrantLock lock;

/**
* Creates a DAG node.
Expand All @@ -30,6 +32,14 @@ public class DAGNode<T> extends Node<T> {
public DAGNode(String key, T data) {
super(key, data);
dependentKeys = new ArrayList<>();
lock = new ReentrantLock();
}

/**
* @return the lock to be used while performing thread safe operation on this node.
*/
public ReentrantLock lock() {
return this.lock;
}

/**
Expand Down
36 changes: 25 additions & 11 deletions azure-client-runtime/src/main/java/com/microsoft/azure/DAGraph.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@

package com.microsoft.azure;

import java.util.ArrayDeque;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
* Type representing a DAG (directed acyclic graph).
Expand All @@ -20,7 +19,7 @@
* @param <U> the type of the nodes in the graph
*/
public class DAGraph<T, U extends DAGNode<T>> extends Graph<T, U> {
private Queue<String> queue;
private ConcurrentLinkedQueue<String> queue;
private boolean hasParent;
private U rootNode;

Expand All @@ -31,7 +30,7 @@ public class DAGraph<T, U extends DAGNode<T>> extends Graph<T, U> {
*/
public DAGraph(U rootNode) {
this.rootNode = rootNode;
this.queue = new ArrayDeque<>();
this.queue = new ConcurrentLinkedQueue<>();
this.rootNode.setPreparer(true);
this.addNode(rootNode);
}
Expand Down Expand Up @@ -103,10 +102,14 @@ public void prepare() {
* Gets next node in the DAG which has no dependency or all of it's dependencies are resolved and
* ready to be consumed.
*
* @return next node or null if all the nodes have been explored
* @return next node or null if all the nodes have been explored or no node is available at this moment.
*/
public U getNext() {
return graph.get(queue.poll());
String nextItemKey = queue.poll();
if (nextItemKey == null) {
return null;
}
return graph.get(nextItemKey);
}

/**
Expand All @@ -129,9 +132,14 @@ public void reportedCompleted(U completed) {
String dependency = completed.key();
for (String dependentKey : graph.get(dependency).dependentKeys()) {
DAGNode<T> dependent = graph.get(dependentKey);
dependent.reportResolved(dependency);
if (dependent.hasAllResolved()) {
queue.add(dependent.key());
dependent.lock().lock();
try {
dependent.reportResolved(dependency);
if (dependent.hasAllResolved()) {
queue.add(dependent.key());
}
} finally {
dependent.lock().unlock();
}
}
}
Expand All @@ -145,9 +153,8 @@ public void reportedCompleted(U completed) {
*/
private void initializeDependentKeys() {
visit(new Visitor<U>() {
// This 'visit' will be called only once per each node.
@Override
public void visit(U node) {
public void visitNode(U node) {
if (node.dependencyKeys().isEmpty()) {
return;
}
Expand All @@ -158,6 +165,13 @@ public void visit(U node) {
.addDependent(dependentKey);
}
}

@Override
public void visitEdge(String fromKey, String toKey, EdgeType edgeType) {
if (edgeType == EdgeType.BACK) {
throw new IllegalStateException("Detected circular dependency: " + findPath(fromKey, toKey));
}
}
});
}

Expand Down
129 changes: 112 additions & 17 deletions azure-client-runtime/src/main/java/com/microsoft/azure/Graph.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package com.microsoft.azure;

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
Expand All @@ -23,13 +24,23 @@
public class Graph<T, U extends Node<T>> {
protected Map<String, U> graph;
private Set<String> visited;
private Integer time;
private Map<String, Integer> entryTime;
private Map<String, Integer> exitTime;
private Map<String, String> parent;
private Set<String> processed;

/**
* Creates a directed graph.
*/
public Graph() {
this.graph = new HashMap<>();
this.visited = new HashSet<>();
this.time = 0;
this.entryTime = new HashMap<>();
this.exitTime = new HashMap<>();
this.parent = new HashMap<>();
this.processed = new HashSet<>();
}

/**
Expand All @@ -42,25 +53,17 @@ public void addNode(U node) {
}

/**
* Represents a visitor to be implemented by the consumer who want to visit the
* graph's nodes in DFS order.
*
* @param <U> the type of the node
* @return all nodes in the graph.
*/
interface Visitor<U> {
/**
* visit a node.
*
* @param node the node to visited
*/
void visit(U node);
public Collection<U> getNodes() {
return graph.values();
}

/**
* Perform DFS visit in this graph.
* <p>
* The directed graph will be traversed in DFS order and the visitor will be notified as
* search explores each node
* search explores each node and edge.
*
* @param visitor the graph visitor
*/
Expand All @@ -71,15 +74,107 @@ public void visit(Visitor visitor) {
}
}
visited.clear();
time = 0;
entryTime.clear();
exitTime.clear();
parent.clear();
processed.clear();
}

private void dfs(Visitor visitor, Node<T> node) {
visitor.visit(node);
visited.add(node.key());
for (String childKey : node.children()) {
if (!visited.contains(childKey)) {
this.dfs(visitor, this.graph.get(childKey));
visitor.visitNode(node);

String fromKey = node.key();
visited.add(fromKey);
time++;
entryTime.put(fromKey, time);
for (String toKey : node.children()) {
if (!visited.contains(toKey)) {
parent.put(toKey, fromKey);
visitor.visitEdge(fromKey, toKey, edgeType(fromKey, toKey));
this.dfs(visitor, this.graph.get(toKey));
} else {
visitor.visitEdge(fromKey, toKey, edgeType(fromKey, toKey));
}
}
time++;
exitTime.put(fromKey, time);
processed.add(fromKey);
}

private EdgeType edgeType(String fromKey, String toKey) {
if (parent.containsKey(toKey) && parent.get(toKey).equals(fromKey)) {
return EdgeType.TREE;
}

if (visited.contains(toKey) && !processed.contains(toKey)) {
return EdgeType.BACK;
}

if (processed.contains(toKey) && entryTime.containsKey(toKey) && entryTime.containsKey(fromKey)) {
if (entryTime.get(toKey) > entryTime.get(fromKey)) {
return EdgeType.FORWARD;
}

if (entryTime.get(toKey) < entryTime.get(fromKey)) {
return EdgeType.CROSS;
}
}

throw new IllegalStateException("Internal Error: Unable to locate the edge type {" + fromKey + ", " + toKey + "}");
}

protected String findPath(String start, String end) {
if (start.equals(end)) {
return start;
} else {
return findPath(start, parent.get(end)) + " -> " + end;
}
}

/**
* The edge types in a graph.
*/
enum EdgeType {
/**
* An edge (u, v) is a tree edge if v is visited the first time.
*/
TREE,
/**
* An edge (u, v) is a forward edge if v is descendant of u.
*/
FORWARD,
/**
* An edge (u, v) is a back edge if v is ancestor of u.
*/
BACK,
/**
* An edge (u, v) is a cross edge if v is neither ancestor or descendant of u.
*/
CROSS
}

/**
* Represents a visitor to be implemented by the consumer who want to visit the
* graph's nodes in DFS order by calling visit method.
*
* @param <U> the type of the node
*/
interface Visitor<U> {
/**
* visit a node.
*
* @param node the node to visited
*/
void visitNode(U node);

/**
* visit an edge.
*
* @param fromKey key of the from node
* @param toKey key of the to node
* @param edgeType the edge type
*/
void visitEdge(String fromKey, String toKey, EdgeType edgeType);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public interface TaskGroup<T, U extends TaskItem<T>> {
* @param callback the callback to call on failure or success
* @return the handle to the REST call
*/
ServiceCall executeAsync(ServiceCallback<Void> callback);
ServiceCall executeAsync(ServiceCallback<T> callback);

/**
* Gets the result of execution of a task in the group.
Expand Down
Loading

0 comments on commit 5e6d8d2

Please sign in to comment.