Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support dependencies v2 in c* #59

Merged
merged 1 commit into from
Feb 21, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapRowTo;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;

import com.datastax.spark.connector.japi.CassandraJavaUtil;
import com.datastax.spark.connector.cql.CassandraConnector;
import com.datastax.spark.connector.japi.CassandraRow;
import com.google.common.base.Joiner;
import com.google.common.net.HostAndPort;
import io.jaegertracing.spark.dependencies.DependenciesSparkHelper;
Expand Down Expand Up @@ -173,10 +174,19 @@ public void run() {
}

private void store(JavaSparkContext sc, List<Dependency> links) {
CassandraDependencies dependencies = new CassandraDependencies(links, day);
javaFunctions(sc.parallelize(Collections.singletonList(dependencies)))
.writerBuilder(keyspace, "dependencies", mapToRow(CassandraDependencies.class))
.saveToCassandra();
String table = dependenciesTable(sc);
log.info("Storing dependencies into {}", table);
if (table == "dependencies_v2") {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think using equals() is better.
use == to compare strings, in this case, it works but makes confusing.

CassandraDependenciesV2 dependencies = new CassandraDependenciesV2(links, day);
javaFunctions(sc.parallelize(Collections.singletonList(dependencies)))
.writerBuilder(keyspace, table, mapToRow(CassandraDependenciesV2.class))
.saveToCassandra();
} else {
CassandraDependencies dependencies = new CassandraDependencies(links, day);
javaFunctions(sc.parallelize(Collections.singletonList(dependencies)))
.writerBuilder(keyspace, table, mapToRow(CassandraDependencies.class))
.saveToCassandra();
}
}

static String parseHosts(String contactPoints) {
Expand All @@ -198,6 +208,17 @@ static String parsePort(String contactPoints) {
return ports.size() == 1 ? String.valueOf(ports.iterator().next()) : "9042";
}

private String dependenciesTable(JavaSparkContext sc) {
try {
javaFunctions(sc)
.cassandraTable(keyspace, "dependencies_v2")
.limit(1L).collect();
} catch (Exception ex) {
return "dependencies";
}
return "dependencies_v2";
}

/**
* DTO object used to store dependencies to Cassandra, see {@link com.datastax.spark.connector.mapper.JavaBeanColumnMapper}
*/
Expand All @@ -224,5 +245,32 @@ public Long getTsIndex() {
return zonedDateTime.toInstant().toEpochMilli();
}
}

/**
* DTO object used to store dependencies to Cassandra, see {@link com.datastax.spark.connector.mapper.JavaBeanColumnMapper}
*/
public final static class CassandraDependenciesV2 implements Serializable {
private static final long serialVersionUID = 0L;

private List<Dependency> dependencies;
private ZonedDateTime zonedDateTime;

public CassandraDependenciesV2(List<Dependency> dependencies, ZonedDateTime ts) {
this.dependencies = dependencies;
this.zonedDateTime = ts;
}

public List<Dependency> getDependencies() {
return dependencies;
}

public Long getTs() {
return zonedDateTime.toInstant().toEpochMilli();
}

public Long getTsBucket() {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@black-adder is this ok?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should truncate the time as well, ala .toInstant().truncatedTo(ChronoUnit.DAYS).toEpochMilli()

return zonedDateTime.toInstant().toEpochMilli();
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import static org.awaitility.Awaitility.await;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import io.jaegertracing.spark.dependencies.test.DependenciesTest;
import java.time.LocalDate;
Expand Down Expand Up @@ -76,6 +77,8 @@ public static void beforeClass() throws TimeoutException {
.withNetwork(network)
.withEnv("CASSANDRA_SERVERS", "cassandra")
.withEnv("CASSANDRA_KEYSPACE", "jaeger_v1_dc1")
// TODO remove after https://github.com/jaegertracing/jaeger/pull/1364 is merged
.withEnv("CASSANDRA_ENABLE_DEPENDENCIES_V2", "true")
.waitingFor(new BoundPortHttpWaitStrategy(16687).forStatusCode(204))
.withExposedPorts(16687, 16686);
jaegerQuery.start();
Expand All @@ -97,10 +100,19 @@ public static void afterClass() {
public void after() {
try (Cluster cluster = cassandra.getCluster(); Session session = cluster.newSession()) {
session.execute("TRUNCATE jaeger_v1_dc1.traces");
session.execute("TRUNCATE jaeger_v1_dc1.dependencies");
session.execute(String.format("TRUNCATE jaeger_v1_dc1.%s", dependenciesTable(session)));
}
}

private String dependenciesTable(Session session) {
try {
session.execute("SELECT ts from jaeger_v1_dc1.dependencies_v2 limit 1;");
} catch (Exception ex) {
return "dependencies";
}
return "dependencies_v2";
}

@Override
protected void deriveDependencies() throws Exception {
CassandraDependenciesJob.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,13 @@ public boolean equals(Object o) {
&& this.callCount == that.callCount;
}

public String getSource() {
return "jaeger";
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@black-adder is this ok?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yup

}

public void setSource(String source) {
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this a no-op on purpose?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep


@Override
public int hashCode() {
int h = 1;
Expand Down