Skip to content

Commit

Permalink
add ttl
Browse files Browse the repository at this point in the history
  • Loading branch information
brianmhess committed May 23, 2017
1 parent f1ae410 commit 2ccfc0d
Show file tree
Hide file tree
Showing 7 changed files with 47 additions and 22 deletions.
3 changes: 3 additions & 0 deletions ChangeLog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 0.0.27
- Added support for specifying a TTL (Issue 67)

## 0.0.26
- Fixed issue with long schemas (Issue 65)

Expand Down
12 changes: 7 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ loading of various types of delimited files, including
### Downloading

This utility has already been built, and is available at
https://github.com/brianmhess/cassandra-loader/releases/download/v0.0.26/cassandra-loader
https://github.com/brianmhess/cassandra-loader/releases/download/v0.0.27/cassandra-loader

Get it with wget:

```
wget https://github.com/brianmhess/cassandra-loader/releases/download/v0.0.26/cassandra-loader
wget https://github.com/brianmhess/cassandra-loader/releases/download/v0.0.27/cassandra-loader
```

### Building
Expand Down Expand Up @@ -97,6 +97,7 @@ cassandra-loader -f myFileToLoad.csv -host 1.2.3.4 -schema "test.ltest(a, b, c,
`-numFutures` | Number of Futures | 1000 | Number of Java driver futures in flight.
`-numRetries` | Number of retries | 1 | Number of times to retry the INSERT before declaring defeat.
`-queryTimeout` | Timeout in seconds | 2 | Amount of time to wait for a query to finish before timing out.
`-ttl` | Time To Live | none | TTL to use when inserting these rows
`-delim` | Delimiter | , | Delimiter to use
`-charsPerColumn`| Characters per column | 4096 | Maximum characters per column
`-nullString` | Null String | <empty string> | String to represent NULL data
Expand Down Expand Up @@ -190,7 +191,7 @@ When using `jsonline`, all JSON field names are case-sensitive. When using `jso
## Usage Statement:

```
version: 0.0.26
version: 0.0.27
Usage: -f <filename> -host <ipaddress> [OPTIONS]
OPTIONS:
-schema <schema> Table schema (when using delim)
Expand Down Expand Up @@ -233,6 +234,7 @@ OPTIONS:
-format [delim|jsonline|jsonarray] Format of data: delimited or JSON [delim]
-table <tableName> Table name (when using JSON)
-keyspace <keyspaceName> Keyspace name (when using JSON)
-ttl <TTL> TTL for all rows in this invocation [unset]
Examples:
Expand Down Expand Up @@ -311,7 +313,7 @@ cassandra-unloader -f stdout -host host1 -schema "ks.table(a,b,c)" | cassandra-l

Get it with wget:
```
wget https://github.com/brianmhess/cassandra-loader/releases/download/v0.0.26/cassandra-unloader
wget https://github.com/brianmhess/cassandra-loader/releases/download/v0.0.27/cassandra-unloader
```

To build, run:
Expand All @@ -330,7 +332,7 @@ cassandra-unloader
###Usage statement:

