Skip to content

Commit

Permalink
extend query RESTful service to query Cassandra
Browse files Browse the repository at this point in the history
  • Loading branch information
derrickoswald committed Nov 22, 2017
1 parent eef9c64 commit a6b50bd
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 33 deletions.
6 changes: 4 additions & 2 deletions CIMWeb/src/main/scala/ch/ninecode/cim/cimweb/Query.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@ class Query extends RESTful
@Produces (Array (MediaType.APPLICATION_JSON))
def query (
@QueryParam ("sql") sql: String,
@DefaultValue ("false") @QueryParam ("cassandra") cass: String,
@DefaultValue ("") @QueryParam ("table_name") table_name: String,
@DefaultValue ("") @QueryParam ("cassandra_table_name") cassandra_table_name: String): String =
{
_Logger.info ("query sql=%s%s%s".format (sql, if ("" != table_name) " table_name=" + table_name else "", if ("" != cassandra_table_name) " cassandra_table_name=" + cassandra_table_name else ""))
val cassandra = try { cass.toBoolean } catch { case _: Throwable => false }
_Logger.info ("query %ssql=%s%s%s".format (if (cassandra) "cassandra " else "", sql, if ("" != table_name) " table_name=" + table_name else "", if ("" != cassandra_table_name) " cassandra_table_name=" + cassandra_table_name else ""))
val ret = new RESTfulJSONResult ()
val connection = getConnection (ret)
if (null != connection)
Expand All @@ -45,7 +47,7 @@ class Query extends RESTful
val spec: CIMInteractionSpec = new CIMInteractionSpecImpl
spec.setFunctionName (CIMInteractionSpec.EXECUTE_CIM_FUNCTION)
val input = getInputRecord ("input record containing the function to run")
val query = QueryFunction (sql, table_name, cassandra_table_name)
val query = QueryFunction (sql, cassandra, table_name, cassandra_table_name)
input.asInstanceOf[map].put (CIMFunction.FUNCTION, query)
val interaction = connection.createInteraction
val output = interaction.execute (spec, input)
Expand Down
105 changes: 82 additions & 23 deletions CIMWeb/src/main/scala/ch/ninecode/cim/cimweb/QueryFunction.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,19 @@ import javax.json.Json
import javax.json.JsonObjectBuilder
import javax.json.JsonStructure

import scala.collection.JavaConversions._

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.DataFrame
import com.datastax.driver.core.DataType
import com.datastax.driver.core.ResultSet
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import com.datastax.driver.core.DataType.Name._

case class QueryFunction (sql: String, table_name: String = "", cassandra_table_name: String = "") extends CIMWebFunction
case class QueryFunction (sql: String, cassandra: Boolean, table_name: String = "", cassandra_table_name: String = "") extends CIMWebFunction
{
jars = Array (jarForObject (this), jarForObject (SomeColumns ("mrid")), jarForObject (new com.twitter.jsr166e.LongAdder ()))

Expand Down Expand Up @@ -40,35 +46,88 @@ case class QueryFunction (sql: String, table_name: String = "", cassandra_table_

ret
}

override def executeJSON (spark: SparkSession): JsonStructure =
def packRow2 (row: com.datastax.driver.core.Row): JsonObjectBuilder =
{
val df: DataFrame = spark.sql (sql)
if ("" != table_name)
val ret = Json.createObjectBuilder
val definitions = row.getColumnDefinitions
for (index <- 0 until definitions.size)
{
df.cache ()
df.createOrReplaceTempView (table_name)
//column: ColumnDefinitions.Definition
val name: String = definitions.getName (index) // column.getName
val typ: DataType = definitions.getType (index) // column.getType
typ.getName match
{
case ASCII if (!row.isNull(index)) ret.add (name, row.getString (index))
case BIGINT if (!row.isNull(index)) ret.add (name, row.getLong (index))
case BLOB if (!row.isNull(index)) ret.add (name, row.getBytes (index).toString) // ToDo: handle blob
case BOOLEAN if (!row.isNull(index)) ret.add (name, row.getBool (index))
case COUNTER if (!row.isNull(index)) ret.add (name, row.getLong (index)) // ToDo: counter?
case DECIMAL if (!row.isNull(index)) ret.add (name, row.getDouble (index))
case DOUBLE if (!row.isNull(index)) ret.add (name, row.getDouble (index))
case FLOAT if (!row.isNull(index)) ret.add (name, row.getDouble (index))
case INET if (!row.isNull(index)) ret.add (name, row.getInet (index).toString) // ToDo: internet address?
case INT if (!row.isNull(index)) ret.add (name, row.getInt (index))
case TEXT if (!row.isNull(index)) ret.add (name, row.getString (index))
case TIMESTAMP if (!row.isNull(index)) ret.add (name, row.getTimestamp (index).getTime)
case UUID if (!row.isNull(index)) ret.add (name, row.getString (index))
case VARCHAR if (!row.isNull(index)) ret.add (name, row.getString (index))
case VARINT if (!row.isNull(index)) ret.add (name, row.getInt (index)) // ToDo: varying int?
case TIMEUUID if (!row.isNull(index)) ret.add (name, row.getString (index))
case LIST if (!row.isNull(index)) ret.add (name, row.getString (index)) // ToDo: list of what?
case SET if (!row.isNull(index)) ret.add (name, row.getString (index)) // ToDo: set?
case MAP if (!row.isNull(index)) ret.add (name, row.getString (index)) // ToDo: map?
case CUSTOM if (!row.isNull(index)) ret.add (name, row.getString (index)) // ToDo: custom?
case UDT if (!row.isNull(index)) ret.add (name, row.getString (index)) // ToDo: udt?
case TUPLE if (!row.isNull(index)) ret.add (name, row.getString (index)) // ToDo: tuple?
case SMALLINT if (!row.isNull(index)) ret.add (name, row.getInt (index))
case TINYINT if (!row.isNull(index)) ret.add (name, row.getInt (index))
case DATE if (!row.isNull(index)) ret.add (name, row.getDate (index).toString)
case TIME if (!row.isNull(index)) ret.add (name, row.getTime (index).toString)
}
}
if ("" != cassandra_table_name)
ret
}

override def executeJSON (spark: SparkSession): JsonStructure =
{
val response = Json.createArrayBuilder
if (cassandra)
CassandraConnector (spark.sparkContext.getConf).withSessionDo
{
session =>
val resultset: ResultSet = session.execute (sql)
for (row: com.datastax.driver.core.Row <- resultset.iterator)
response.add (packRow2 (row))
}
else
{
val rows: RDD[Row] = df.rdd
val len = rows.first.length
if ((len == 6) || (len == 8) || (len == 10))
val df: DataFrame = spark.sql (sql)
if ("" != table_name)
{
df.cache ()
df.createOrReplaceTempView (table_name)
}
if ("" != cassandra_table_name)
{
if (len == 6)
rows.map (row => (row.getString (0), row.getString (1), row.getString (2).substring (0, 10), row.getString (2), row.getDouble (3), row.getDouble (4), row.getString (5))).saveToCassandra ("cimapplication", cassandra_table_name, SomeColumns ("mrid", "type", "date", "time", "real_a", "imag_a", "units"))
else if (len == 8)
rows.map (row => (row.getString (0), row.getString (1), row.getString (2).substring (0, 10), row.getString (2), row.getDouble (3), row.getDouble (4), row.getDouble (5), row.getDouble (6), row.getString (7))).saveToCassandra ("cimapplication", cassandra_table_name, SomeColumns ("mrid", "type", "date", "time", "real_a", "imag_a", "real_b", "imag_b", "units"))
else if (len == 10)
rows.map (row => (row.getString (0), row.getString (1), row.getString (2).substring (0, 10), row.getString (2), row.getDouble (3), row.getDouble (4), row.getDouble (5), row.getDouble (6), row.getString (7), row.getDouble (8), row.getString (9))).saveToCassandra ("cimapplication", cassandra_table_name, SomeColumns ("mrid", "type", "date", "time", "real_a", "imag_a", "real_b", "imag_b", "real_c", "imag_c", "units"))
val rows: RDD[Row] = df.rdd
val len = rows.first.length
if ((len == 6) || (len == 8) || (len == 10))
{
if (len == 6)
rows.map (row => (row.getString (0), row.getString (1), row.getString (2).substring (0, 10), row.getString (2), row.getDouble (3), row.getDouble (4), row.getString (5))).saveToCassandra ("cimapplication", cassandra_table_name, SomeColumns ("mrid", "type", "date", "time", "real_a", "imag_a", "units"))
else if (len == 8)
rows.map (row => (row.getString (0), row.getString (1), row.getString (2).substring (0, 10), row.getString (2), row.getDouble (3), row.getDouble (4), row.getDouble (5), row.getDouble (6), row.getString (7))).saveToCassandra ("cimapplication", cassandra_table_name, SomeColumns ("mrid", "type", "date", "time", "real_a", "imag_a", "real_b", "imag_b", "units"))
else if (len == 10)
rows.map (row => (row.getString (0), row.getString (1), row.getString (2).substring (0, 10), row.getString (2), row.getDouble (3), row.getDouble (4), row.getDouble (5), row.getDouble (6), row.getString (7), row.getDouble (8), row.getString (9))).saveToCassandra ("cimapplication", cassandra_table_name, SomeColumns ("mrid", "type", "date", "time", "real_a", "imag_a", "real_b", "imag_b", "real_c", "imag_c", "units"))
}
else
// ToDo: need an error mechanism
println ("Cassandra format error: RDD has rows of %d columns, not 6, 8, or 10 (\"mrid\", \"type\", \"time\", \"real_a\", \"imag_a\", ... \"units\")".format (rows.first.length))
}
else
// ToDo: need an error mechanism
println ("Cassandra format error: RDD has rows of %d columns, not 6, 8, or 10 (\"mrid\", \"type\", \"time\", \"real_a\", \"imag_a\", ... \"units\")".format (rows.first.length))
for (row <- df.toLocalIterator)
response.add (packRow (row))
}
val resultset: Array[Row] = df.collect
val response = Json.createArrayBuilder
resultset.map (packRow).map (response.add)
response.build
}

Expand Down
40 changes: 33 additions & 7 deletions CIMWeb/src/main/webapp/js/cimquery.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,33 @@ define
*/
var TheTable;

/**
* The Cassandra table for the results.
*/
var TheCassandraTable;

/**
* Direct queries to Cassandra.
*/
var QueryCassandra = false;

/**
* @summary perform query.
* @description Perform an SQL query on loaded CIM data.
* @param {string} sql - the SQL to use
* @param {boolean} cassandra - if <code>true</code> query Cassandra rather than Spark
* @param {string} table_name - the name of the temporary view to store the result DataFrame, "" for none
* @param {string} cassandra_table_name - the name of the Cassandra table to store the result DataFrame, "" for none
* @param {function} fn - the callback function with the data
* @function query
* @memberOf module:cimquery
*/
function query (sql, table_name, cassandra_table_name, fn)
function query (sql, cassandra, table_name, cassandra_table_name, fn)
{
var target = (cassandra) ? "cassandra=true&": "";
var table = ("" != table_name) ? "table_name=" + encodeURIComponent (table_name) + "&": "";
var cassandra_table = ("" != cassandra_table_name) ? "cassandra_table_name=" + encodeURIComponent (cassandra_table_name) + "&": "";
var url = util.home () + "cim/query?" + table + cassandra_table + "sql=" + encodeURIComponent (sql);
var url = util.home () + "cim/query?" + target + table + cassandra_table + "sql=" + encodeURIComponent (sql);
var xmlhttp = util.createCORSRequest ("GET", url);
xmlhttp.onreadystatechange = function ()
{
Expand Down Expand Up @@ -73,9 +85,10 @@ define
if (sql != "")
{
TheQuery = sql;
var table_name = document.getElementById ("table_name").value;
var cassandra_table_name = document.getElementById ("cassandra_table_name").value;
query (sql, table_name, cassandra_table_name, function (data) { document.getElementById ("results_table").innerHTML = "<pre>\n" + JSON.stringify (data, null, 4) + "</pre>"; });
TheTable = document.getElementById ("table_name").value;
TheCassandraTable = document.getElementById ("cassandra_table_name").value;
QueryCassandra = document.getElementById ("query_cassandra").checked;
query (TheQuery, QueryCassandra, TheTable, TheCassandraTable, function (data) { document.getElementById ("results_table").innerHTML = "<pre>\n" + JSON.stringify (data, null, 4) + "</pre>"; });
}
}

Expand All @@ -98,14 +111,25 @@ define
" <textarea id='sql' class='form-control' aria-describedby='sqlHelp' name='sql' rows='8' placeholder='select * from ACLineSegment' style='width: 80%'>{{sql}}</textarea>\n" +
" <small id='sqlHelp' class='form-text text-muted'>A Spark SQL query against the <a href='https://derrickoswald.github.io/CIMReader/doc/scaladocs/index.html#ch.ninecode.model.package' target='_blank'>CIMReader schema</a>.</small>\n" +
" </div>\n" +
" <div class='form-group row'>\n" +
" <div class='col-sm-2 col-form-label'>Cassandra</div>\n" +
" <div class='col-sm-10'>\n" +
" <div class='form-check'>\n" +
" <label class='form-check-label'>\n" +
" <input id='query_cassandra' class='form-check-input' type='checkbox' value=''{{cassandra}}>\n" +
" Query Cassandra rather than Spark.\n" +
" </label>\n" +
" </div>\n" +
" </div>\n" +
" </div>\n" +
" <div class='form-group'>\n" +
" <label for='table_name'>Save as table</label>\n" +
" <input id='table_name' type='text' class='form-control' aria-describedby='nameHelp' placeholder='table name' value='{{table}}'>\n" +
" <small id='nameHelp' class='form-text text-muted'>Enter a name for a temporary view to hold the results of the query.</small>\n" +
" </div>\n" +
" <div class='form-group'>\n" +
" <label for='cassandra_table_name'>Save in Cassandra</label>\n" +
" <input id='cassandra_table_name' type='text' class='form-control' aria-describedby='cassandraHelp' placeholder='cassandra table name, e.g. measured_value_by_day' value=''>\n" +
" <input id='cassandra_table_name' type='text' class='form-control' aria-describedby='cassandraHelp' placeholder='cassandra table name, e.g. measured_value_by_day' value='{{ctable}}'>\n" +
" <small id='cassandraHelp' class='form-text text-muted'>Enter the name of the Cassandra table to store the results of the query.</small>\n" +
" </div>\n" +
" <div class='form-group'>\n" +
Expand All @@ -123,7 +147,9 @@ define
query_template,
{
sql: function () { return ((null != TheQuery) ? TheQuery : ""); },
table: function () { return ((null != TheTable) ? TheTable : ""); }
table: function () { return ((null != TheTable) ? TheTable : ""); },
ctable: function () { return ((null != TheCassandraTable) ? TheCassandraTable : ""); },
cassandra: function () { return ((QueryCassandra) ? " checked" : ""); }
}
);
document.getElementById ("main").innerHTML = text;
Expand Down
6 changes: 6 additions & 0 deletions CIMWeb/src/main/webapp/js/cimsimulate.js
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,7 @@ define
choice.files = response.result.files.map (function (item) { return ({ house: item.path.substring (0, item.path.indexOf (".")), file: root + item.path }); });
cimquery.query (
choice.sql,
false,
"",
"",
function (data)
Expand Down Expand Up @@ -454,6 +455,7 @@ define
{
cimquery.query (
choice.sql,
false,
"",
"",
function (players)
Expand All @@ -477,6 +479,7 @@ define
var house = row.name.substring (0, row.name.indexOf ("_"));
cimquery.query (
"select d.time, d.real, d.imag from data d where d.house = '" + house + "'", // could order by time here but it's not very fast
false,
"",
"",
function (measurements)
Expand Down Expand Up @@ -521,6 +524,7 @@ define
if (response.status == "OK")
cimquery.query (
choice.sql,
false,
"",
"",
callback);
Expand Down Expand Up @@ -823,6 +827,7 @@ define
// "select i.IdentifiedObject.mRID island, s.EquipmentContainer.ConnectivityNodeContainer.PowerSystemResource.IdentifiedObject.mRID station from TopologicalIsland i, TopologicalNode n, Terminal t, PowerTransformer p, Substation s, Bay b where n.TopologicalIsland = i.IdentifiedObject.mRID and t.TopologicalNode = n.IdentifiedObject.mRID and t.ConductingEquipment = p.ConductingEquipment.Equipment.PowerSystemResource.IdentifiedObject.mRID and p.ConductingEquipment.Equipment.EquipmentContainer = b.EquipmentContainer.ConnectivityNodeContainer.PowerSystemResource.IdentifiedObject.mRID and b.Substation = s.EquipmentContainer.ConnectivityNodeContainer.PowerSystemResource.IdentifiedObject.mRID",
// ToDo: this query assumes transformers are directly in a Substation
"select i.IdentifiedObject.mRID island, s.EquipmentContainer.ConnectivityNodeContainer.PowerSystemResource.IdentifiedObject.mRID station from TopologicalIsland i, TopologicalNode n, Terminal t, PowerTransformer p, Substation s where n.TopologicalIsland = i.IdentifiedObject.mRID and t.TopologicalNode = n.IdentifiedObject.mRID and t.ConductingEquipment = p.ConductingEquipment.Equipment.PowerSystemResource.IdentifiedObject.mRID and p.ConductingEquipment.Equipment.EquipmentContainer = s.EquipmentContainer.ConnectivityNodeContainer.PowerSystemResource.IdentifiedObject.mRID",
false,
"",
"",
render
Expand All @@ -834,6 +839,7 @@ define
// get the list of stations
cimquery.query (
"select s.EquipmentContainer.ConnectivityNodeContainer.PowerSystemResource.IdentifiedObject.mRID station from Substation s",
false,
"",
"",
function (data)
Expand Down
Loading

0 comments on commit a6b50bd

Please sign in to comment.