Skip to content

Commit

Permalink
remove CIMConnector cruft
Browse files Browse the repository at this point in the history
  • Loading branch information
derrickoswald committed Nov 22, 2017
1 parent 3c044a5 commit 9cb9f4e
Show file tree
Hide file tree
Showing 19 changed files with 217 additions and 730 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ public ResultSetInfo getResultSetInfo () throws ResourceException
*/
public void close () throws ResourceException
{
_ManagedConnection.close (this);
if (_Valid)
_ManagedConnection.close (this);
else
throw new ResourceException (CLOSED_ERROR);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ public interface CIMFunction extends Serializable
*/
static String FUNCTION = "function";

/**
* The key in the input CIMMappedRecord holding the query to execute.
*/
static String QUERY = "query";

/**
* The key in the output CIMMappedRecord holding the CIMFunction result.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
package ch.ninecode.cim.connector;

import java.util.HashMap;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import javax.json.JsonStructure;
import javax.resource.ResourceException;
import javax.resource.cci.Connection;
Expand All @@ -18,11 +15,12 @@
public class CIMInteractionImpl implements Interaction
{

private static final String INVALID_CONNECTION = "Invalid (%1$s) Connection";
private static final String CLOSED_ERROR = "Connection closed";
private static final String INVALID_FUNCTION_ERROR = "Invalid function";
private static final String FUNCTION_MISSING_ERROR = "CIMFunction not provided";
private static final String INVALID_INPUT_ERROR = "Invalid input record for function";
private static final String INVALID_OUTPUT_ERROR = "Invalid output record for function";
private static final String NOT_IMPLEMENTED = "Not implemented";

protected CIMConnection _Connection;
protected boolean _Valid;
Expand All @@ -35,9 +33,9 @@ public CIMInteractionImpl (Connection connection) throws ResourceException

super ();
if (null == connection)
throw new ResourceException ("null cannot be used as a connection object");
throw new ResourceException (String.format (INVALID_CONNECTION, "null"));
else if (!connection.getClass ().isAssignableFrom (CIMConnection.class))
throw new ResourceException ("object of class " + connection.getClass ().toGenericString () + " cannot be used as a connection object");
throw new ResourceException (String.format (INVALID_CONNECTION, connection.getClass ().toGenericString ()));
else
_Connection = (CIMConnection)connection;
_Valid = true;
Expand All @@ -60,137 +58,13 @@ public Connection getConnection ()
return (_Connection);
}

protected Dataset<Row> readFile (SparkSession session, String filename) throws ResourceException
{
String[] files = filename.split (",");
HashMap<String,String> options = new HashMap<> ();
options.put ("path", filename);
options.put ("StorageLevel", "MEMORY_AND_DISK_SER");

return (session.read ().format ("ch.ninecode.cim").options (options).load (files));
}

/**
* @see Interaction#execute(InteractionSpec, Record, Record)
*/
public boolean execute (InteractionSpec ispec, Record input, Record output)
throws ResourceException
{
boolean ret;

ret = false;
if (_Valid)
{
if ((null == ispec) || (!ispec.getClass ().isAssignableFrom (CIMInteractionSpecImpl.class)))
throw new ResourceException (INVALID_FUNCTION_ERROR);
else
{
CIMInteractionSpecImpl _spec = (CIMInteractionSpecImpl) ispec;
switch (_spec.getFunctionName ())
{
case CIMInteractionSpec.READ_FUNCTION:
if (input.getRecordName ().equals (CIMMappedRecord.INPUT))
if (output.getRecordName ().equals (CIMMappedRecord.OUTPUT))
{
((CIMMappedRecord) output).clear ();
try
{
String filename = (String)((CIMMappedRecord) input).get ("filename");
SparkSession session = ((CIMConnection)getConnection ())._ManagedConnection._SparkSession;
long num = readFile (session, filename).count ();
((CIMMappedRecord) output).put ("count", num);
ret = true;
}
catch (Exception exception)
{
throw new ResourceException (exception.getLocalizedMessage (), exception);
}
}
else
throw new ResourceException (INVALID_OUTPUT_ERROR);
else
throw new ResourceException (INVALID_INPUT_ERROR);
break;
case CIMInteractionSpec.GET_STRING_FUNCTION:
if (input.getRecordName ().equals (CIMMappedRecord.INPUT))
if (output.getRecordName ().equals (CIMMappedRecord.OUTPUT))
{
((CIMMappedRecord) output).clear ();
try
{
CIMMappedRecord record = (CIMMappedRecord)input;
CIMConnection connection = (CIMConnection)getConnection ();
String filename = record.get ("filename").toString ();
String cls = record.get ("class").toString ();
String method = record.get ("method").toString ();
SparkSession session = connection._ManagedConnection._SparkSession;
Object jars = record.get ("jars");
if (null != jars)
for (String jar: jars.toString ().split (","))
if (!session.sparkContext ().jars().contains (jar))
session.sparkContext ().addJar (jar);
StringBuilder args = new StringBuilder();
for (Object key : record.keySet ())
if ((key != "filename") && (key != "class") && (key != "method") && (key != "jars"))
{
args.append ((0 == args.length ()) ? "" : ",");
args.append (key.toString ());
args.append ("=");
args.append (record.get (key).toString ());
}
try
{
Class<?> c = Class.forName (cls);
Object _obj = c.newInstance();

Method[] allMethods = c.getDeclaredMethods();
for (Method _method : allMethods)
{
String name = _method.getName();
if (name.equals (method))
{
try
{
long num = readFile (session, filename).count ();
System.out.println ("" + num + " elements");
_method.setAccessible (true);
Object o = _method.invoke (_obj, session, args.toString());
String result = (String)o;
((CIMMappedRecord) output).put ("result", result);
ret = true;
}
catch (InvocationTargetException ite)
{
throw new ResourceException (ite.getLocalizedMessage (), ite);
}
break;
}
}
}
catch (ClassNotFoundException cnfe)
{
throw new ResourceException (cnfe.getLocalizedMessage (), cnfe);
}
}
catch (Exception exception)
{
throw new ResourceException (exception.getLocalizedMessage (), exception);
}
}
else
throw new ResourceException (INVALID_OUTPUT_ERROR);
else
throw new ResourceException (INVALID_INPUT_ERROR);
break;
default:
throw new ResourceException (INVALID_FUNCTION_ERROR);
}
}
}
else
throw new ResourceException (CLOSED_ERROR);

return (ret);
throw new ResourceException (NOT_IMPLEMENTED);
}

/**
Expand Down Expand Up @@ -260,7 +134,7 @@ public Record execute (InteractionSpec ispec, Record input) throws ResourceExcep
try
{
CIMMappedRecord record = (CIMMappedRecord)input;
String query = record.get ("query").toString ();
String query = record.get (CIMFunction.QUERY).toString ();
SparkSession session = ((CIMConnection)getConnection ())._ManagedConnection._SparkSession;
Dataset<Row> result = session.sqlContext ().sql (query);
ret = new CIMResultSet (result.schema (), result.collectAsList ());
Expand All @@ -272,84 +146,12 @@ public Record execute (InteractionSpec ispec, Record input) throws ResourceExcep
else
throw new ResourceException (INVALID_INPUT_ERROR);
break;
case CIMInteractionSpec.EXECUTE_METHOD_FUNCTION:
if (input.getRecordName ().equals (CIMMappedRecord.INPUT))
try
{
CIMMappedRecord record = (CIMMappedRecord)input;
CIMConnection connection = (CIMConnection)getConnection ();
String filename = record.get ("filename").toString ();
String cls = record.get ("class").toString ();
String method = record.get ("method").toString ();
SparkSession session = connection._ManagedConnection._SparkSession;
Object jars = record.get ("jars");
if (null != jars)
for (String jar: jars.toString ().split (","))
if (!session.sparkContext ().jars().contains (jar))
session.sparkContext ().addJar (jar);
// ToDo: don't know the mapping from Java world to Scala world
// HashMap<String,String> map = new HashMap<String,String> ();
// for (Object key: record.keySet ())
// if ((key != "filename") && (key != "class") && (key != "method"))
// map.put (key.toString (), (String)record.get (key));
StringBuilder args = new StringBuilder ();
for (Object key: record.keySet ())
if ((key != "filename") && (key != "class") && (key != "method") && (key != "jars"))
{
args.append ((0 == args.length ()) ? "" : ",");
args.append (key.toString ());
args.append ("=");
args.append (record.get (key).toString ());
}
try
{
Class<?> c = Class.forName (cls);
Object _obj = c.newInstance();

Method[] allMethods = c.getDeclaredMethods();
for (Method _method : allMethods)
{
String name = _method.getName();
if (name.equals (method))
{
try
{
System.out.println ("readFile " + filename);
long num = readFile (session, filename).count ();
System.out.println ("" + num + " elements");
_method.setAccessible (true);
System.out.println (method + " (sc, sql, \"" + args + "\")");
Object o = _method.invoke (_obj, session, args.toString ());
System.out.println ("got a result");
@SuppressWarnings ("unchecked")
Dataset<Row> result = (Dataset<Row>)o;
System.out.println ("it's a DataFrame with " + result.count () + " rows");
ret = new CIMResultSet (result.schema (), result.collectAsList ());
}
catch (InvocationTargetException ite)
{
throw new ResourceException (ite.getLocalizedMessage (), ite);
}
break;
}
}
}
catch (ClassNotFoundException cnfe)
{
throw new ResourceException (cnfe.getLocalizedMessage (), cnfe);
}
}
catch (Exception exception)
{
throw new ResourceException (exception.getLocalizedMessage (), exception);
}
else
throw new ResourceException (INVALID_INPUT_ERROR);
break;
}
}

}
else
throw new ResourceException (CLOSED_ERROR);

return (ret);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@

public interface CIMInteractionSpec extends InteractionSpec
{
static final String READ_FUNCTION = "read";
static final String GET_DATAFRAME_FUNCTION = "getDataFrame";
static final String EXECUTE_METHOD_FUNCTION = "executeMethodReturningDataFrame";
static final String GET_STRING_FUNCTION = "getString";
static final String EXECUTE_CIM_FUNCTION = "executeCIMFunction";

String getFunctionName ();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public String getSpecVersion ()
@Override
public boolean supportsExecuteWithInputAndOutputRecord ()
{
return (true);
return (false);
}

@Override
Expand Down
Loading

0 comments on commit 9cb9f4e

Please sign in to comment.