```
version: 0.0.24
version: 0.0.27
Usage: -f <outputStem> -host <ipaddress> -schema <schema> [OPTIONS]
OPTIONS:
-configFile <filename> File with configuration options
Expand Down
5 changes: 3 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
apply plugin: 'java'
apply plugin: 'application'

def versionNum = '0.0.26'
def versionNum = '0.0.27'

allprojects {
tasks.withType(JavaCompile) {
options.compilerArgs << "-Xlint:unchecked"
options.compilerArgs << "-Xlint:deprecation"
}
}

Expand All @@ -29,7 +30,7 @@ repositories {
}

dependencies {
compile 'com.datastax.cassandra:cassandra-driver-core:3.1.0'
compile 'com.datastax.cassandra:cassandra-driver-core:3.2.0'
compile 'org.xerial.snappy:snappy-java:1.0.5'
compile 'net.jpountz.lz4:lz4:1.2.0'
compile 'ch.qos.logback:logback-classic:1.1.3'
Expand Down
24 changes: 18 additions & 6 deletions src/main/java/com/datastax/loader/CqlDelimLoad.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,15 @@
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.SSLOptions;
import com.datastax.driver.core.JdkSSLOptions;
import com.datastax.driver.core.RemoteEndpointAwareJdkSSLOptions;
import com.datastax.driver.core.policies.TokenAwarePolicy;
import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;

import com.codahale.metrics.Timer;
import org.apache.commons.lang3.StringEscapeUtils;

public class CqlDelimLoad {
private String version = "0.0.26";
private String version = "0.0.27";
private String host = null;
private int port = 9042;
private String username = null;
Expand All @@ -105,6 +105,8 @@ public class CqlDelimLoad {
private RateLimiter rateLimiter = null;
private String rateFile = null;
private PrintStream rateStream = null;
private Integer inTtl = null;
private int ttl = -1;

private String cqlSchema = null;
private String table = null;
Expand Down Expand Up @@ -180,6 +182,7 @@ private String usage() {
usage.append(" -format [delim|jsonline|jsonarray] Format of data: delimited or JSON [delim]\n");
usage.append(" -table <tableName> Table name (when using JSON)\n");
usage.append(" -keyspace <keyspaceName> Keyspace name (when using JSON)\n");
usage.append(" -ttl <TTL> TTL for all rows in this invocation [unset]\n");

usage.append("\n\nExamples:\n");
usage.append("cassandra-loader -f /path/to/file.csv -host localhost -schema \"test.test3(a, b, c)\"\n");
Expand Down Expand Up @@ -253,7 +256,7 @@ else if (format.equalsIgnoreCase("jsonline")
System.err.println("Progress rate must be non-negative");
return false;
}
if (numThreads < 1) {
if (1 > numThreads) {
System.err.println("Number of threads must be non-negative");
return false;
}
Expand Down Expand Up @@ -430,6 +433,7 @@ private boolean parseArgs(String[] args) throws IOException, FileNotFoundExcepti
if (null != (tkey = amap.remove("-numThreads"))) numThreads = Integer.parseInt(tkey);
if (null != (tkey = amap.remove("-rate"))) rate = Double.parseDouble(tkey);
if (null != (tkey = amap.remove("-progressRate"))) progressRate = Long.parseLong(tkey);
if (null != (tkey = amap.remove("-ttl"))) inTtl = new Integer(tkey);
if (null != (tkey = amap.remove("-rateFile"))) rateFile = tkey;
if (null != (tkey = amap.remove("-successDir"))) successDir = tkey;
if (null != (tkey = amap.remove("-failureDir"))) failureDir = tkey;
Expand Down Expand Up @@ -462,6 +466,14 @@ private boolean parseArgs(String[] args) throws IOException, FileNotFoundExcepti

if (0 < inNumFutures)
numFutures = inNumFutures / numThreads;
if (null != inTtl) {
if (1 > inTtl.intValue()) {
System.err.println("TTL must be greater than 1");
return false;
}
else
ttl = inTtl.intValue();
}

if (null != keyspace)
keyspace = quote(keyspace);
Expand Down Expand Up @@ -502,7 +514,7 @@ private SSLOptions createSSLOptions()
tmf != null ? tmf.getTrustManagers() : null,
new SecureRandom());

return JdkSSLOptions.builder().withSSLContext(sslContext).build();
return RemoteEndpointAwareJdkSSLOptions.builder().withSSLContext(sslContext).build();
}

private boolean setup()
Expand Down Expand Up @@ -642,7 +654,7 @@ public int compare(File f1, File f2) {
maxInsertErrors,
successDir, failureDir,
nullsUnset, format,
keyspace, table);
keyspace, table, ttl);
Future<Long> res = executor.submit(worker);
total = res.get();
executor.shutdown();
Expand All @@ -669,7 +681,7 @@ public int compare(File f1, File f2) {
maxInsertErrors,
successDir, failureDir,
nullsUnset, format,
keyspace, table);
keyspace, table, ttl);
results.add(executor.submit(worker));
}
executor.shutdown();
Expand Down
8 changes: 5 additions & 3 deletions src/main/java/com/datastax/loader/CqlDelimLoadTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ class CqlDelimLoadTask implements Callable<Long> {
private int numFutures;
private int batchSize;
private long numInserted;
private int ttl = -1;

private String cqlSchema;
private Locale locale = null;
Expand Down Expand Up @@ -114,7 +115,7 @@ public CqlDelimLoadTask(String inCqlSchema, String inDelimiter,
int inQueryTimeout, long inMaxInsertErrors,
String inSuccessDir, String inFailureDir,
boolean inNullsUnset, String inFormat,
String inKeyspace, String inTable) {
String inKeyspace, String inTable, int inTtl) {
super();
cqlSchema = inCqlSchema;
delimiter = inDelimiter;
Expand Down Expand Up @@ -144,6 +145,7 @@ public CqlDelimLoadTask(String inCqlSchema, String inDelimiter,
format = inFormat;
keyspace = inKeyspace;
table = inTable;
ttl = inTtl;
}

public Long call() throws IOException, ParseException, org.json.simple.parser.ParseException {
Expand Down Expand Up @@ -188,15 +190,15 @@ private void setup() throws IOException, ParseException, org.json.simple.parser.
nullString, commentString,
dateFormatString, localDateFormatString,
boolStyle, locale,
skipCols, session, true);
skipCols, session, true, ttl);
}
else if (format.equalsIgnoreCase("jsonline")
|| format.equalsIgnoreCase("jsonarray")) {
cdp = new CqlDelimParser(keyspace, table, delimiter, charsPerColumn,
nullString, commentString,
dateFormatString, localDateFormatString,
boolStyle, locale,
skipCols, session, true);
skipCols, session, true, ttl);
}

insert = cdp.generateInsert();
Expand Down
9 changes: 7 additions & 2 deletions src/main/java/com/datastax/loader/CqlDelimParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,16 @@ public class CqlDelimParser {
private String tablename;
private DelimParser delimParser;
private JSONParser jsonParser;
private int ttl = -1;

public CqlDelimParser(String inCqlSchema, String inDelimiter, int inCharsPerColumn,
String inNullString, String inCommentString,
String inDateFormatString, String inLocalDateFormatString,
BooleanParser.BoolStyle inBoolStyle, Locale inLocale,
String skipList, Session session, boolean bLoader)
String skipList, Session session, boolean bLoader, int inTtl)
throws ParseException {
// Optionally provide things for the line parser - date format, boolean format, locale
ttl = inTtl;
initPmap(inDateFormatString, inLocalDateFormatString, inBoolStyle,
inLocale, bLoader);
processCqlSchema(inCqlSchema, session);
Expand All @@ -86,9 +88,10 @@ public CqlDelimParser(String inKeyspace, String inTable, String inDelimiter,
String inNullString, String inCommentString,
String inDateFormatString, String inLocalDateFormatString,
BooleanParser.BoolStyle inBoolStyle, Locale inLocale,
String skipList, Session session, boolean bLoader)
String skipList, Session session, boolean bLoader, int inTtl)
throws ParseException {
// Optionally provide things for the line parser - date format, boolean format, locale
ttl = inTtl;
keyspace = inKeyspace;
tablename = inTable;
initPmap(inDateFormatString, inLocalDateFormatString, inBoolStyle,
Expand Down Expand Up @@ -315,6 +318,8 @@ public String generateInsert() {
qmarks = qmarks + ", ?";
}
insert = insert + ") VALUES (" + qmarks + ")";
if (0 < ttl)
insert = insert + " USING TTL " + ttl;
return insert;
}

Expand Down
8 changes: 4 additions & 4 deletions src/main/java/com/datastax/loader/CqlDelimUnload.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,14 @@
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.SSLOptions;
import com.datastax.driver.core.JdkSSLOptions;
import com.datastax.driver.core.RemoteEndpointAwareJdkSSLOptions;
import com.datastax.driver.core.policies.TokenAwarePolicy;
import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
import com.datastax.driver.core.exceptions.QueryValidationException;


public class CqlDelimUnload {
private String version = "0.0.26";
private String version = "0.0.27";
private String host = null;
private int port = 9042;
private String username = null;
Expand Down Expand Up @@ -323,7 +323,7 @@ private SSLOptions createSSLOptions()
tmf != null ? tmf.getTrustManagers() : null,
new SecureRandom());

return JdkSSLOptions.builder().withSSLContext(sslContext).build();
return RemoteEndpointAwareJdkSSLOptions.builder().withSSLContext(sslContext).build();
}

private void setup()
Expand Down Expand Up @@ -560,7 +560,7 @@ private String getPartitionKey(CqlDelimParser cdp, Session session) {
private boolean setup() throws IOException, ParseException {
cdp = new CqlDelimParser(cqlSchema, delimiter, 4096, nullString,
null, dateFormatString, localDateFormatString,
boolStyle, locale, null, session, false);
boolStyle, locale, null, session, false, -1);
String select = cdp.generateSelect();
String partitionKey = getPartitionKey(cdp, session);
if (null != beginToken) {
Expand Down

0 comments on commit 2ccfc0d

Please sign in to comment.