Skip to content

Commit

Permalink
Add waitReplicas to Pipeline interface (#128)
Browse files Browse the repository at this point in the history
* Add waitReplicas to Pipeline interface
* Add simple testWaitReplicas test
  • Loading branch information
gkorland authored Jul 21, 2021
1 parent 5792322 commit 0237cd3
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 41 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>com.redislabs</groupId>
<artifactId>jredisgraph</artifactId>
<version>2.5.0-SNAPSHOT</version>
<version>2.5.1-SNAPSHOT</version>

<name>JRedisGraph</name>
<description>Official client for Redis-Graph</description>
Expand Down
24 changes: 13 additions & 11 deletions src/main/java/com/redislabs/redisgraph/RedisGraphPipeline.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,6 @@ public interface RedisGraphPipeline extends
*/
Response<ResultSet> readOnlyQuery(String graphId, String query, long timeout);

/**
* Execute a Cypher query with arguments
* @param graphId a graph to perform the query on
* @param query Cypher query
* @param args
* @return a response which builds the result set with the query answer.
* @deprecated use {@link #query(String, String, Map)} instead.
*/
@Deprecated
Response<ResultSet> query(String graphId, String query, Object ...args);

/**
* Executes a cypher query with parameters.
* @param graphId a graph to perform the query on.
Expand Down Expand Up @@ -155,4 +144,17 @@ public interface RedisGraphPipeline extends
* commands you execute.
*/
public void sync();


/**
* Blocks until all the previous write commands are successfully transferred and acknowledged by
* at least the specified number of replicas. If the timeout, specified in milliseconds, is
* reached, the command returns even if the specified number of replicas were not yet reached.
* @param replicas successfully transferred and acknowledged by at least the specified number of
* replicas
* @param timeout the time to block in milliseconds, a timeout of 0 means to block forever
* @return the number of replicas reached by all the writes performed in the context of the
* current connection
*/
public Response<Long> waitReplicas(int replicas, long timeout);
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,28 +102,6 @@ public ResultSet build(Object o) {
});
}

/**
* Execute a Cypher query with arguments
*
* @param graphId a graph to perform the query on
* @param query Cypher query
* @param args
* @return response with a result set
* @deprecated use {@link #query(String, String, Map)} instead.
*/
@Deprecated
@Override
public Response<ResultSet> query(String graphId, String query, Object ...args){
String preparedQuery = Utils.prepareQuery(query, args);
client.sendCommand(RedisGraphCommand.QUERY, graphId, preparedQuery, Utils.COMPACT_STRING);
return getResponse(new Builder<ResultSet>() {
@Override
public ResultSet build(Object o) {
return new ResultSetImpl((List<Object>)o, redisGraph, caches.getGraphCache(graphId));
}
});
}

/**
* Executes a cypher query with parameters.
* @param graphId a graph to perform the query on.
Expand Down
22 changes: 17 additions & 5 deletions src/test/java/com/redislabs/redisgraph/PipelineTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,9 @@ public void deleteGraph() {
}

@Test
public void testPipelineExec(){
public void testSync(){
try (RedisGraphContext c = api.getContext()) {
RedisGraphPipeline pipeline = api.getContext().pipelined();

RedisGraphPipeline pipeline = c.pipelined();
pipeline.set("x", "1");
pipeline.query("social", "CREATE (:Person {name:'a'})");
pipeline.query("g", "CREATE (:Person {name:'a'})");
Expand Down Expand Up @@ -118,9 +117,9 @@ record = resultSet.next();
}

@Test
public void testPipelineWithReadOnlyQueries(){
public void testReadOnlyQueries(){
try (RedisGraphContext c = api.getContext()) {
RedisGraphPipeline pipeline = api.getContext().pipelined();
RedisGraphPipeline pipeline = c.pipelined();

pipeline.set("x", "1");
pipeline.query("social", "CREATE (:Person {name:'a'})");
Expand Down Expand Up @@ -191,4 +190,17 @@ record = resultSet.next();
Assert.assertEquals("Person", record.getValue("label"));
}
}

@Test
public void testWaitReplicas(){
try (RedisGraphContext c = api.getContext()) {
RedisGraphPipeline pipeline = c.pipelined();
pipeline.set("x", "1");
pipeline.query("social", "CREATE (:Person {name:'a'})");
pipeline.query("g", "CREATE (:Person {name:'a'})");
pipeline.waitReplicas(0, 100L);
List<Object> results = pipeline.syncAndReturnAll();
Assert.assertEquals(0L, results.get(3));
}
}
}
4 changes: 2 additions & 2 deletions src/test/java/com/redislabs/redisgraph/TransactionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public void deleteGraph() {
@Test
public void testMultiExec(){
try (RedisGraphContext c = api.getContext()) {
RedisGraphTransaction transaction = api.getContext().multi();
RedisGraphTransaction transaction = c.multi();

transaction.set("x", "1");
transaction.query("social", "CREATE (:Person {name:'a'})");
Expand Down Expand Up @@ -157,7 +157,7 @@ public void testReadTransactionWatch(){
@Test
public void testMultiExecWithReadOnlyQueries(){
try (RedisGraphContext c = api.getContext()) {
RedisGraphTransaction transaction = api.getContext().multi();
RedisGraphTransaction transaction = c.multi();

transaction.set("x", "1");
transaction.query("social", "CREATE (:Person {name:'a'})");
Expand Down

0 comments on commit 0237cd3

Please sign in to comment.