+ */
+public class AccumuloBatchConfiguration {
+
+ private static final Integer DEFAULT_NUM_QUERY_THREADS = 3;
+ private Integer numQueryThreads = null;
+ private static final Long DEFAULT_MAX_MEMORY = 50 * 1024 * 1024l;
+ private Long maxMemory = null;
+ private static final Long DEFAULT_MAX_LATENCY = 2 * 60 * 1000l;
+ private Long maxLatency = null;
+ private static final Long DEFAULT_TIMEOUT = Long.MAX_VALUE;
+ private Long timeout = null;
+ private static final Integer DEFAULT_MAX_WRITE_THREADS = 3;
+ private Integer maxWriteThreads = null;
+
+ /**
+ * Sets the number of threads to spawn for querying tablet servers.
+ *
+ *
+ * Default: 3
+ *
+ * @param numQueryThreads the number threads to use
+ * @throws IllegalArgumentException if {@code maxWriteThreads} is
+ * non-positive
+ * @return {@code this} to allow chaining of set methods
+ */
+ public AccumuloBatchConfiguration setNumQueryThreads(int numQueryThreads) {
+ if (numQueryThreads <= 0) {
+ throw new IllegalArgumentException("Num threads must be positive " + numQueryThreads);
+ }
+
+ this.numQueryThreads = numQueryThreads;
+ return this;
+ }
+
+ /**
+ * Sets the maximum memory to batch before writing. The smaller this value,
+ * the more frequently the {@link BatchWriter} will write.
+ * If set to a value smaller than a single mutation, then it will
+ * {@link BatchWriter#flush()} after each added mutation. Must be
+ * non-negative.
+ *
+ *
+ * Default: 50M
+ *
+ * @param maxMemory max size in bytes
+ * @throws IllegalArgumentException if {@code maxMemory} is less than 0
+ * @return {@code this} to allow chaining of set methods
+ */
+ public AccumuloBatchConfiguration setMaxMemory(long maxMemory) {
+ if (maxMemory < 0) {
+ throw new IllegalArgumentException("Max memory must be non-negative.");
+ }
+ this.maxMemory = maxMemory;
+ return this;
+ }
+
+ /**
+ * Sets the maximum amount of time to hold the data in memory before
+ * flushing it to servers.
+ * For no maximum, set to zero, or {@link Long#MAX_VALUE} with
+ * {@link TimeUnit#MILLISECONDS}.
+ *
+ *
{@link TimeUnit#MICROSECONDS} or {@link TimeUnit#NANOSECONDS} will be
+ * truncated to the nearest {@link TimeUnit#MILLISECONDS}.
+ * If this truncation would result in making the value zero when it was
+ * specified as non-zero, then a minimum value of one
+ * {@link TimeUnit#MILLISECONDS} will be used.
+ *
+ *
+ * Default: 120 seconds
+ *
+ * @param maxLatency the maximum latency, in the unit specified by the value
+ * of {@code timeUnit}
+ * @param timeUnit determines how {@code maxLatency} will be interpreted
+ * @throws IllegalArgumentException if {@code maxLatency} is less than 0
+ * @return {@code this} to allow chaining of set methods
+ */
+ public AccumuloBatchConfiguration setMaxLatency(long maxLatency, TimeUnit timeUnit) {
+ if (maxLatency < 0) {
+ throw new IllegalArgumentException("Negative max latency not allowed " + maxLatency);
+ }
+
+ if (maxLatency == 0) {
+ this.maxLatency = Long.MAX_VALUE;
+ } else { // make small, positive values that truncate to 0 when converted use the minimum millis instead
+ this.maxLatency = Math.max(1, timeUnit.toMillis(maxLatency));
+ }
+ return this;
+ }
+
+ /**
+ * Sets the maximum amount of time an unresponsive server will be re-tried.
+ * When this timeout is exceeded, the {@link BatchWriter} should throw an
+ * exception.
+ * For no timeout, set to zero, or {@link Long#MAX_VALUE} with
+ * {@link TimeUnit#MILLISECONDS}.
+ *
+ *
{@link TimeUnit#MICROSECONDS} or {@link TimeUnit#NANOSECONDS} will be
+ * truncated to the nearest {@link TimeUnit#MILLISECONDS}.
+ * If this truncation would result in making the value zero when it was
+ * specified as non-zero, then a minimum value of one
+ * {@link TimeUnit#MILLISECONDS} will be used.
+ *
+ *
+ * Default: {@link Long#MAX_VALUE} (no timeout)
+ *
+ * @param timeout the timeout, in the unit specified by the value of
+ * {@code timeUnit}
+ * @param timeUnit determines how {@code timeout} will be interpreted
+ * @throws IllegalArgumentException if {@code timeout} is less than 0
+ * @return {@code this} to allow chaining of set methods
+ */
+ public AccumuloBatchConfiguration setTimeout(long timeout, TimeUnit timeUnit) {
+ if (timeout < 0) {
+ throw new IllegalArgumentException("Negative timeout not allowed " + timeout);
+ }
+
+ if (timeout == 0) {
+ this.timeout = Long.MAX_VALUE;
+ } else { // make small, positive values that truncate to 0 when converted use the minimum millis instead
+ this.timeout = Math.max(1, timeUnit.toMillis(timeout));
+ }
+ return this;
+ }
+
+ /**
+ * Sets the maximum number of threads to use for writing data to the tablet
+ * servers.
+ *
+ *
+ * Default: 3
+ *
+ * @param maxWriteThreads the maximum threads to use
+ * @throws IllegalArgumentException if {@code maxWriteThreads} is
+ * non-positive
+ * @return {@code this} to allow chaining of set methods
+ */
+ public AccumuloBatchConfiguration setMaxWriteThreads(int maxWriteThreads) {
+ if (maxWriteThreads <= 0) {
+ throw new IllegalArgumentException("Max threads must be positive " + maxWriteThreads);
+ }
+
+ this.maxWriteThreads = maxWriteThreads;
+ return this;
+ }
+
+ /**
+ * Get number of threads to spawn for querying on tablet servers.
+ *
+ * @return number of threads
+ */
+ public int getNumQueryThreads() {
+ return numQueryThreads != null ? numQueryThreads : DEFAULT_NUM_QUERY_THREADS;
+ }
+
+ /**
+ * Sets the maximum memory to batch before writing.
+ *
+ * @return max memory
+ */
+ public long getMaxMemory() {
+ return maxMemory != null ? maxMemory : DEFAULT_MAX_MEMORY;
+ }
+
+ /**
+ * Sets the maximum amount of time to hold the data in memory before flushing it to servers.
+ *
+ * @param timeUnit units for return value
+ * @return max latency
+ */
+ public long getMaxLatency(TimeUnit timeUnit) {
+ return timeUnit.convert(maxLatency != null ? maxLatency : DEFAULT_MAX_LATENCY, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Get the maximum amount of time an unresponsive server will be re-tried.
+ *
+ * @param timeUnit units for return value
+ * @return max timeout
+ */
+ public long getTimeout(TimeUnit timeUnit) {
+ return timeUnit.convert(timeout != null ? timeout : DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Get the maximum number of threads to use for writing data to the tablet servers.
+ *
+ * @return max threads
+ */
+ public int getMaxWriteThreads() {
+ return maxWriteThreads != null ? maxWriteThreads : DEFAULT_MAX_WRITE_THREADS;
+ }
+
+ /**
+ * Factory method to create a BatchDeleter connected to Accumulo.
+ *
+ * @param connector connection to Accumulo
+ * @param tableName the name of the table to query and delete from
+ * @param authorizations set of authorization labels that will be checked against the column visibility.
+ * @return BatchDeleter object for configuring and deleting
+ * @throws TableNotFoundException when the specified table doesn't exist
+ */
+ public BatchDeleter createBatchDeleter(Connector connector, String tableName, Authorizations authorizations) throws TableNotFoundException {
+ return connector.createBatchDeleter(tableName, authorizations,
+ getNumQueryThreads(), getMaxMemory(), getMaxLatency(TimeUnit.MILLISECONDS), getMaxWriteThreads());
+ }
+
+ /**
+ * Factory method to create a BatchScanner connected to Accumulo.
+ *
+ * @param connector connection to Accumulo
+ * @param tableName the name of the table to query and delete from
+ * @param authorizations set of authorization labels that will be checked against the column visibility.
+ * @return BatchScanner object for configuring and querying
+ * @throws TableNotFoundException when the specified table doesn't exist
+ */
+ public BatchScanner createBatchScanner(Connector connector, String tableName, Authorizations authorizations) throws TableNotFoundException {
+ return connector.createBatchScanner(tableName, authorizations, getNumQueryThreads());
+ }
+
+ /**
+ * Factory method to create a BatchWriter connected to Accumulo.
+ *
+ * @param connector connection to Accumulo
+ * @param tableName the name of the table to insert data into
+ * @return BatchWriter object for configuring and writing data
+ * @throws TableNotFoundException when the specified table doesn't exist
+ */
+ public BatchWriter createBatchWriter(Connector connector, String tableName) throws TableNotFoundException {
+ return connector.createBatchWriter(tableName,
+ getMaxMemory(), getMaxLatency(TimeUnit.MILLISECONDS), getMaxWriteThreads());
+ }
+}
\ No newline at end of file
diff --git a/titan-accumulo/titan-accumulo-core/src/main/java/com/thinkaurelius/titan/diskstorage/accumulo/AccumuloInstanceFactory.java b/titan-accumulo/titan-accumulo-core/src/main/java/com/thinkaurelius/titan/diskstorage/accumulo/AccumuloInstanceFactory.java
new file mode 100644
index 0000000000..7c28eb4ff1
--- /dev/null
+++ b/titan-accumulo/titan-accumulo-core/src/main/java/com/thinkaurelius/titan/diskstorage/accumulo/AccumuloInstanceFactory.java
@@ -0,0 +1,25 @@
+package com.thinkaurelius.titan.diskstorage.accumulo;
+
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+
+/**
+ * Instance factory for Accumulo store configuration.
+ *
+ * @author Etienne Deprit
+ */
+public interface AccumuloInstanceFactory {
+
+ public Instance getInstance(String instanceName, String zooKeepers);
+
+ /*
+ * Default Zookeeper instance factory.
+ */
+ public static final AccumuloInstanceFactory ZOOKEEPER_INSTANCE_FACTORY =
+ new AccumuloInstanceFactory() {
+ @Override
+ public Instance getInstance(String instanceName, String zooKeepers) {
+ return new ZooKeeperInstance(instanceName, zooKeepers);
+ }
+ };
+}
diff --git a/titan-accumulo/titan-accumulo-core/src/main/java/com/thinkaurelius/titan/diskstorage/accumulo/AccumuloKeyColumnValueStore.java b/titan-accumulo/titan-accumulo-core/src/main/java/com/thinkaurelius/titan/diskstorage/accumulo/AccumuloKeyColumnValueStore.java
new file mode 100644
index 0000000000..e05a731ca3
--- /dev/null
+++ b/titan-accumulo/titan-accumulo-core/src/main/java/com/thinkaurelius/titan/diskstorage/accumulo/AccumuloKeyColumnValueStore.java
@@ -0,0 +1,441 @@
+package com.thinkaurelius.titan.diskstorage.accumulo;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
+import com.thinkaurelius.titan.diskstorage.PermanentStorageException;
+import com.thinkaurelius.titan.diskstorage.StaticBuffer;
+import com.thinkaurelius.titan.diskstorage.StorageException;
+import com.thinkaurelius.titan.diskstorage.TemporaryStorageException;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.Entry;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyColumnValueStore;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyIterator;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyRangeQuery;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeySliceQuery;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.SliceQuery;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StaticBufferEntry;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StoreTransaction;
+import com.thinkaurelius.titan.diskstorage.util.RecordIterator;
+import com.thinkaurelius.titan.diskstorage.util.StaticArrayBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.ClientSideIteratorScanner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.RowIterator;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.FirstEntryInRowIterator;
+import com.thinkaurelius.titan.diskstorage.accumulo.iterators.ColumnRangeFilter;
+import com.thinkaurelius.titan.diskstorage.accumulo.util.CallableFunction;
+import com.thinkaurelius.titan.diskstorage.accumulo.util.ConcurrentLists;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Key-Column value store for Accumulo.
+ *
+ * @author Etienne Deprit
+ */
+public class AccumuloKeyColumnValueStore implements KeyColumnValueStore {
+
+ private static final Logger logger = LoggerFactory.getLogger(AccumuloKeyColumnValueStore.class);
+ // Default parameters
+ private static final int NUM_THREADS_DEFAULT = Runtime.getRuntime().availableProcessors();
+ private static final Authorizations AUTHORIZATIONS_DEFAULT = new Authorizations();
+ // Instance variables
+ private final Connector connector; // thread-safe
+ private final String tableName;
+ private final String columnFamily;
+ private final byte[] columnFamilyBytes;
+ private final Text columnFamilyText;
+ private final AccumuloBatchConfiguration batchConfiguration;
+ private final boolean serverSideIterators;
+
+ AccumuloKeyColumnValueStore(Connector connector, String tableName, String columnFamily,
+ AccumuloBatchConfiguration batchConfiguration, boolean serverSideIterators) {
+ this.connector = connector;
+ this.tableName = tableName;
+ this.columnFamily = columnFamily;
+ this.columnFamilyBytes = columnFamily.getBytes();
+ this.columnFamilyText = new Text(columnFamily);
+ this.batchConfiguration = batchConfiguration;
+ this.serverSideIterators = serverSideIterators;
+ }
+
+ @Override
+ public String getName() {
+ return columnFamily;
+ }
+
+ @Override
+ public void close() throws StorageException {
+ }
+
+ @Override
+ public boolean containsKey(StaticBuffer key, StoreTransaction txh) throws StorageException {
+ Scanner scanner;
+ try {
+ scanner = connector.createScanner(tableName, AUTHORIZATIONS_DEFAULT);
+ } catch (TableNotFoundException ex) {
+ logger.error("Can't find Titan store " + tableName, ex);
+ throw new PermanentStorageException(ex);
+ }
+
+ byte[] keyBytes = key.as(StaticBuffer.ARRAY_FACTORY);
+ scanner.setRange(new Range(new Text(keyBytes)));
+ scanner.fetchColumnFamily(columnFamilyText);
+
+ return scanner.iterator().hasNext();
+ }
+
+ @Override
+ public List getSlice(KeySliceQuery query, StoreTransaction txh) throws StorageException {
+ Scanner scanner;
+ try {
+ scanner = connector.createScanner(tableName, AUTHORIZATIONS_DEFAULT);
+ } catch (TableNotFoundException ex) {
+ logger.error("Can't find Titan store " + tableName, ex);
+ throw new PermanentStorageException(ex);
+ }
+
+ scanner.setRange(getRange(query));
+ if (query.getLimit() < scanner.getBatchSize()) {
+ scanner.setBatchSize(query.getLimit());
+ }
+
+ int count = 1;
+ List slice = new ArrayList();
+ for (Map.Entry kv : scanner) {
+ if (count > query.getLimit()) {
+ break;
+ }
+ slice.add(getEntry(kv));
+ count++;
+ }
+
+ return slice;
+ }
+
+ @Override
+ public List> getSlice(List keys, final SliceQuery query, final StoreTransaction txh) throws StorageException {
+ List> slices = ConcurrentLists.transform(keys,
+ new CallableFunction>() {
+ @Override
+ public List apply(StaticBuffer key) throws Exception {
+ return getSlice(new KeySliceQuery(key, query), txh);
+ }
+ });
+
+ return slices;
+ }
+
+ @Override
+ public void mutate(StaticBuffer key, List additions, List deletions,
+ StoreTransaction txh) throws StorageException {
+
+ Text keyText = new Text(key.as(StaticBuffer.ARRAY_FACTORY));
+ List batch = makeBatch(columnFamilyText, keyText, additions, deletions);
+
+ if (batch.isEmpty()) {
+ return; // nothing to apply
+ }
+
+ try {
+ BatchWriter writer = batchConfiguration.createBatchWriter(connector, tableName);
+ try {
+ writer.addMutations(batch);
+ writer.flush();
+ } catch (MutationsRejectedException ex) {
+ logger.error("Can't write mutations to Titan store " + tableName, ex);
+ throw new TemporaryStorageException(ex);
+ } finally {
+ try {
+ writer.close();
+ } catch (MutationsRejectedException ex) {
+ logger.error("Can't write mutations to Titan store " + tableName, ex);
+ }
+ }
+ } catch (TableNotFoundException ex) {
+ logger.error("Can't find Titan store " + tableName, ex);
+ throw new PermanentStorageException(ex);
+ }
+ }
+
+ private static List makeBatch(Text colFamily, Text key, List additions, List deletions) {
+ if (additions.isEmpty() && deletions.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ List batch = new ArrayList(2);
+
+ if (!additions.isEmpty()) {
+ Mutation put = makePutMutation(colFamily, key, additions);
+ batch.add(put);
+ }
+
+ if (!deletions.isEmpty()) {
+ Mutation delete = makeDeleteMutation(colFamily, key, deletions);
+ batch.add(delete);
+ }
+
+ return batch;
+ }
+
+ /**
+ * Convert Titan deletions into Accumulo delete {@code Mutation}.
+ *
+ * @param colFamily Name of column family for deletions
+ * @param key Row key
+ * @param deletions Name of column qualifiers to delete
+ *
+ * @return Delete command or null if deletions were null or empty.
+ */
+ private static Mutation makeDeleteMutation(Text colFamily, Text key, List deletions) {
+ Preconditions.checkArgument(!deletions.isEmpty());
+
+ Mutation mutation = new Mutation(new Text(key));
+ for (StaticBuffer del : deletions) {
+ Text colQual = new Text(del.as(StaticBuffer.ARRAY_FACTORY));
+ mutation.putDelete(new Text(colFamily), colQual);
+ }
+ return mutation;
+ }
+
+ /**
+ * Convert Titan {@code Entry} modifications entries into Accumulo put
+ * {@code Mutation}.
+ *
+ * @param colFamily Name of column family for modifications
+ * @param key Row key
+ * @param modifications Entries to insert/update.
+ *
+ * @return Put command or null if additions were null or empty.
+ */
+ private static Mutation makePutMutation(Text colFamily, Text key, List modifications) {
+ Preconditions.checkArgument(!modifications.isEmpty());
+
+ Mutation mutation = new Mutation(new Text(key));
+ for (Entry entry : modifications) {
+ Text colQual = new Text(entry.getArrayColumn());
+ byte[] value = entry.getArrayValue();
+ mutation.put(colFamily, colQual, new Value(value));
+ }
+ return mutation;
+ }
+
+ @Override
+ public void acquireLock(StaticBuffer key, StaticBuffer column, StaticBuffer expectedValue,
+ StoreTransaction txh) throws StorageException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public StaticBuffer[] getLocalKeyPartition() throws StorageException {
+ throw new UnsupportedOperationException(); // Accumulo stores do not support local key partitions.
+ }
+
+ @Override
+ public KeyIterator getKeys(KeyRangeQuery query, StoreTransaction txh) throws StorageException {
+ return executeKeySliceQuery(query.getKeyStart(), query.getKeyEnd(), query, false);
+ }
+
+ @Override
+ public KeyIterator getKeys(SliceQuery query, StoreTransaction txh) throws StorageException {
+ return executeKeySliceQuery(query, false);
+ }
+
+ private KeyIterator executeKeySliceQuery(SliceQuery columnSlice, boolean keysOnly) throws StorageException {
+ return executeKeySliceQuery(null, null, columnSlice, keysOnly);
+ }
+
+ private KeyIterator executeKeySliceQuery(StaticBuffer startKey, StaticBuffer endKey,
+ SliceQuery columnSlice, boolean keysOnly) throws StorageException {
+
+ Scanner scanner = getKeySliceScanner(startKey, endKey, columnSlice, keysOnly);
+
+ return new RowKeyIterator(scanner);
+ }
+
+ private Scanner getKeySliceScanner(StaticBuffer startKey, StaticBuffer endKey,
+ SliceQuery columnSlice, boolean keysOnly) throws StorageException {
+
+ Scanner scanner;
+ try {
+ scanner = connector.createScanner(tableName, AUTHORIZATIONS_DEFAULT);
+
+ } catch (TableNotFoundException ex) {
+ logger.error("Can't find Titan store " + tableName, ex);
+ throw new PermanentStorageException(ex);
+ }
+
+ scanner.fetchColumnFamily(columnFamilyText);
+
+ IteratorSetting columnSliceIterator = null;
+ if (columnSlice != null) {
+ columnSliceIterator = getColumnSliceIterator(columnSlice);
+
+ if (columnSliceIterator != null) {
+ if (!serverSideIterators) {
+ scanner = new ClientSideIteratorScanner(scanner);
+ }
+ scanner.addScanIterator(columnSliceIterator);
+ }
+ }
+
+ if (keysOnly) {
+ IteratorSetting firstRowKeyIterator = new IteratorSetting(15, "firstRowKeyIter", FirstEntryInRowIterator.class);
+ scanner.addScanIterator(firstRowKeyIterator);
+ }
+
+ Range range = getRange(startKey, endKey);
+ scanner.setRange(range);
+
+ return scanner;
+ }
+
+ private static Entry getEntry(Map.Entry keyValue) {
+ byte[] colQual = keyValue.getKey().getColumnQualifier().getBytes();
+ byte[] value = keyValue.getValue().get();
+ return StaticBufferEntry.of(new StaticArrayBuffer(colQual), new StaticArrayBuffer(value));
+ }
+
+ private Range getRange(StaticBuffer startKey, StaticBuffer endKey) {
+ Text startRow = null;
+ Text endRow = null;
+
+ if (startKey != null && startKey.length() > 0) {
+ startRow = new Text(startKey.as(StaticBuffer.ARRAY_FACTORY));
+ }
+
+ if (endKey != null && endKey.length() > 0) {
+ endRow = new Text(endKey.as(StaticBuffer.ARRAY_FACTORY));
+ }
+
+ return new Range(startRow, true, endRow, false);
+ }
+
+ private Range getRange(KeySliceQuery query) {
+ return getRange(query.getKey(), query);
+ }
+
+ private Range getRange(StaticBuffer key, SliceQuery query) {
+ Text row = new Text(key.as(StaticBuffer.ARRAY_FACTORY));
+
+ Key startKey;
+ Key endKey;
+
+ if (query.getSliceStart().length() > 0) {
+ startKey = new Key(row, columnFamilyText,
+ new Text(query.getSliceStart().as(StaticBuffer.ARRAY_FACTORY)));
+ } else {
+ startKey = new Key(row, columnFamilyText);
+ }
+
+ if (query.getSliceEnd().length() > 0) {
+ endKey = new Key(row, columnFamilyText,
+ new Text(query.getSliceEnd().as(StaticBuffer.ARRAY_FACTORY)));
+ } else {
+ endKey = new Key(row, columnFamilyText);
+ }
+
+ return new Range(startKey, true, endKey, false);
+ }
+
+ public static IteratorSetting getColumnSliceIterator(SliceQuery sliceQuery) {
+ IteratorSetting is = null;
+
+ byte[] minColumn = sliceQuery.getSliceStart().as(StaticBuffer.ARRAY_FACTORY);
+ byte[] maxColumn = sliceQuery.getSliceEnd().as(StaticBuffer.ARRAY_FACTORY);
+
+ if (minColumn.length > 0 || maxColumn.length > 0) {
+ is = new IteratorSetting(10, "columnRangeIter", ColumnRangeFilter.class);
+ ColumnRangeFilter.setRange(is, minColumn, true, maxColumn, false);
+ }
+
+ return is;
+ }
+
+ private static class RowKeyIterator implements KeyIterator {
+
+ RowIterator rows;
+ PeekingIterator> currentRow;
+ boolean isClosed;
+
+ RowKeyIterator(Scanner scanner) {
+ rows = new RowIterator(scanner);
+ isClosed = false;
+ }
+
+ @Override
+ public RecordIterator getEntries() {
+ RecordIterator rowIter = new RecordIterator() {
+ @Override
+ public boolean hasNext() {
+ ensureOpen();
+ return currentRow.hasNext();
+ }
+
+ @Override
+ public Entry next() {
+ ensureOpen();
+ Map.Entry kv = currentRow.next();
+ return getEntry(kv);
+ }
+
+ @Override
+ public void close() {
+ isClosed = true; // same semantics as in-memory implementation in Titan core
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+
+ return rowIter;
+ }
+
+ @Override
+ public boolean hasNext() {
+ ensureOpen();
+ return rows.hasNext();
+ }
+
+ @Override
+ public StaticBuffer next() {
+ ensureOpen();
+ currentRow = Iterators.peekingIterator(rows.next());
+ return new StaticArrayBuffer(currentRow.peek().getKey().getRow().getBytes());
+ }
+
+ @Override
+ public void close() {
+ isClosed = true;
+ rows = null;
+ currentRow = null;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ private void ensureOpen() {
+ if (isClosed) {
+ throw new IllegalStateException("Iterator has been closed.");
+ }
+ }
+ }
+}
diff --git a/titan-accumulo/titan-accumulo-core/src/main/java/com/thinkaurelius/titan/diskstorage/accumulo/AccumuloStoreConfiguration.java b/titan-accumulo/titan-accumulo-core/src/main/java/com/thinkaurelius/titan/diskstorage/accumulo/AccumuloStoreConfiguration.java
new file mode 100644
index 0000000000..9dd694ed61
--- /dev/null
+++ b/titan-accumulo/titan-accumulo-core/src/main/java/com/thinkaurelius/titan/diskstorage/accumulo/AccumuloStoreConfiguration.java
@@ -0,0 +1,141 @@
+package com.thinkaurelius.titan.diskstorage.accumulo;
+
+import com.google.common.base.Preconditions;
+import com.thinkaurelius.titan.diskstorage.PermanentStorageException;
+import com.thinkaurelius.titan.diskstorage.StorageException;
+import com.thinkaurelius.titan.diskstorage.TemporaryStorageException;
+import java.util.Iterator;
+import java.util.Map;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of storage backend properties in Accumulo.
+ *
+ * Each storage backend provides the functionality to get and set properties.
+ * This class implements this backend properties using a single row and column
+ * family in Accumulo.
+ *
+ * AccumuloStoreConfiguration
is thread-safe.
+ *
+ * @author Etienne Deprit
+ */
+public class AccumuloStoreConfiguration {
+
+ private static final Logger logger = LoggerFactory.getLogger(AccumuloStoreConfiguration.class);
+ // Configuration defaults
+ private static final String ROW_ID_DEFAULT = "";
+ private static final String COL_FAMILY_DEFAULT = "_properties";
+ private static final Authorizations AUTHORIZATIONS_DEFAULT = new Authorizations();
+ // Instance variables
+ private final Connector connector; // thread-safe
+ private final String tableName;
+ private final AccumuloBatchConfiguration batchConfiguration;
+ private final Text rowIdText;
+ private final Text colFamilyText;
+
+ /**
+ * Construct Accumulo store configuration that gets and sets properties.
+ *
+ * @param connector Connection to Accumulo instance
+ * @param tableName Accumulo table backing Titan store
+ * @param batchConfiguration Configuration for batch operations
+ */
+ public AccumuloStoreConfiguration(Connector connector, String tableName, AccumuloBatchConfiguration batchConfiguration) {
+ this(connector, tableName, batchConfiguration, ROW_ID_DEFAULT, COL_FAMILY_DEFAULT);
+ }
+
+ /**
+ * Construct Accumulo store configuration that gets and sets properties.
+ *
+ * @param connector Connection to Accumulo instance
+ * @param tableName Accumulo table backing Titan store
+ * @param batchConfiguration Configuration for batch operations
+ * @param rowId Row containing properties.
+ * @param colFamily Column family for key-value pairs.
+ */
+ public AccumuloStoreConfiguration(Connector connector, String tableName,
+ AccumuloBatchConfiguration batchConfiguration, String rowId, String colFamily) {
+ this.connector = connector;
+ this.tableName = tableName;
+ this.batchConfiguration = batchConfiguration;
+ this.rowIdText = new Text(rowId);
+ this.colFamilyText = new Text(colFamily);
+ }
+
+ /**
+ * Get Accumulo store property for {
+ *
+ * @ key}.
+ *
+ * @param key Property to get
+ * @return Property value
+ * @throws StorageException
+ */
+ public String getConfigurationProperty(String key) throws StorageException {
+ Preconditions.checkArgument(key != null, "Key cannot be null");
+
+ Scanner scanner;
+ try {
+ scanner = connector.createScanner(tableName, AUTHORIZATIONS_DEFAULT);
+ } catch (TableNotFoundException ex) {
+ logger.error("Can't find Titan table " + tableName, ex);
+ throw new PermanentStorageException(ex);
+ }
+
+ Key scanKey = new Key(rowIdText, colFamilyText, new Text(key));
+ scanner.setRange(new Range(scanKey, true, scanKey, true));
+
+ Iterator> kvs = scanner.iterator();
+
+ if (kvs.hasNext()) {
+ return kvs.next().getValue().toString();
+ } else {
+ return null;
+ }
+ }
+
+ /**
+ * Sets Accumulo store property for {@ key} to {@ value}.
+ *
+ * @param key Property to set
+ * @param value Property value
+ * @throws StorageException
+ */
+ public void setConfigurationProperty(String key, String value) throws StorageException {
+ try {
+ BatchWriter writer = batchConfiguration.createBatchWriter(connector, tableName);
+ try {
+ Mutation mutation = new Mutation(rowIdText);
+ mutation.put(colFamilyText, new Text(key), new Value(value.getBytes()));
+
+ writer.addMutation(mutation);
+ writer.flush();
+ } catch (MutationsRejectedException ex) {
+ logger.error("Can't set configuration on Titan store" + tableName, ex);
+ throw new TemporaryStorageException(ex);
+ } finally {
+ try {
+ writer.close();
+ } catch (MutationsRejectedException ex) {
+ logger.error("Can't set configuration on Titan store" + tableName, ex);
+ throw new TemporaryStorageException(ex);
+ }
+ }
+ } catch (TableNotFoundException ex) {
+ logger.error("Can't find Titan store " + tableName, ex);
+ throw new PermanentStorageException(ex);
+ }
+ }
+}
diff --git a/titan-accumulo/titan-accumulo-core/src/main/java/com/thinkaurelius/titan/diskstorage/accumulo/AccumuloStoreManager.java b/titan-accumulo/titan-accumulo-core/src/main/java/com/thinkaurelius/titan/diskstorage/accumulo/AccumuloStoreManager.java
new file mode 100644
index 0000000000..edf99cd615
--- /dev/null
+++ b/titan-accumulo/titan-accumulo-core/src/main/java/com/thinkaurelius/titan/diskstorage/accumulo/AccumuloStoreManager.java
@@ -0,0 +1,313 @@
+package com.thinkaurelius.titan.diskstorage.accumulo;
+
+import com.thinkaurelius.titan.diskstorage.PermanentStorageException;
+import com.thinkaurelius.titan.diskstorage.StaticBuffer;
+import com.thinkaurelius.titan.diskstorage.StorageException;
+import com.thinkaurelius.titan.diskstorage.TemporaryStorageException;
+import com.thinkaurelius.titan.diskstorage.common.DistributedStoreManager;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.ConsistencyLevel;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.Entry;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KCVMutation;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyColumnValueStore;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyColumnValueStoreManager;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StoreFeatures;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StoreTransaction;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StoreTxConfig;
+import com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchDeleter;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.commons.configuration.Configuration;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Storage Manager for Accumulo.
+ *
+ * @author Etienne Deprit
+ */
+public class AccumuloStoreManager extends DistributedStoreManager implements KeyColumnValueStoreManager {
+
+ private static final Logger logger = LoggerFactory.getLogger(AccumuloStoreManager.class);
+ // Default parameters
+ private static final Authorizations AUTHORIZATIONS_DEFAULT = new Authorizations();
+ // Configuration namespace
+ public static final String ACCUMULO_NAMESPACE = "accumulo";
+ // Configuration keys
+ public static final String ACCUMULO_INTSANCE_KEY = "instance";
+ public static final String TABLE_NAME_KEY = "tablename";
+ public static final String SERVER_SIDE_ITERATORS_KEY = "server-side-iterators";
+ // Configuration defaults
+ public static final String TABLE_NAME_DEFAULT = "titan";
+ public static final int PORT_DEFAULT = 9160;
+ public static final boolean SERVER_SIDE_ITERATORS_DEFAULT = false;
+ // Instance injector
+ public static AccumuloInstanceFactory instanceFactory = AccumuloInstanceFactory.ZOOKEEPER_INSTANCE_FACTORY;
+ // Instance variables
+ private final String tableName;
+ private final String instanceName;
+ private final String zooKeepers;
+ private final boolean serverSideIterators;
+ private final Instance instance; // thread-safe
+ private final Connector connector; // thread-safe
+ private final ConcurrentMap openStores;
+ private final StoreFeatures features; // immutable at constructor exit
+ private final AccumuloBatchConfiguration batchConfiguration; // immutable at constructor exit
+ private final AccumuloStoreConfiguration storeConfiguration; // immutable at constructor exit
+
+ public AccumuloStoreManager(Configuration config) throws StorageException {
+ super(config, PORT_DEFAULT);
+
+ zooKeepers = config.getString(GraphDatabaseConfiguration.HOSTNAME_KEY,
+ GraphDatabaseConfiguration.HOSTNAME_DEFAULT);
+ tableName = config.getString(TABLE_NAME_KEY, TABLE_NAME_DEFAULT);
+
+ // Accumulo specific keys
+ Configuration accumuloConfig = config.subset(ACCUMULO_NAMESPACE);
+ instanceName = accumuloConfig.getString(ACCUMULO_INTSANCE_KEY);
+
+ serverSideIterators = accumuloConfig.getBoolean(SERVER_SIDE_ITERATORS_KEY, SERVER_SIDE_ITERATORS_DEFAULT);
+
+ instance = instanceFactory.getInstance(instanceName, zooKeepers);
+
+ try {
+ connector = instance.getConnector(username, password.getBytes());
+ } catch (AccumuloException ex) {
+ logger.error("Accumulo failure", ex);
+ throw new PermanentStorageException(ex.getMessage(), ex);
+ } catch (AccumuloSecurityException ex) {
+ logger.error("User doesn't have permission to connect", ex);
+ throw new PermanentStorageException(ex.getMessage(), ex);
+ }
+
+ openStores = new ConcurrentHashMap();
+
+ features = new StoreFeatures();
+ features.supportsOrderedScan = true;
+ features.supportsUnorderedScan = true;
+ features.supportsBatchMutation = true;
+ features.supportsTransactions = false;
+ features.supportsMultiQuery = true;
+ features.supportsConsistentKeyOperations = true;
+ features.supportsLocking = false;
+ features.isKeyOrdered = true;
+ features.isDistributed = true;
+ features.hasLocalKeyPartition = false;
+
+ batchConfiguration = new AccumuloBatchConfiguration();
+
+ storeConfiguration = new AccumuloStoreConfiguration(connector, tableName, batchConfiguration);
+ }
+
+ @Override
+ public String getName() {
+ return tableName;
+ }
+
+ @Override
+ public String toString() {
+ return "accumulo[" + getName() + "@" + super.toString() + "]";
+ }
+
+ @Override
+ public void close() {
+ openStores.clear();
+ }
+
+ @Override
+ public StoreFeatures getFeatures() {
+ return features;
+ }
+
+ @Override
+ public KeyColumnValueStore openDatabase(String dbName) throws StorageException {
+ AccumuloKeyColumnValueStore store = openStores.get(dbName);
+
+ if (store == null) {
+ AccumuloKeyColumnValueStore newStore = new AccumuloKeyColumnValueStore(connector, tableName, dbName,
+ batchConfiguration, serverSideIterators);
+
+ store = openStores.putIfAbsent(dbName, newStore); // atomic so only one store dbName
+
+ if (store == null) { // ensure that column family exists on first open
+ ensureColumnFamilyExists(tableName, dbName);
+ store = newStore;
+ }
+ }
+
+ return store;
+ }
+
+ @Override
+ public void clearStorage() throws StorageException {
+ TableOperations operations = connector.tableOperations();
+
+ // Check if table exists, if not we are done
+ if (!operations.exists(tableName)) {
+ logger.warn("clearStorage() called before table {} created, skipping.", tableName);
+ return;
+ }
+
+ try {
+ BatchDeleter deleter = batchConfiguration.createBatchDeleter(connector, tableName, AUTHORIZATIONS_DEFAULT);
+ deleter.setRanges(Collections.singletonList(new Range()));
+ try {
+ deleter.delete();
+ } catch (MutationsRejectedException ex) {
+ logger.error("Can't write mutations to " + tableName, ex);
+ throw new TemporaryStorageException(ex);
+ } finally {
+ deleter.close();
+ }
+
+ } catch (TableNotFoundException ex) {
+ logger.error("Can't find Titan table " + tableName, ex);
+ throw new PermanentStorageException(ex);
+ }
+ }
+
+ public String getConfigurationProperty(String key) throws StorageException {
+ return storeConfiguration.getConfigurationProperty(key);
+ }
+
+ public void setConfigurationProperty(String key, String value) throws StorageException {
+ storeConfiguration.setConfigurationProperty(key, value);
+ }
+
+ @Override
+ public StoreTransaction beginTransaction(final StoreTxConfig config) throws StorageException {
+ return new AccumuloTransaction(config);
+ }
+
+ @Override
+ public void mutateMany(Map> mutations, StoreTransaction txh) throws StorageException {
+ final long delTS = System.currentTimeMillis();
+ final long putTS = delTS + 1;
+
+ Collection actions = convertToActions(mutations, putTS, delTS);
+
+ try {
+ BatchWriter writer = batchConfiguration.createBatchWriter(connector, tableName);
+ try {
+ writer.addMutations(actions);
+ writer.flush();
+ } catch (MutationsRejectedException ex) {
+ logger.error("Can't write mutations to Titan store " + tableName, ex);
+ throw new TemporaryStorageException(ex);
+ } finally {
+ try {
+ writer.close();
+ } catch (MutationsRejectedException ex) {
+ logger.error("Can't write mutations to Titan store " + tableName, ex);
+ throw new TemporaryStorageException(ex);
+ }
+ }
+ } catch (TableNotFoundException ex) {
+ logger.error("Can't find Titan store " + tableName, ex);
+ throw new PermanentStorageException(ex);
+ }
+
+ waitUntil(putTS);
+ }
+
+ /**
+ * Convert Titan internal {
+ *
+ * @ KCVMutation} representation into Accumulo native {
+ * @ Mutation}.
+ *
+ * @param mutations Mutations to convert into Accumulo actions.
+ * @param putTimestamp The timestamp to use for put mutations.
+ * @param delTimestamp The timestamp to use for delete mutations.
+ *
+ * @return Mutations converted from Titan internal representation.
+ */
+ private static Collection convertToActions(Map> mutations,
+ final long putTimestamp, final long delTimestamp) {
+
+ Map actionsPerKey = new HashMap();
+
+ for (Map.Entry> entry : mutations.entrySet()) {
+ Text colFamily = new Text(entry.getKey().getBytes());
+
+ for (Map.Entry m : entry.getValue().entrySet()) {
+ StaticBuffer key = m.getKey();
+ KCVMutation mutation = m.getValue();
+
+ Mutation commands = actionsPerKey.get(key);
+
+ if (commands == null) {
+ commands = new Mutation(new Text(key.as(StaticBuffer.ARRAY_FACTORY)));
+ actionsPerKey.put(key, commands);
+ }
+
+ if (mutation.hasDeletions()) {
+ for (StaticBuffer del : mutation.getDeletions()) {
+ commands.putDelete(colFamily, new Text(del.as(StaticBuffer.ARRAY_FACTORY)), delTimestamp);
+ }
+ }
+
+ if (mutation.hasAdditions()) {
+ for (Entry add : mutation.getAdditions()) {
+ commands.put(colFamily, new Text(add.getArrayColumn()), putTimestamp, new Value(add.getArrayValue()));
+ }
+ }
+ }
+ }
+
+ return actionsPerKey.values();
+ }
+
+ private void ensureColumnFamilyExists(String tableName, String columnFamily) throws StorageException {
+ ensureTableExists(tableName);
+ // Option to set locality groups here
+ }
+
+ private void ensureTableExists(String tableName) throws StorageException {
+ TableOperations operations = connector.tableOperations();
+ if (!operations.exists(tableName)) {
+ try {
+ operations.create(tableName);
+ } catch (AccumuloException ex) {
+ logger.error("Accumulo failure", ex);
+ throw new PermanentStorageException(ex);
+ } catch (AccumuloSecurityException ex) {
+ logger.error("User doesn't have permission to create Titan store" + tableName, ex);
+ throw new PermanentStorageException(ex);
+ } catch (TableExistsException ex) {
+ // Concurrent creation of table, this thread lost race
+ }
+ }
+ }
+
+ private static void waitUntil(long until) {
+ long now = System.currentTimeMillis();
+
+ while (now <= until) {
+ try {
+ Thread.sleep(1L);
+ now = System.currentTimeMillis();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+}
diff --git a/titan-accumulo/titan-accumulo-core/src/main/java/com/thinkaurelius/titan/diskstorage/accumulo/AccumuloTransaction.java b/titan-accumulo/titan-accumulo-core/src/main/java/com/thinkaurelius/titan/diskstorage/accumulo/AccumuloTransaction.java
new file mode 100644
index 0000000000..0b9a17a7fe
--- /dev/null
+++ b/titan-accumulo/titan-accumulo-core/src/main/java/com/thinkaurelius/titan/diskstorage/accumulo/AccumuloTransaction.java
@@ -0,0 +1,16 @@
+package com.thinkaurelius.titan.diskstorage.accumulo;
+
+import com.thinkaurelius.titan.diskstorage.common.AbstractStoreTransaction;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StoreTxConfig;
+
+/**
+ * This creates a transaction type specific to Accumulo.
+ *
+ * @author Etienne Deprit
+ */
+public class AccumuloTransaction extends AbstractStoreTransaction {
+
+ public AccumuloTransaction(final StoreTxConfig config) {
+ super(config);
+ }
+}
diff --git a/titan-accumulo/titan-accumulo-core/src/main/java/com/thinkaurelius/titan/diskstorage/accumulo/util/CallableFunction.java b/titan-accumulo/titan-accumulo-core/src/main/java/com/thinkaurelius/titan/diskstorage/accumulo/util/CallableFunction.java
new file mode 100644
index 0000000000..44c76478b6
--- /dev/null
+++ b/titan-accumulo/titan-accumulo-core/src/main/java/com/thinkaurelius/titan/diskstorage/accumulo/util/CallableFunction.java
@@ -0,0 +1,18 @@
+package com.thinkaurelius.titan.diskstorage.accumulo.util;
+
+/**
+ * Callable function that throws exceptions for use with {@code ConcurrentLists}.
+ *
+ * @author Etienne Deprit
+ */
+public interface CallableFunction {
+
+ /**
+ * Application of this function to {@code input}.
+ *
+ * @param input Function parameter
+ * @return Computed value
+ * @throws Exception
+ */
+ public T apply(F input) throws Exception;
+}
diff --git a/titan-accumulo/titan-accumulo-core/src/main/java/com/thinkaurelius/titan/diskstorage/accumulo/util/ConcurrentLists.java b/titan-accumulo/titan-accumulo-core/src/main/java/com/thinkaurelius/titan/diskstorage/accumulo/util/ConcurrentLists.java
new file mode 100644
index 0000000000..3cda10c50f
--- /dev/null
+++ b/titan-accumulo/titan-accumulo-core/src/main/java/com/thinkaurelius/titan/diskstorage/accumulo/util/ConcurrentLists.java
@@ -0,0 +1,113 @@
+package com.thinkaurelius.titan.diskstorage.accumulo.util;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+/**
+ * Concurrent transformations on lists.
+ *
+ * @author Etienne Deprit
+ */
+public class ConcurrentLists {
+
+ /**
+ * Default size of executor thread pool.
+ */
+ public static final int NUM_THREADS_DEFAULT = 4;
+
+ /**
+ * Returns list that is concurrent application of {@code function} to each
+ * element of {@code fromList}.
+ *
+ * @param Element type of from list
+ * @param Element type of result list
+ * @param fromList Source list
+ * @param function Transformation function
+ * @return List of function applications
+ */
+ public static List transform(List fromList,
+ final CallableFunction super F, ? extends T> function) {
+
+ return transform(fromList, function, NUM_THREADS_DEFAULT);
+ }
+
+ /**
+ * Returns list that is concurrent application of {@code function} to each
+ * element of {@code fromList}.
+ *
+ * @param Element type of from list
+ * @param Element type of result list
+ * @param fromList Source list
+ * @param function Transformation function
+ * @param numThreads Size of thread pool
+ * @return List of function applications
+ */
+ public static List transform(List fromList,
+ CallableFunction super F, ? extends T> function, int numThreads) {
+ Preconditions.checkArgument(numThreads > 0, "numThreads must be > 0");
+
+ ExecutorService executor = null;
+ try {
+ executor = Executors.newFixedThreadPool(numThreads);
+ return transform(fromList, function, executor);
+ } finally {
+ if (executor != null) {
+ executor.shutdownNow();
+ }
+ }
+ }
+
+ /**
+ * Returns list that is concurrent application of {@code function} to each
+ * element of {@code fromList}.
+ *
+ * @param Element type of from list
+ * @param Element type of result list
+ * @param fromList Source list
+ * @param function Transformation function
+ * @param executor Executor service for threads
+ * @return List of function applications
+ */
+ public static List transform(List fromList,
+ final CallableFunction super F, ? extends T> function, ExecutorService executor) {
+
+ List> tasks = Lists.newArrayListWithCapacity(fromList.size());
+ for (final F from : fromList) {
+ tasks.add(
+ new Callable() {
+ @Override
+ public T call() throws Exception {
+ return function.apply(from);
+ }
+ });
+ }
+
+ List results = Lists.newArrayListWithCapacity(fromList.size());
+ try {
+ List> futures = executor.invokeAll(tasks);
+
+ for (Future future : futures) {
+ results.add(future.get());
+ }
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ } catch (ExecutionException ex) {
+ Throwable t = ex.getCause();
+ if (t instanceof RuntimeException) {
+ throw (RuntimeException) t;
+ } else if (t instanceof Error) {
+ throw (Error) t;
+ } else {
+ throw new IllegalStateException(t);
+ }
+ }
+
+ return results;
+ }
+}
\ No newline at end of file
diff --git a/titan-accumulo/titan-accumulo-core/src/main/resources/log4j.properties b/titan-accumulo/titan-accumulo-core/src/main/resources/log4j.properties
new file mode 100644
index 0000000000..a02cd1ff42
--- /dev/null
+++ b/titan-accumulo/titan-accumulo-core/src/main/resources/log4j.properties
@@ -0,0 +1,14 @@
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# Set root logger level to the designated level and its only appender to A1.
+log4j.rootLogger=INFO, A1
+
+log4j.logger.org.apache.accumulo=INFO
+log4j.logger.org.apache.cassandra=INFO
+log4j.logger.org.apache.hadoop=INFO
+log4j.logger.org.apache.zookeeper=INFO
diff --git a/titan-accumulo/titan-accumulo-core/src/main/resources/titan.properties b/titan-accumulo/titan-accumulo-core/src/main/resources/titan.properties
new file mode 100644
index 0000000000..02dc2c4ce5
--- /dev/null
+++ b/titan-accumulo/titan-accumulo-core/src/main/resources/titan.properties
@@ -0,0 +1,2 @@
+titan.version=${project.version}
+titan.compatible-versions=${titan.compatible.versions}
\ No newline at end of file
diff --git a/titan-accumulo/titan-accumulo-core/src/test/java/com/thinkaurelius/titan/AccumuloStorageSetup.java b/titan-accumulo/titan-accumulo-core/src/test/java/com/thinkaurelius/titan/AccumuloStorageSetup.java
new file mode 100644
index 0000000000..f37c387798
--- /dev/null
+++ b/titan-accumulo/titan-accumulo-core/src/test/java/com/thinkaurelius/titan/AccumuloStorageSetup.java
@@ -0,0 +1,63 @@
+package com.thinkaurelius.titan;
+
+import com.thinkaurelius.titan.diskstorage.PermanentStorageException;
+import com.thinkaurelius.titan.diskstorage.StorageException;
+import com.thinkaurelius.titan.diskstorage.accumulo.AccumuloStoreManager;
+import com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration;
+import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.commons.configuration.Configuration;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+
+/**
+ * Set up Accumulo storage back-end for unit tests.
+ *
+ * @author Etienne Deprit
+ */
+public class AccumuloStorageSetup {
+
+ public static AccumuloStoreManager getAccumuloStoreManager() throws StorageException {
+ try {
+ Configuration config = getAccumuloStorageConfiguration();
+ String backend = config.getString(GraphDatabaseConfiguration.STORAGE_BACKEND_KEY);
+
+ Class cls = Class.forName(backend);
+ Constructor constructor = cls.getDeclaredConstructor(Configuration.class);
+
+ return (AccumuloStoreManager) constructor.newInstance(config);
+ } catch (Exception ex) {
+ throw new PermanentStorageException(ex);
+ }
+ }
+
+ public static Configuration getAccumuloStorageConfiguration() {
+ return getAccumuloGraphConfiguration()
+ .subset(GraphDatabaseConfiguration.STORAGE_NAMESPACE);
+ }
+
+ public static Configuration getAccumuloGraphConfiguration() {
+ BaseConfiguration config = new BaseConfiguration();
+
+ Configuration storageConfig = config.subset(GraphDatabaseConfiguration.STORAGE_NAMESPACE);
+
+ storageConfig.addProperty(GraphDatabaseConfiguration.STORAGE_BACKEND_KEY,
+ "com.thinkaurelius.titan.diskstorage.accumulo.MockAccumuloStoreManager");
+ storageConfig.addProperty(GraphDatabaseConfiguration.HOSTNAME_KEY, "localhost");
+
+ storageConfig.addProperty(GraphDatabaseConfiguration.AUTH_USERNAME_KEY, "root");
+ storageConfig.addProperty(GraphDatabaseConfiguration.AUTH_PASSWORD_KEY, "");
+
+ Configuration accumuloConfig = storageConfig.subset(AccumuloStoreManager.ACCUMULO_NAMESPACE);
+
+ accumuloConfig.addProperty(AccumuloStoreManager.ACCUMULO_INTSANCE_KEY, "devdb");
+ accumuloConfig.addProperty(AccumuloStoreManager.SERVER_SIDE_ITERATORS_KEY, false);
+
+ return config;
+ }
+
+ public static void startAccumulo() throws IOException {
+ }
+
+ private static void shutdownAccumulo() throws IOException {
+ }
+}
diff --git a/titan-accumulo/titan-accumulo-core/src/test/java/com/thinkaurelius/titan/blueprints/AccumuloBlueprintsTest.java b/titan-accumulo/titan-accumulo-core/src/test/java/com/thinkaurelius/titan/blueprints/AccumuloBlueprintsTest.java
new file mode 100644
index 0000000000..9137fef98a
--- /dev/null
+++ b/titan-accumulo/titan-accumulo-core/src/test/java/com/thinkaurelius/titan/blueprints/AccumuloBlueprintsTest.java
@@ -0,0 +1,50 @@
+package com.thinkaurelius.titan.blueprints;
+
+import com.thinkaurelius.titan.AccumuloStorageSetup;
+import com.thinkaurelius.titan.core.TitanFactory;
+import com.thinkaurelius.titan.diskstorage.StorageException;
+import com.thinkaurelius.titan.diskstorage.accumulo.AccumuloStoreManager;
+import com.tinkerpop.blueprints.Graph;
+
+import java.io.IOException;
+
+/**
+ * @author Matthias Broecheler (me@matthiasb.com)
+ */
+public class AccumuloBlueprintsTest extends TitanBlueprintsTest {
+
+ @Override
+ public void startUp() {
+ try {
+ AccumuloStorageSetup.startAccumulo();
+ } catch (IOException e) {
+ throw new AssertionError(e);
+ }
+ }
+
+ @Override
+ public void shutDown() {
+ // we don't need to restart on each test because cleanup is in please
+ }
+
+ @Override
+ public Graph generateGraph() {
+ return TitanFactory.open(AccumuloStorageSetup.getAccumuloGraphConfiguration());
+ }
+
+ @Override
+ public void cleanUp() throws StorageException {
+ AccumuloStoreManager s = AccumuloStorageSetup.getAccumuloStoreManager();
+ s.clearStorage();
+ }
+
+ @Override
+ public boolean supportsMultipleGraphs() {
+ return false;
+ }
+
+ @Override
+ public Graph generateGraph(String s) {
+ throw new UnsupportedOperationException();
+ }
+}
diff --git a/titan-accumulo/titan-accumulo-core/src/test/java/com/thinkaurelius/titan/diskstorage/accumulo/AccumuloIDAllocationTest.java b/titan-accumulo/titan-accumulo-core/src/test/java/com/thinkaurelius/titan/diskstorage/accumulo/AccumuloIDAllocationTest.java
new file mode 100644
index 0000000000..caf2682b9c
--- /dev/null
+++ b/titan-accumulo/titan-accumulo-core/src/test/java/com/thinkaurelius/titan/diskstorage/accumulo/AccumuloIDAllocationTest.java
@@ -0,0 +1,27 @@
+package com.thinkaurelius.titan.diskstorage.accumulo;
+
+import com.thinkaurelius.titan.AccumuloStorageSetup;
+import com.thinkaurelius.titan.diskstorage.IDAllocationTest;
+import com.thinkaurelius.titan.diskstorage.LockKeyColumnValueStoreTest;
+import com.thinkaurelius.titan.diskstorage.StorageException;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyColumnValueStoreManager;
+import org.apache.commons.configuration.Configuration;
+import org.junit.BeforeClass;
+
+import java.io.IOException;
+
+public class AccumuloIDAllocationTest extends IDAllocationTest {
+
+ public AccumuloIDAllocationTest(Configuration baseConfig) {
+ super(baseConfig);
+ }
+
+ @BeforeClass
+ public static void startAccmulo() throws IOException {
+ AccumuloStorageSetup.startAccumulo();
+ }
+
+ public KeyColumnValueStoreManager openStorageManager(int idx) throws StorageException {
+ return AccumuloStorageSetup.getAccumuloStoreManager();
+ }
+}
diff --git a/titan-accumulo/titan-accumulo-core/src/test/java/com/thinkaurelius/titan/diskstorage/accumulo/AccumuloKeyColumnValueTest.java b/titan-accumulo/titan-accumulo-core/src/test/java/com/thinkaurelius/titan/diskstorage/accumulo/AccumuloKeyColumnValueTest.java
new file mode 100644
index 0000000000..c98b2cb64d
--- /dev/null
+++ b/titan-accumulo/titan-accumulo-core/src/test/java/com/thinkaurelius/titan/diskstorage/accumulo/AccumuloKeyColumnValueTest.java
@@ -0,0 +1,21 @@
+package com.thinkaurelius.titan.diskstorage.accumulo;
+
+import com.thinkaurelius.titan.AccumuloStorageSetup;
+import com.thinkaurelius.titan.diskstorage.KeyColumnValueStoreTest;
+import com.thinkaurelius.titan.diskstorage.StorageException;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyColumnValueStoreManager;
+import org.junit.BeforeClass;
+import java.io.IOException;
+
+public class AccumuloKeyColumnValueTest extends KeyColumnValueStoreTest {
+
+ @BeforeClass
+ public static void startAccmulo() throws IOException {
+ AccumuloStorageSetup.startAccumulo();
+ }
+
+ @Override
+ public KeyColumnValueStoreManager openStorageManager() throws StorageException {
+ return AccumuloStorageSetup.getAccumuloStoreManager();
+ }
+}
diff --git a/titan-accumulo/titan-accumulo-core/src/test/java/com/thinkaurelius/titan/diskstorage/accumulo/AccumuloLockKeyColumnValueStoreTest.java b/titan-accumulo/titan-accumulo-core/src/test/java/com/thinkaurelius/titan/diskstorage/accumulo/AccumuloLockKeyColumnValueStoreTest.java
new file mode 100644
index 0000000000..1b24f551aa
--- /dev/null
+++ b/titan-accumulo/titan-accumulo-core/src/test/java/com/thinkaurelius/titan/diskstorage/accumulo/AccumuloLockKeyColumnValueStoreTest.java
@@ -0,0 +1,21 @@
+package com.thinkaurelius.titan.diskstorage.accumulo;
+
+import com.thinkaurelius.titan.AccumuloStorageSetup;
+import com.thinkaurelius.titan.diskstorage.LockKeyColumnValueStoreTest;
+import com.thinkaurelius.titan.diskstorage.StorageException;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyColumnValueStoreManager;
+import org.junit.BeforeClass;
+
+import java.io.IOException;
+
+public class AccumuloLockKeyColumnValueStoreTest extends LockKeyColumnValueStoreTest {
+ @BeforeClass
+ public static void startAccumulo() throws IOException {
+ AccumuloStorageSetup.startAccumulo();
+ }
+
+ @Override
+ public KeyColumnValueStoreManager openStorageManager(int idx) throws StorageException {
+ return AccumuloStorageSetup.getAccumuloStoreManager();
+ }
+}
diff --git a/titan-accumulo/titan-accumulo-core/src/test/java/com/thinkaurelius/titan/diskstorage/accumulo/AccumuloMultiWriteKeyColumnValueStoreTest.java b/titan-accumulo/titan-accumulo-core/src/test/java/com/thinkaurelius/titan/diskstorage/accumulo/AccumuloMultiWriteKeyColumnValueStoreTest.java
new file mode 100644
index 0000000000..eb3b92c3d9
--- /dev/null
+++ b/titan-accumulo/titan-accumulo-core/src/test/java/com/thinkaurelius/titan/diskstorage/accumulo/AccumuloMultiWriteKeyColumnValueStoreTest.java
@@ -0,0 +1,22 @@
+package com.thinkaurelius.titan.diskstorage.accumulo;
+
+import com.thinkaurelius.titan.AccumuloStorageSetup;
+import com.thinkaurelius.titan.diskstorage.MultiWriteKeyColumnValueStoreTest;
+import com.thinkaurelius.titan.diskstorage.StorageException;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyColumnValueStoreManager;
+import org.junit.BeforeClass;
+
+import java.io.IOException;
+
+public class AccumuloMultiWriteKeyColumnValueStoreTest extends MultiWriteKeyColumnValueStoreTest {
+
+ @BeforeClass
+ public static void startAccumulo() throws IOException {
+ AccumuloStorageSetup.startAccumulo();
+ }
+
+ @Override
+ public KeyColumnValueStoreManager openStorageManager() throws StorageException {
+ return AccumuloStorageSetup.getAccumuloStoreManager();
+ }
+}
diff --git a/titan-accumulo/titan-accumulo-core/src/test/java/com/thinkaurelius/titan/diskstorage/accumulo/MockAccumuloStoreManager.java b/titan-accumulo/titan-accumulo-core/src/test/java/com/thinkaurelius/titan/diskstorage/accumulo/MockAccumuloStoreManager.java
new file mode 100644
index 0000000000..bd8267dc81
--- /dev/null
+++ b/titan-accumulo/titan-accumulo-core/src/test/java/com/thinkaurelius/titan/diskstorage/accumulo/MockAccumuloStoreManager.java
@@ -0,0 +1,31 @@
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package com.thinkaurelius.titan.diskstorage.accumulo;
+
+import com.thinkaurelius.titan.diskstorage.StorageException;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.commons.configuration.Configuration;
+
+/**
+ * Store manager that injects mock Accumulo instance.
+ *
+ * @author Etienne Deprit
+ */
+public class MockAccumuloStoreManager extends AccumuloStoreManager {
+
+ static {
+ instanceFactory = new AccumuloInstanceFactory() {
+ @Override
+ public Instance getInstance(String instanceName, String zooKeepers) {
+ return new MockInstance(instanceName);
+ }
+ };
+ }
+
+ public MockAccumuloStoreManager(Configuration config) throws StorageException {
+ super(config);
+ }
+}
diff --git a/titan-accumulo/titan-accumulo-core/src/test/java/com/thinkaurelius/titan/graphdb/accumulo/AccumuloGraphConcurrentTest.java b/titan-accumulo/titan-accumulo-core/src/test/java/com/thinkaurelius/titan/graphdb/accumulo/AccumuloGraphConcurrentTest.java
new file mode 100644
index 0000000000..7a4f12dab7
--- /dev/null
+++ b/titan-accumulo/titan-accumulo-core/src/test/java/com/thinkaurelius/titan/graphdb/accumulo/AccumuloGraphConcurrentTest.java
@@ -0,0 +1,18 @@
+package com.thinkaurelius.titan.graphdb.accumulo;
+
+import com.thinkaurelius.titan.AccumuloStorageSetup;
+import com.thinkaurelius.titan.graphdb.TitanGraphConcurrentTest;
+import org.junit.BeforeClass;
+
+import java.io.IOException;
+
+public class AccumuloGraphConcurrentTest extends TitanGraphConcurrentTest {
+ @BeforeClass
+ public static void startAccumulo() throws IOException {
+ AccumuloStorageSetup.startAccumulo();
+ }
+
+ public AccumuloGraphConcurrentTest() {
+ super(AccumuloStorageSetup.getAccumuloGraphConfiguration());
+ }
+}
diff --git a/titan-accumulo/titan-accumulo-core/src/test/java/com/thinkaurelius/titan/graphdb/accumulo/AccumuloGraphPerformanceMemoryTest.java b/titan-accumulo/titan-accumulo-core/src/test/java/com/thinkaurelius/titan/graphdb/accumulo/AccumuloGraphPerformanceMemoryTest.java
new file mode 100644
index 0000000000..7b475b968c
--- /dev/null
+++ b/titan-accumulo/titan-accumulo-core/src/test/java/com/thinkaurelius/titan/graphdb/accumulo/AccumuloGraphPerformanceMemoryTest.java
@@ -0,0 +1,18 @@
+package com.thinkaurelius.titan.graphdb.accumulo;
+
+import com.thinkaurelius.titan.AccumuloStorageSetup;
+import com.thinkaurelius.titan.graphdb.TitanGraphPerformanceMemoryTest;
+import org.junit.BeforeClass;
+
+import java.io.IOException;
+
+public class AccumuloGraphPerformanceMemoryTest extends TitanGraphPerformanceMemoryTest {
+ @BeforeClass
+ public static void startAccumulo() throws IOException {
+ AccumuloStorageSetup.startAccumulo();
+ }
+
+ public AccumuloGraphPerformanceMemoryTest() {
+ super(AccumuloStorageSetup.getAccumuloGraphConfiguration());
+ }
+}
diff --git a/titan-accumulo/titan-accumulo-core/src/test/java/com/thinkaurelius/titan/graphdb/accumulo/AccumuloGraphTest.java b/titan-accumulo/titan-accumulo-core/src/test/java/com/thinkaurelius/titan/graphdb/accumulo/AccumuloGraphTest.java
new file mode 100644
index 0000000000..a344ebc58a
--- /dev/null
+++ b/titan-accumulo/titan-accumulo-core/src/test/java/com/thinkaurelius/titan/graphdb/accumulo/AccumuloGraphTest.java
@@ -0,0 +1,18 @@
+package com.thinkaurelius.titan.graphdb.accumulo;
+
+import com.thinkaurelius.titan.AccumuloStorageSetup;
+import com.thinkaurelius.titan.graphdb.TitanGraphTest;
+import org.junit.BeforeClass;
+
+import java.io.IOException;
+
+public class AccumuloGraphTest extends TitanGraphTest {
+ @BeforeClass
+ public static void startAccumulo() throws IOException {
+ AccumuloStorageSetup.startAccumulo();
+ }
+
+ public AccumuloGraphTest() {
+ super(AccumuloStorageSetup.getAccumuloGraphConfiguration());
+ }
+}
diff --git a/titan-accumulo/titan-accumulo-core/src/test/resources/log4j.properties b/titan-accumulo/titan-accumulo-core/src/test/resources/log4j.properties
new file mode 100644
index 0000000000..1f8f2d4c63
--- /dev/null
+++ b/titan-accumulo/titan-accumulo-core/src/test/resources/log4j.properties
@@ -0,0 +1,15 @@
+# A1 is set to be a FileAppender.
+log4j.appender.A1=org.apache.log4j.FileAppender
+log4j.appender.A1.File=target/test.log
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# Set root logger level to the designated level and its only appender to A1.
+log4j.rootLogger=INFO, A1
+
+log4j.logger.org.apache.accumulo=INFO
+log4j.logger.org.apache.cassandra=INFO
+log4j.logger.org.apache.hadoop=INFO
+log4j.logger.org.apache.zookeeper=INFO
\ No newline at end of file
diff --git a/titan-accumulo/titan-accumulo-core/src/test/resources/rexster-fragment.xml b/titan-accumulo/titan-accumulo-core/src/test/resources/rexster-fragment.xml
new file mode 100644
index 0000000000..19f88f9236
--- /dev/null
+++ b/titan-accumulo/titan-accumulo-core/src/test/resources/rexster-fragment.xml
@@ -0,0 +1,13 @@
+
+
+ false
+ home
+
+ local
+
+
+
+ tp:gremlin
+
+
+
diff --git a/titan-accumulo/titan-accumulo-iterators/pom.xml b/titan-accumulo/titan-accumulo-iterators/pom.xml
new file mode 100644
index 0000000000..7233b44f66
--- /dev/null
+++ b/titan-accumulo/titan-accumulo-iterators/pom.xml
@@ -0,0 +1,84 @@
+
+ 4.0.0
+
+ com.thinkaurelius.titan
+ titan-accumulo
+ 0.4.3-SNAPSHOT
+ ../pom.xml
+
+ titan-accumulo-iterators
+ Titan-Accumulo: Graph Database Iterators
+
+
+ org.apache.accumulo
+ accumulo-core
+ ${accumulo.version}
+ provided
+
+
+ libthrift
+ org.apache.thrift
+
+
+
+
+ org.apache.accumulo
+ accumulo-start
+ ${accumulo.version}
+ provided
+
+
+ commons-logging-api
+ commons-logging
+
+
+
+
+ org.apache.hadoop
+ hadoop-core
+ 1.0.4
+ provided
+
+
+ junit
+ junit
+ 4.11
+ provided
+
+
+ com.thinkaurelius.titan
+ titan-test
+ ${project.version}
+ test
+
+
+
+
+
+ ${basedir}/src/main/resources
+ true
+
+
+
+
+ maven-dependency-plugin
+
+
+ test-compile
+
+ copy-dependencies
+
+
+ com.thinkaurelius.titan
+ target/test-lib
+ false
+ false
+ true
+
+
+
+
+
+
+
diff --git a/titan-accumulo/titan-accumulo-iterators/src/main/java/com/thinkaurelius/titan/diskstorage/accumulo/iterators/ColumnRangeFilter.java b/titan-accumulo/titan-accumulo-iterators/src/main/java/com/thinkaurelius/titan/diskstorage/accumulo/iterators/ColumnRangeFilter.java
new file mode 100644
index 0000000000..fa8c558dc7
--- /dev/null
+++ b/titan-accumulo/titan-accumulo-iterators/src/main/java/com/thinkaurelius/titan/diskstorage/accumulo/iterators/ColumnRangeFilter.java
@@ -0,0 +1,126 @@
+package com.thinkaurelius.titan.diskstorage.accumulo.iterators;
+
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.data.ArrayByteSequence;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.Filter;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+
+/**
+ * A Filter that matches entries based on range of column qualifiers,
+ * modeled after {@code RegExFilter}.
+ *
+ * @author Etienne Deprit
+ */
+public class ColumnRangeFilter extends Filter {
+
+ @Override
+ public SortedKeyValueIterator deepCopy(IteratorEnvironment env) {
+ ColumnRangeFilter result = (ColumnRangeFilter) super.deepCopy(env);
+ result.minColumn = minColumn;
+ result.minColInclusive = minColInclusive;
+ result.maxColumn = maxColumn;
+ result.maxColInclusive = maxColInclusive;
+ return result;
+ }
+ public static final String MIN_COLUMN = "minColumn";
+ public static final String MIN_COL_INCLUSIVE = "minColInclusive";
+ public static final String MAX_COLUMN = "maxColumn";
+ public static final String MAX_COL_INCLUSIVE = "maxColInclusive";
+ private ByteSequence minColumn;
+ private boolean minColInclusive;
+ private ByteSequence maxColumn;
+ private boolean maxColInclusive;
+
+ @Override
+ public boolean accept(Key key, Value value) {
+ int cmpMin = -1;
+
+ if (minColumn != null) {
+ cmpMin = minColumn.compareTo(key.getColumnQualifierData());
+ }
+
+ if (cmpMin > 0 || (!minColInclusive && cmpMin == 0)) {
+ return false;
+ }
+
+ if (maxColumn == null) {
+ return true;
+ }
+
+ int cmpMax = maxColumn.compareTo(key.getColumnQualifierData());
+
+ if (cmpMax < 0 || (!maxColInclusive && cmpMax == 0)) {
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public void init(SortedKeyValueIterator source, Map options, IteratorEnvironment env) throws IOException {
+ super.init(source, options, env);
+ if (options.containsKey(MIN_COLUMN)) {
+ minColumn = new ArrayByteSequence(options.get(MIN_COLUMN));
+ } else {
+ minColumn = null;
+ }
+
+ if (options.containsKey(MIN_COL_INCLUSIVE)) {
+ minColInclusive = Boolean.parseBoolean(options.get(MIN_COL_INCLUSIVE));
+ } else {
+ minColInclusive = true;
+ }
+
+ if (options.containsKey(MAX_COLUMN)) {
+ maxColumn = new ArrayByteSequence(options.get(MAX_COLUMN));
+ } else {
+ maxColumn = null;
+ }
+
+ if (options.containsKey(MAX_COL_INCLUSIVE)) {
+ maxColInclusive = Boolean.parseBoolean(options.get(MAX_COL_INCLUSIVE));
+ } else {
+ maxColInclusive = false;
+ }
+ }
+
+ @Override
+ public IteratorOptions describeOptions() {
+ IteratorOptions io = super.describeOptions();
+ io.setName("colrange");
+ io.setDescription("The ColumnRangeFilter/Iterator allows you to filter for a range of column qualifiers");
+ io.addNamedOption(MIN_COLUMN, "mininum column qualifier");
+ io.addNamedOption(MIN_COL_INCLUSIVE, "minimum column inclusive");
+ io.addNamedOption(MAX_COLUMN, "maximum column qualifier");
+ io.addNamedOption(MAX_COL_INCLUSIVE, "maximum column inclusive");
+ return io;
+ }
+
+ public static void setRange(IteratorSetting is, String minColumn, boolean minColInclusive,
+ String maxColumn, boolean maxColInclusive) {
+ if (minColumn != null && minColumn.length() > 0) {
+ is.addOption(ColumnRangeFilter.MIN_COLUMN, minColumn);
+ }
+ if (!minColInclusive) {
+ is.addOption(ColumnRangeFilter.MIN_COL_INCLUSIVE, "false");
+ }
+ if (maxColumn != null && maxColumn.length() > 0) {
+ is.addOption(ColumnRangeFilter.MAX_COLUMN, maxColumn);
+ }
+ if (maxColInclusive) {
+ is.addOption(ColumnRangeFilter.MAX_COL_INCLUSIVE, "true");
+ }
+ }
+
+ public static void setRange(IteratorSetting is, byte[] minColumn, boolean minColInclusive,
+ byte[] maxColumn, boolean maxColInclusive) {
+ setRange(is, new String(minColumn), minColInclusive, new String(maxColumn), maxColInclusive);
+ }
+}
diff --git a/titan-accumulo/titan-accumulo-iterators/src/main/resources/log4j.properties b/titan-accumulo/titan-accumulo-iterators/src/main/resources/log4j.properties
new file mode 100644
index 0000000000..a02cd1ff42
--- /dev/null
+++ b/titan-accumulo/titan-accumulo-iterators/src/main/resources/log4j.properties
@@ -0,0 +1,14 @@
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# Set root logger level to the designated level and its only appender to A1.
+log4j.rootLogger=INFO, A1
+
+log4j.logger.org.apache.accumulo=INFO
+log4j.logger.org.apache.cassandra=INFO
+log4j.logger.org.apache.hadoop=INFO
+log4j.logger.org.apache.zookeeper=INFO
diff --git a/titan-accumulo/titan-accumulo-iterators/src/main/resources/titan.properties b/titan-accumulo/titan-accumulo-iterators/src/main/resources/titan.properties
new file mode 100644
index 0000000000..02dc2c4ce5
--- /dev/null
+++ b/titan-accumulo/titan-accumulo-iterators/src/main/resources/titan.properties
@@ -0,0 +1,2 @@
+titan.version=${project.version}
+titan.compatible-versions=${titan.compatible.versions}
\ No newline at end of file
diff --git a/titan-accumulo/titan-accumulo-iterators/src/test/java/com/thinkaurelius/titan/diskstorage/accumulo/iterators/ColumnRangeFilterTest.java b/titan-accumulo/titan-accumulo-iterators/src/test/java/com/thinkaurelius/titan/diskstorage/accumulo/iterators/ColumnRangeFilterTest.java
new file mode 100644
index 0000000000..eb58738230
--- /dev/null
+++ b/titan-accumulo/titan-accumulo-iterators/src/test/java/com/thinkaurelius/titan/diskstorage/accumulo/iterators/ColumnRangeFilterTest.java
@@ -0,0 +1,145 @@
+package com.thinkaurelius.titan.diskstorage.accumulo.iterators;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.TreeMap;
+
+import junit.framework.TestCase;
+import static junit.framework.TestCase.assertFalse;
+import static junit.framework.TestCase.assertTrue;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.DefaultIteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedMapIterator;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+/**
+ *
+ * @author Etienne Deprit
+ */
+public class ColumnRangeFilterTest extends TestCase {
+
+ private static final Collection EMPTY_COL_FAMS = new ArrayList();
+
+ private Key nkv(TreeMap tm, String row, String cf, String cq, String val) {
+ Key k = nk(row, cf, cq);
+ tm.put(k, new Value(val.getBytes()));
+ return k;
+ }
+
+ private Key nk(String row, String cf, String cq) {
+ return new Key(new Text(row), new Text(cf), new Text(cq));
+ }
+
+ @Test
+ public void test1() throws IOException {
+ TreeMap tm = new TreeMap();
+
+ Key k1 = nkv(tm, "row1", "cf1", "a", "x");
+ Key k2 = nkv(tm, "row1", "cf1", "b", "y");
+ Key k3 = nkv(tm, "row1", "cf2", "c", "z");
+
+ ColumnRangeFilter cri = new ColumnRangeFilter();
+ cri.describeOptions();
+
+ IteratorSetting is = new IteratorSetting(1, ColumnRangeFilter.class);
+ ColumnRangeFilter.setRange(is, (String) null, false, (String) null, false);
+ assertTrue(cri.validateOptions(is.getOptions()));
+ cri.init(new SortedMapIterator(tm), is.getOptions(), new DefaultIteratorEnvironment());
+ cri.deepCopy(new DefaultIteratorEnvironment());
+ cri.seek(new Range(), EMPTY_COL_FAMS, false);
+
+ assertTrue(cri.hasTop());
+ assertTrue(cri.getTopKey().equals(k1));
+ cri.next();
+ cri.next();
+ assertTrue(cri.hasTop());
+
+ // -----------------------------------------------------
+ is.clearOptions();
+ ColumnRangeFilter.setRange(is, "c", false, null, false);
+ assertTrue(cri.validateOptions(is.getOptions()));
+ cri.init(new SortedMapIterator(tm), is.getOptions(), new DefaultIteratorEnvironment());
+ cri.seek(new Range(), EMPTY_COL_FAMS, false);
+
+ assertFalse(cri.hasTop());
+
+ // -----------------------------------------------------
+ is.clearOptions();
+ ColumnRangeFilter.setRange(is, "b", false, null, false);
+ assertTrue(cri.validateOptions(is.getOptions()));
+ cri.init(new SortedMapIterator(tm), is.getOptions(), new DefaultIteratorEnvironment());
+ cri.seek(new Range(), EMPTY_COL_FAMS, false);
+
+ assertTrue(cri.hasTop());
+ assertTrue(cri.getTopKey().equals(k3));
+ cri.next();
+ assertFalse(cri.hasTop());
+
+ // -----------------------------------------------------
+ is.clearOptions();
+ ColumnRangeFilter.setRange(is, "b", true, null, false);
+ assertTrue(cri.validateOptions(is.getOptions()));
+ cri.init(new SortedMapIterator(tm), is.getOptions(), new DefaultIteratorEnvironment());
+ cri.seek(new Range(), EMPTY_COL_FAMS, false);
+
+ assertTrue(cri.hasTop());
+ assertTrue(cri.getTopKey().equals(k2));
+ cri.next();
+ assertTrue(cri.hasTop());
+
+ // -----------------------------------------------------
+ is.clearOptions();
+ ColumnRangeFilter.setRange(is, null, false, "b", false);
+ assertTrue(cri.validateOptions(is.getOptions()));
+ cri.init(new SortedMapIterator(tm), is.getOptions(), new DefaultIteratorEnvironment());
+ cri.seek(new Range(), EMPTY_COL_FAMS, false);
+
+ assertTrue(cri.hasTop());
+ assertTrue(cri.getTopKey().equals(k1));
+ cri.next();
+ assertFalse(cri.hasTop());
+
+ // -----------------------------------------------------
+ is.clearOptions();
+ ColumnRangeFilter.setRange(is, null, false, "b", true);
+ assertTrue(cri.validateOptions(is.getOptions()));
+ cri.init(new SortedMapIterator(tm), is.getOptions(), new DefaultIteratorEnvironment());
+ cri.seek(new Range(), EMPTY_COL_FAMS, false);
+
+ assertTrue(cri.hasTop());
+ assertTrue(cri.getTopKey().equals(k1));
+ cri.next();
+ assertTrue(cri.hasTop());
+
+ // -----------------------------------------------------
+ is.clearOptions();
+ ColumnRangeFilter.setRange(is, "b", true, "c", false);
+ assertTrue(cri.validateOptions(is.getOptions()));
+ cri.init(new SortedMapIterator(tm), is.getOptions(), new DefaultIteratorEnvironment());
+ cri.seek(new Range(), EMPTY_COL_FAMS, false);
+
+ assertTrue(cri.hasTop());
+ assertTrue(cri.getTopKey().equals(k2));
+ cri.next();
+ assertFalse(cri.hasTop());
+
+ // -----------------------------------------------------
+ is.clearOptions();
+ ColumnRangeFilter.setRange(is, "b", true, "c", true);
+ assertTrue(cri.validateOptions(is.getOptions()));
+ cri.init(new SortedMapIterator(tm), is.getOptions(), new DefaultIteratorEnvironment());
+ cri.seek(new Range(), EMPTY_COL_FAMS, false);
+
+ assertTrue(cri.hasTop());
+ assertTrue(cri.getTopKey().equals(k2));
+ cri.next();
+ assertTrue(cri.hasTop());
+ }
+}
\ No newline at end of file
diff --git a/titan-accumulo/titan-accumulo-iterators/src/test/java/org/apache/accumulo/core/iterators/DefaultIteratorEnvironment.java b/titan-accumulo/titan-accumulo-iterators/src/test/java/org/apache/accumulo/core/iterators/DefaultIteratorEnvironment.java
new file mode 100644
index 0000000000..ed5ce0b983
--- /dev/null
+++ b/titan-accumulo/titan-accumulo-iterators/src/test/java/org/apache/accumulo/core/iterators/DefaultIteratorEnvironment.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.iterators;
+
+import java.io.IOException;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.map.MyMapFile;
+import org.apache.accumulo.core.file.map.MyMapFile.Reader;
+import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+
+@SuppressWarnings("deprecation")
+public class DefaultIteratorEnvironment implements IteratorEnvironment {
+
+ AccumuloConfiguration conf;
+
+ public DefaultIteratorEnvironment(AccumuloConfiguration conf) {
+ this.conf = conf;
+ }
+
+ public DefaultIteratorEnvironment() {
+ this.conf = AccumuloConfiguration.getDefaultConfiguration();
+ }
+
+ @Override
+ public Reader reserveMapFileReader(String mapFileName) throws IOException {
+ Configuration conf = CachedConfiguration.getInstance();
+ FileSystem fs = FileSystem.get(conf);
+ return new MyMapFile.Reader(fs, mapFileName, conf);
+ }
+
+ @Override
+ public AccumuloConfiguration getConfig() {
+ return conf;
+ }
+
+ @Override
+ public IteratorScope getIteratorScope() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isFullMajorCompaction() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void registerSideChannel(SortedKeyValueIterator iter) {
+ throw new UnsupportedOperationException();
+ }
+}
diff --git a/titan-accumulo/titan-accumulo-iterators/src/test/resources/log4j.properties b/titan-accumulo/titan-accumulo-iterators/src/test/resources/log4j.properties
new file mode 100644
index 0000000000..1f8f2d4c63
--- /dev/null
+++ b/titan-accumulo/titan-accumulo-iterators/src/test/resources/log4j.properties
@@ -0,0 +1,15 @@
+# A1 is set to be a FileAppender.
+log4j.appender.A1=org.apache.log4j.FileAppender
+log4j.appender.A1.File=target/test.log
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# Set root logger level to the designated level and its only appender to A1.
+log4j.rootLogger=INFO, A1
+
+log4j.logger.org.apache.accumulo=INFO
+log4j.logger.org.apache.cassandra=INFO
+log4j.logger.org.apache.hadoop=INFO
+log4j.logger.org.apache.zookeeper=INFO
\ No newline at end of file
diff --git a/titan-accumulo/titan-accumulo-iterators/src/test/resources/rexster-fragment.xml b/titan-accumulo/titan-accumulo-iterators/src/test/resources/rexster-fragment.xml
new file mode 100644
index 0000000000..19f88f9236
--- /dev/null
+++ b/titan-accumulo/titan-accumulo-iterators/src/test/resources/rexster-fragment.xml
@@ -0,0 +1,13 @@
+
+
+ false
+ home
+
+ local
+
+
+
+ tp:gremlin
+
+
+
diff --git a/titan-all/pom.xml b/titan-all/pom.xml
index ff8bc81d3f..cee286946f 100644
--- a/titan-all/pom.xml
+++ b/titan-all/pom.xml
@@ -43,6 +43,17 @@
titan-hbase
${project.version}
+
+ com.thinkaurelius.titan
+ titan-accumulo-core
+ ${project.version}
+
+
+ zookeeper
+ org.apache.zookeeper
+
+
+
com.thinkaurelius.titan
titan-es
diff --git a/titan-dist/pom.xml b/titan-dist/pom.xml
index 93d92f3df4..85f90b4023 100644
--- a/titan-dist/pom.xml
+++ b/titan-dist/pom.xml
@@ -38,6 +38,7 @@
titan-dist-cassandra
titan-dist-hbase
titan-dist-persistit
+ titan-dist-accumulo
titan-dist-server
@@ -164,26 +165,26 @@
compile-tests
- test-compile
-
- testCompile
-
-
-
-
+ test-compile
+
+ testCompile
+
+
+
+
maven-jar-plugin
pack-test-jar
- package
-
- test-jar
-
-
-
-
+ package
+
+ test-jar
+
+
+
+
diff --git a/titan-dist/titan-dist-accumulo/accumulo.xml b/titan-dist/titan-dist-accumulo/accumulo.xml
new file mode 100644
index 0000000000..5e57bcff79
--- /dev/null
+++ b/titan-dist/titan-dist-accumulo/accumulo.xml
@@ -0,0 +1,37 @@
+
+
+ ${distribution.assembly.name}-${project.version}
+ titan-${distribution.assembly.name}-${project.version}
+
+
+ zip
+ tar.bz2
+
+
+
+ ../src/assembly/descriptor/common.component.xml
+ ../src/assembly/descriptor/readmes.component.xml
+ ../src/assembly/descriptor/htmldocs.component.xml
+
+
+
+
+ 775
+ 775
+ ../../titan-accumulo/bin
+ /bin
+
+
+ ../../titan-accumulo/config
+ /conf
+
+
+ ../../conf
+ /conf
+
+ titan-*${distribution.assembly.name}*
+
+
+
+
diff --git a/titan-dist/titan-dist-accumulo/pom.xml b/titan-dist/titan-dist-accumulo/pom.xml
new file mode 100644
index 0000000000..73e2084d2a
--- /dev/null
+++ b/titan-dist/titan-dist-accumulo/pom.xml
@@ -0,0 +1,45 @@
+
+ 4.0.0
+
+ com.thinkaurelius.titan
+ titan-dist-parent
+ 0.4.3-SNAPSHOT
+ ../titan-dist-parent/pom.xml
+
+ pom
+ titan-dist-accumulo
+ Titan-Dist-Accumulo: Archives with Accumulo
+ http://thinkaurelius.github.com/titan/
+
+
+ accumulo
+ ${project.basedir}/../src/assembly/descriptor/backend.xml
+ titan-accumulo*.properties
+ ${basedir}/../..
+
+
+
+
+ aurelius-release
+
+
+
+ com.thinkaurelius.titan
+ titan-accumulo-core
+ ${project.version}
+
+
+ libthrift
+ org.apache.thrift
+
+
+
+
+ org.apache.thrift
+ libthrift
+ 0.6.1
+
+
+
+
+
diff --git a/titan-dist/titan-dist-all/pom.xml b/titan-dist/titan-dist-all/pom.xml
index edbe9d945c..ec58905ae4 100644
--- a/titan-dist/titan-dist-all/pom.xml
+++ b/titan-dist/titan-dist-all/pom.xml
@@ -47,6 +47,11 @@
titan-hbase
${project.version}
+
+ com.thinkaurelius.titan
+ titan-accumulo-core
+ ${project.version}
+
com.thinkaurelius.titan
titan-lucene