diff --git a/ChangeLog.md b/ChangeLog.md index 743ed27..b2c4a6e 100644 --- a/ChangeLog.md +++ b/ChangeLog.md @@ -1,3 +1,6 @@ +## 0.0.27 +- Added support for specifying a TTL (Issue 67) + ## 0.0.26 - Fixed issue with long schemas (Issue 65) diff --git a/README.md b/README.md index 9cf8b23..6d287e3 100644 --- a/README.md +++ b/README.md @@ -18,12 +18,12 @@ loading of various types of delimited files, including ### Downloading This utility has already been built, and is available at -https://github.com/brianmhess/cassandra-loader/releases/download/v0.0.26/cassandra-loader +https://github.com/brianmhess/cassandra-loader/releases/download/v0.0.27/cassandra-loader Get it with wget: ``` -wget https://github.com/brianmhess/cassandra-loader/releases/download/v0.0.26/cassandra-loader +wget https://github.com/brianmhess/cassandra-loader/releases/download/v0.0.27/cassandra-loader ``` ### Building @@ -97,6 +97,7 @@ cassandra-loader -f myFileToLoad.csv -host 1.2.3.4 -schema "test.ltest(a, b, c, `-numFutures` | Number of Futures | 1000 | Number of Java driver futures in flight. `-numRetries` | Number of retries | 1 | Number of times to retry the INSERT before declaring defeat. `-queryTimeout` | Timeout in seconds | 2 | Amount of time to wait for a query to finish before timing out. + `-ttl` | Time To Live | none | TTL to use when inserting these rows `-delim` | Delimiter | , | Delimiter to use `-charsPerColumn`| Characters per column | 4096 | Maximum characters per column `-nullString` | Null String | <empty string> | String to represent NULL data @@ -190,7 +191,7 @@ When using `jsonline`, all JSON field names are case-sensitive. When using `jso ## Usage Statement: ``` -version: 0.0.26 +version: 0.0.27 Usage: -f -host [OPTIONS] OPTIONS: -schema Table schema (when using delim) @@ -233,6 +234,7 @@ OPTIONS: -format [delim|jsonline|jsonarray] Format of data: delimited or JSON [delim] -table Table name (when using JSON) -keyspace Keyspace name (when using JSON) + -ttl TTL for all rows in this invocation [unset] Examples: @@ -311,7 +313,7 @@ cassandra-unloader -f stdout -host host1 -schema "ks.table(a,b,c)" | cassandra-l Get it with wget: ``` -wget https://github.com/brianmhess/cassandra-loader/releases/download/v0.0.26/cassandra-unloader +wget https://github.com/brianmhess/cassandra-loader/releases/download/v0.0.27/cassandra-unloader ``` To build, run: @@ -330,7 +332,7 @@ cassandra-unloader ###Usage statement: ``` -version: 0.0.24 +version: 0.0.27 Usage: -f -host -schema [OPTIONS] OPTIONS: -configFile File with configuration options diff --git a/build.gradle b/build.gradle index 3ceb503..4813e2b 100644 --- a/build.gradle +++ b/build.gradle @@ -1,11 +1,12 @@ apply plugin: 'java' apply plugin: 'application' -def versionNum = '0.0.26' +def versionNum = '0.0.27' allprojects { tasks.withType(JavaCompile) { options.compilerArgs << "-Xlint:unchecked" + options.compilerArgs << "-Xlint:deprecation" } } @@ -29,7 +30,7 @@ repositories { } dependencies { - compile 'com.datastax.cassandra:cassandra-driver-core:3.1.0' + compile 'com.datastax.cassandra:cassandra-driver-core:3.2.0' compile 'org.xerial.snappy:snappy-java:1.0.5' compile 'net.jpountz.lz4:lz4:1.2.0' compile 'ch.qos.logback:logback-classic:1.1.3' diff --git a/src/main/java/com/datastax/loader/CqlDelimLoad.java b/src/main/java/com/datastax/loader/CqlDelimLoad.java index ad95c45..d7d98fb 100644 --- a/src/main/java/com/datastax/loader/CqlDelimLoad.java +++ b/src/main/java/com/datastax/loader/CqlDelimLoad.java @@ -75,7 +75,7 @@ import com.datastax.driver.core.BoundStatement; import com.datastax.driver.core.ResultSetFuture; import com.datastax.driver.core.SSLOptions; -import com.datastax.driver.core.JdkSSLOptions; +import com.datastax.driver.core.RemoteEndpointAwareJdkSSLOptions; import com.datastax.driver.core.policies.TokenAwarePolicy; import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy; @@ -83,7 +83,7 @@ import org.apache.commons.lang3.StringEscapeUtils; public class CqlDelimLoad { - private String version = "0.0.26"; + private String version = "0.0.27"; private String host = null; private int port = 9042; private String username = null; @@ -105,6 +105,8 @@ public class CqlDelimLoad { private RateLimiter rateLimiter = null; private String rateFile = null; private PrintStream rateStream = null; + private Integer inTtl = null; + private int ttl = -1; private String cqlSchema = null; private String table = null; @@ -180,6 +182,7 @@ private String usage() { usage.append(" -format [delim|jsonline|jsonarray] Format of data: delimited or JSON [delim]\n"); usage.append(" -table Table name (when using JSON)\n"); usage.append(" -keyspace Keyspace name (when using JSON)\n"); + usage.append(" -ttl TTL for all rows in this invocation [unset]\n"); usage.append("\n\nExamples:\n"); usage.append("cassandra-loader -f /path/to/file.csv -host localhost -schema \"test.test3(a, b, c)\"\n"); @@ -253,7 +256,7 @@ else if (format.equalsIgnoreCase("jsonline") System.err.println("Progress rate must be non-negative"); return false; } - if (numThreads < 1) { + if (1 > numThreads) { System.err.println("Number of threads must be non-negative"); return false; } @@ -430,6 +433,7 @@ private boolean parseArgs(String[] args) throws IOException, FileNotFoundExcepti if (null != (tkey = amap.remove("-numThreads"))) numThreads = Integer.parseInt(tkey); if (null != (tkey = amap.remove("-rate"))) rate = Double.parseDouble(tkey); if (null != (tkey = amap.remove("-progressRate"))) progressRate = Long.parseLong(tkey); + if (null != (tkey = amap.remove("-ttl"))) inTtl = new Integer(tkey); if (null != (tkey = amap.remove("-rateFile"))) rateFile = tkey; if (null != (tkey = amap.remove("-successDir"))) successDir = tkey; if (null != (tkey = amap.remove("-failureDir"))) failureDir = tkey; @@ -462,6 +466,14 @@ private boolean parseArgs(String[] args) throws IOException, FileNotFoundExcepti if (0 < inNumFutures) numFutures = inNumFutures / numThreads; + if (null != inTtl) { + if (1 > inTtl.intValue()) { + System.err.println("TTL must be greater than 1"); + return false; + } + else + ttl = inTtl.intValue(); + } if (null != keyspace) keyspace = quote(keyspace); @@ -502,7 +514,7 @@ private SSLOptions createSSLOptions() tmf != null ? tmf.getTrustManagers() : null, new SecureRandom()); - return JdkSSLOptions.builder().withSSLContext(sslContext).build(); + return RemoteEndpointAwareJdkSSLOptions.builder().withSSLContext(sslContext).build(); } private boolean setup() @@ -642,7 +654,7 @@ public int compare(File f1, File f2) { maxInsertErrors, successDir, failureDir, nullsUnset, format, - keyspace, table); + keyspace, table, ttl); Future res = executor.submit(worker); total = res.get(); executor.shutdown(); @@ -669,7 +681,7 @@ public int compare(File f1, File f2) { maxInsertErrors, successDir, failureDir, nullsUnset, format, - keyspace, table); + keyspace, table, ttl); results.add(executor.submit(worker)); } executor.shutdown(); diff --git a/src/main/java/com/datastax/loader/CqlDelimLoadTask.java b/src/main/java/com/datastax/loader/CqlDelimLoadTask.java index 72910e2..1e45446 100644 --- a/src/main/java/com/datastax/loader/CqlDelimLoadTask.java +++ b/src/main/java/com/datastax/loader/CqlDelimLoadTask.java @@ -79,6 +79,7 @@ class CqlDelimLoadTask implements Callable { private int numFutures; private int batchSize; private long numInserted; + private int ttl = -1; private String cqlSchema; private Locale locale = null; @@ -114,7 +115,7 @@ public CqlDelimLoadTask(String inCqlSchema, String inDelimiter, int inQueryTimeout, long inMaxInsertErrors, String inSuccessDir, String inFailureDir, boolean inNullsUnset, String inFormat, - String inKeyspace, String inTable) { + String inKeyspace, String inTable, int inTtl) { super(); cqlSchema = inCqlSchema; delimiter = inDelimiter; @@ -144,6 +145,7 @@ public CqlDelimLoadTask(String inCqlSchema, String inDelimiter, format = inFormat; keyspace = inKeyspace; table = inTable; + ttl = inTtl; } public Long call() throws IOException, ParseException, org.json.simple.parser.ParseException { @@ -188,7 +190,7 @@ private void setup() throws IOException, ParseException, org.json.simple.parser. nullString, commentString, dateFormatString, localDateFormatString, boolStyle, locale, - skipCols, session, true); + skipCols, session, true, ttl); } else if (format.equalsIgnoreCase("jsonline") || format.equalsIgnoreCase("jsonarray")) { @@ -196,7 +198,7 @@ else if (format.equalsIgnoreCase("jsonline") nullString, commentString, dateFormatString, localDateFormatString, boolStyle, locale, - skipCols, session, true); + skipCols, session, true, ttl); } insert = cdp.generateInsert(); diff --git a/src/main/java/com/datastax/loader/CqlDelimParser.java b/src/main/java/com/datastax/loader/CqlDelimParser.java index cf35296..c570574 100644 --- a/src/main/java/com/datastax/loader/CqlDelimParser.java +++ b/src/main/java/com/datastax/loader/CqlDelimParser.java @@ -67,14 +67,16 @@ public class CqlDelimParser { private String tablename; private DelimParser delimParser; private JSONParser jsonParser; + private int ttl = -1; public CqlDelimParser(String inCqlSchema, String inDelimiter, int inCharsPerColumn, String inNullString, String inCommentString, String inDateFormatString, String inLocalDateFormatString, BooleanParser.BoolStyle inBoolStyle, Locale inLocale, - String skipList, Session session, boolean bLoader) + String skipList, Session session, boolean bLoader, int inTtl) throws ParseException { // Optionally provide things for the line parser - date format, boolean format, locale + ttl = inTtl; initPmap(inDateFormatString, inLocalDateFormatString, inBoolStyle, inLocale, bLoader); processCqlSchema(inCqlSchema, session); @@ -86,9 +88,10 @@ public CqlDelimParser(String inKeyspace, String inTable, String inDelimiter, String inNullString, String inCommentString, String inDateFormatString, String inLocalDateFormatString, BooleanParser.BoolStyle inBoolStyle, Locale inLocale, - String skipList, Session session, boolean bLoader) + String skipList, Session session, boolean bLoader, int inTtl) throws ParseException { // Optionally provide things for the line parser - date format, boolean format, locale + ttl = inTtl; keyspace = inKeyspace; tablename = inTable; initPmap(inDateFormatString, inLocalDateFormatString, inBoolStyle, @@ -315,6 +318,8 @@ public String generateInsert() { qmarks = qmarks + ", ?"; } insert = insert + ") VALUES (" + qmarks + ")"; + if (0 < ttl) + insert = insert + " USING TTL " + ttl; return insert; } diff --git a/src/main/java/com/datastax/loader/CqlDelimUnload.java b/src/main/java/com/datastax/loader/CqlDelimUnload.java index e025e02..472e33b 100644 --- a/src/main/java/com/datastax/loader/CqlDelimUnload.java +++ b/src/main/java/com/datastax/loader/CqlDelimUnload.java @@ -67,14 +67,14 @@ import com.datastax.driver.core.ConsistencyLevel; import com.datastax.driver.core.Row; import com.datastax.driver.core.SSLOptions; -import com.datastax.driver.core.JdkSSLOptions; +import com.datastax.driver.core.RemoteEndpointAwareJdkSSLOptions; import com.datastax.driver.core.policies.TokenAwarePolicy; import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy; import com.datastax.driver.core.exceptions.QueryValidationException; public class CqlDelimUnload { - private String version = "0.0.26"; + private String version = "0.0.27"; private String host = null; private int port = 9042; private String username = null; @@ -323,7 +323,7 @@ private SSLOptions createSSLOptions() tmf != null ? tmf.getTrustManagers() : null, new SecureRandom()); - return JdkSSLOptions.builder().withSSLContext(sslContext).build(); + return RemoteEndpointAwareJdkSSLOptions.builder().withSSLContext(sslContext).build(); } private void setup() @@ -560,7 +560,7 @@ private String getPartitionKey(CqlDelimParser cdp, Session session) { private boolean setup() throws IOException, ParseException { cdp = new CqlDelimParser(cqlSchema, delimiter, 4096, nullString, null, dateFormatString, localDateFormatString, - boolStyle, locale, null, session, false); + boolStyle, locale, null, session, false, -1); String select = cdp.generateSelect(); String partitionKey = getPartitionKey(cdp, session); if (null != beginToken) {