Load Assembly referenced from within a lambda closure #135

Merged · 34 commits · Jul 10, 2019
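For context, here is a minimal sketch (not code from this PR) of the situation the change targets: a .NET for Apache Spark UDF whose lambda captures a value from another assembly. The `MyLib.PriceTable` type, its `PriceOf` method, and the assembly name are hypothetical; only the `Functions.Udf` pattern comes from the Microsoft.Spark API. When the serialized closure reaches a worker that has not loaded that assembly, deserialization fails unless the assembly is loaded first.

```csharp
// Hypothetical scenario sketch (not code from this PR): the UDF lambda captures
// `lookup`, an instance of MyLib.PriceTable defined in a separate MyLib.dll.
// The closure is serialized on the driver; a worker can only deserialize it
// once MyLib.dll has been loaded.
using System;
using Microsoft.Spark.Sql;
using static Microsoft.Spark.Sql.Functions;

class ClosureScenario
{
    static void Run(DataFrame df)
    {
        var lookup = new MyLib.PriceTable();   // type lives in MyLib.dll (hypothetical)

        // `lookup` becomes part of the UDF's closure and therefore part of the
        // payload that UdfSerDe must serialize and the worker must deserialize.
        Func<Column, Column> priceOf = Udf<string, double>(name => lookup.PriceOf(name));

        df.Select(priceOf(df["ProductName"])).Show();
    }
}
```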
Changes from 6 commits
Commits (34):
- d309819 initial commit (suhsteve, Jun 10, 2019)
- 3535914 add method description (suhsteve, Jun 10, 2019)
- 8831972 Remove loadAssembly. Add custom serialization to Field Value (suhsteve, Jun 12, 2019)
- 27f8879 check for null field value (suhsteve, Jun 12, 2019)
- bc89b28 formatting (suhsteve, Jun 12, 2019)
- b55e426 remove coverlet test code coverage (suhsteve, Jun 13, 2019)
- cd1225e remove empty line (suhsteve, Jun 13, 2019)
- 7942a74 Merge branch 'master' into stsuh/udfclosure (imback82, Jun 15, 2019)
- 53c6073 Merge branch 'master' into stsuh/udfclosure (imback82, Jun 15, 2019)
- 8002d78 PR comments (suhsteve, Jun 27, 2019)
- a06e48d UdfSerDe UnitTests (suhsteve, Jun 28, 2019)
- bf1ddb4 formatting (suhsteve, Jun 28, 2019)
- 786b34f formatting (suhsteve, Jun 28, 2019)
- c28b93c PR comments (suhsteve, Jun 28, 2019)
- 42d91d8 Allow creation of instance of type, bypassing ctor() (suhsteve, Jun 29, 2019)
- 6c17292 add test (suhsteve, Jun 29, 2019)
- 8180600 reorg (suhsteve, Jun 29, 2019)
- 04443e6 Merge branch 'master' into stsuh/udfclosure (rapoth, Jun 29, 2019)
- e4ecc1a check if file exists instead of relying on try/catch (suhsteve, Jul 2, 2019)
- 1e40164 cache loading the assembly (suhsteve, Jul 2, 2019)
- c93f740 remove space (suhsteve, Jul 2, 2019)
- 2b3041f Merge branch 'master' into stsuh/udfclosure (imback82, Jul 5, 2019)
- 665555b PR comments (suhsteve, Jul 8, 2019)
- f5edb33 rename var (suhsteve, Jul 8, 2019)
- 5c335ca PR comments (suhsteve, Jul 8, 2019)
- 5389b0a PR comments (suhsteve, Jul 9, 2019)
- e50887d remove debug statement (suhsteve, Jul 9, 2019)
- ce2f502 wording (suhsteve, Jul 9, 2019)
- d7004ed PR comments (suhsteve, Jul 9, 2019)
- 05e1bed Merge branch 'master' into stsuh/udfclosure (imback82, Jul 9, 2019)
- 8cfc574 PR comment (suhsteve, Jul 9, 2019)
- a7d0ef8 Merge branch 'stsuh/udfclosure' of https://github.com/suhsteve/spark … (suhsteve, Jul 9, 2019)
- 0cb9bb4 PR comment (suhsteve, Jul 9, 2019)
- 4b64123 char limit (suhsteve, Jul 9, 2019)
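Commit 42d91d8 ("Allow creation of instance of type, bypassing ctor()") points at a detail worth spelling out: the closure's target object has to be materialized on the worker without running a constructor. A common way to do that is FormatterServices.GetUninitializedObject; whether the merged code uses exactly this call is an assumption based on the commit message, so treat the sketch below as illustrative.

```csharp
// Illustrative sketch of "create an instance of a type, bypassing ctor()".
// Whether PR #135 uses this exact API is inferred from commit 42d91d8, not
// confirmed by the diff shown on this page.
using System;
using System.Runtime.Serialization;

internal static class UninitializedFactory
{
    internal static object Create(Type type)
    {
        // Allocates the object without invoking any constructor; every field
        // starts at its default value and is filled in afterwards (e.g. via
        // FieldInfo.SetValue, as DeserializeTargetData does in the diff below).
        return FormatterServices.GetUninitializedObject(type);
    }
}
```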
16 changes: 8 additions & 8 deletions azure-pipelines.yml
@@ -55,14 +55,14 @@ jobs:
inputs:
command: test
projects: '**/*UnitTest/*.csproj'
arguments: '--configuration $(buildConfiguration) /p:CollectCoverage=true /p:CoverletOutputFormat=cobertura'
arguments: '--configuration $(buildConfiguration)'

- task: DotNetCoreCLI@2
displayName: 'E2E tests for Spark 2.3.0'
inputs:
command: test
projects: '**/Microsoft.Spark.E2ETest/*.csproj'
arguments: '--configuration $(buildConfiguration) /p:CollectCoverage=true /p:CoverletOutputFormat=cobertura'
arguments: '--configuration $(buildConfiguration)'
env:
SPARK_HOME: $(Build.BinariesDirectory)\spark-2.3.0-bin-hadoop2.7
HADOOP_HOME: $(Build.BinariesDirectory)\hadoop
@@ -73,7 +73,7 @@ jobs:
inputs:
command: test
projects: '**/Microsoft.Spark.E2ETest/*.csproj'
arguments: '--configuration $(buildConfiguration) /p:CollectCoverage=true /p:CoverletOutputFormat=cobertura'
arguments: '--configuration $(buildConfiguration)'
env:
SPARK_HOME: $(Build.BinariesDirectory)\spark-2.3.1-bin-hadoop2.7
HADOOP_HOME: $(Build.BinariesDirectory)\hadoop
@@ -84,7 +84,7 @@ jobs:
inputs:
command: test
projects: '**/Microsoft.Spark.E2ETest/*.csproj'
arguments: '--configuration $(buildConfiguration) /p:CollectCoverage=true /p:CoverletOutputFormat=cobertura'
arguments: '--configuration $(buildConfiguration)'
env:
SPARK_HOME: $(Build.BinariesDirectory)\spark-2.3.2-bin-hadoop2.7
HADOOP_HOME: $(Build.BinariesDirectory)\hadoop
@@ -95,7 +95,7 @@ jobs:
inputs:
command: test
projects: '**/Microsoft.Spark.E2ETest/*.csproj'
arguments: '--configuration $(buildConfiguration) /p:CollectCoverage=true /p:CoverletOutputFormat=cobertura'
arguments: '--configuration $(buildConfiguration)'
env:
SPARK_HOME: $(Build.BinariesDirectory)\spark-2.3.3-bin-hadoop2.7
HADOOP_HOME: $(Build.BinariesDirectory)\hadoop
@@ -106,7 +106,7 @@ jobs:
inputs:
command: test
projects: '**/Microsoft.Spark.E2ETest/*.csproj'
arguments: '--configuration $(buildConfiguration) /p:CollectCoverage=true /p:CoverletOutputFormat=cobertura'
arguments: '--configuration $(buildConfiguration)'
env:
SPARK_HOME: $(Build.BinariesDirectory)\spark-2.4.0-bin-hadoop2.7
HADOOP_HOME: $(Build.BinariesDirectory)\hadoop
@@ -117,7 +117,7 @@ jobs:
inputs:
command: test
projects: '**/Microsoft.Spark.E2ETest/*.csproj'
arguments: '--configuration $(buildConfiguration) /p:CollectCoverage=true /p:CoverletOutputFormat=cobertura'
arguments: '--configuration $(buildConfiguration)'
env:
SPARK_HOME: $(Build.BinariesDirectory)\spark-2.4.1-bin-hadoop2.7
HADOOP_HOME: $(Build.BinariesDirectory)\hadoop
@@ -128,7 +128,7 @@ jobs:
inputs:
command: test
projects: '**/Microsoft.Spark.E2ETest/*.csproj'
arguments: '--configuration $(buildConfiguration) /p:CollectCoverage=true /p:CoverletOutputFormat=cobertura'
arguments: '--configuration $(buildConfiguration)'
env:
SPARK_HOME: $(Build.BinariesDirectory)\spark-2.4.3-bin-hadoop2.7
HADOOP_HOME: $(Build.BinariesDirectory)\hadoop
2 changes: 1 addition & 1 deletion src/csharp/Microsoft.Spark/Utils/CommandSerDe.cs
@@ -212,7 +212,7 @@ private static void SerializeUdfs(

foreach (UdfSerDe.FieldData field in fields)
{
SerializeUdfs((Delegate)field.Value, curNode, udfWrapperNodes, udfs);
SerializeUdfs((Delegate)field.ValueData.Value, curNode, udfWrapperNodes, udfs);
}
}

73 changes: 65 additions & 8 deletions src/csharp/Microsoft.Spark/Utils/UdfSerDe.cs
@@ -4,10 +4,13 @@

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Linq.Expressions;
using System.Reflection;
using System.Runtime.Serialization;
using System.Runtime.Serialization.Formatters.Binary;

namespace Microsoft.Spark.Utils
{
@@ -68,7 +71,51 @@ internal sealed class FieldData
{
public TypeData TypeData { get; set; }
public string Name { get; set; }
public ValueData ValueData { get; set; }
}

[Serializable]
internal sealed class ValueData : ISerializable
{
public ValueData() { }

public TypeData TypeData { get; set; }

public object Value { get; set; }

public void GetObjectData(SerializationInfo info, StreamingContext context)
{
info.AddValue("TypeData", TypeData, typeof(TypeData));

var bf = new BinaryFormatter();
using (var ms = new MemoryStream())
{
bf.Serialize(ms, Value);
info.AddValue("ValueSerialized", ms.ToArray(), typeof(byte[]));
}
}

public ValueData(SerializationInfo info, StreamingContext context)
{
TypeData = (TypeData)info.GetValue("TypeData", typeof(TypeData));

var valueSerialized = (byte[])info.GetValue("ValueSerialized", typeof(byte[]));

var bf = new BinaryFormatter();
using (var ms = new MemoryStream(valueSerialized, false))
{
try
{
Value = bf.Deserialize(ms);
}
catch (SerializationException)
{
ms.Seek(0, SeekOrigin.Begin);
DeserializeType(TypeData);
Value = bf.Deserialize(ms);
}
}
}
}

internal static UdfData Serialize(Delegate udf)
@@ -125,17 +172,27 @@ private static TargetData SerializeTarget(object target)
Type targetType = target.GetType();
TypeData targetTypeData = SerializeType(targetType);

System.Collections.Generic.IEnumerable<FieldData> fields = targetType.GetFields(
var fields = new List<FieldData>();
foreach (var field in targetType.GetFields(
BindingFlags.Instance |
BindingFlags.Static |
BindingFlags.Public |
BindingFlags.NonPublic).
Select((field) => new FieldData()
BindingFlags.NonPublic))
{
object value = field.GetValue(target);
var fieldData = new FieldData()
{
TypeData = SerializeType(field.FieldType),
Name = field.Name,
ValueData = new ValueData()
{
TypeData = SerializeType(field.FieldType),
Name = field.Name,
Value = field.GetValue(target)
});
TypeData = (value != null) ? SerializeType(value.GetType()) : default,
Value = value
}
};

fields.Add(fieldData);
}

// Even when an UDF does not have any closure, GetFields() returns some fields
// which include Func<> of the udf specified.
Expand Down Expand Up @@ -166,7 +223,7 @@ private static object DeserializeTargetData(TargetData targetData)
field.Name,
BindingFlags.Instance |
BindingFlags.Public |
BindingFlags.NonPublic).SetValue(target, field.Value);
BindingFlags.NonPublic).SetValue(target, field.ValueData.Value);
}

return target;
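The deserialization retry above calls DeserializeType(TypeData) before the second BinaryFormatter.Deserialize attempt, and the commit messages mention checking whether the assembly file exists and caching the load ("check if file exists instead of relying on try/catch", "cache loading the assembly"). The helper below is a sketch of that idea under those assumptions; the class name, method signature, and lookup strategy are illustrative, not the merged implementation.

```csharp
// Sketch of a cached assembly loader in the spirit of commits e4ecc1a and
// 1e40164. Names and signatures are illustrative; the merged UdfSerDe code
// may differ.
using System.Collections.Concurrent;
using System.IO;
using System.Reflection;

internal static class AssemblyLoaderSketch
{
    private static readonly ConcurrentDictionary<string, Assembly> s_cache =
        new ConcurrentDictionary<string, Assembly>();

    // assemblyName: e.g. "MyLib"; fileName: e.g. "MyLib.dll" shipped next to the worker.
    internal static Assembly Load(string assemblyName, string fileName)
    {
        return s_cache.GetOrAdd(assemblyName, _ =>
        {
            // Check for the file up front instead of relying on try/catch.
            string path = Path.Combine(Directory.GetCurrentDirectory(), fileName);
            return File.Exists(path)
                ? Assembly.LoadFrom(path)
                : Assembly.Load(assemblyName);
        });
    }
}
```

With something like this in place, DeserializeType can make sure the field's defining assembly is present so the second Deserialize attempt can succeed.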