Skip to content

Commit

Permalink
Load Assembly referenced from within a lambda closure (#135)
Browse files Browse the repository at this point in the history
  • Loading branch information
suhsteve authored and imback82 committed Jul 10, 2019
1 parent 815c2a9 commit eb9cffb
Show file tree
Hide file tree
Showing 6 changed files with 405 additions and 31 deletions.
16 changes: 8 additions & 8 deletions azure-pipelines.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,14 @@ jobs:
inputs:
command: test
projects: '**/*UnitTest/*.csproj'
arguments: '--configuration $(buildConfiguration) /p:CollectCoverage=true /p:CoverletOutputFormat=cobertura'
arguments: '--configuration $(buildConfiguration)'

- task: DotNetCoreCLI@2
displayName: 'E2E tests for Spark 2.3.0'
inputs:
command: test
projects: '**/Microsoft.Spark.E2ETest/*.csproj'
arguments: '--configuration $(buildConfiguration) /p:CollectCoverage=true /p:CoverletOutputFormat=cobertura'
arguments: '--configuration $(buildConfiguration)'
env:
SPARK_HOME: $(Build.BinariesDirectory)\spark-2.3.0-bin-hadoop2.7
HADOOP_HOME: $(Build.BinariesDirectory)\hadoop
Expand All @@ -73,7 +73,7 @@ jobs:
inputs:
command: test
projects: '**/Microsoft.Spark.E2ETest/*.csproj'
arguments: '--configuration $(buildConfiguration) /p:CollectCoverage=true /p:CoverletOutputFormat=cobertura'
arguments: '--configuration $(buildConfiguration)'
env:
SPARK_HOME: $(Build.BinariesDirectory)\spark-2.3.1-bin-hadoop2.7
HADOOP_HOME: $(Build.BinariesDirectory)\hadoop
Expand All @@ -84,7 +84,7 @@ jobs:
inputs:
command: test
projects: '**/Microsoft.Spark.E2ETest/*.csproj'
arguments: '--configuration $(buildConfiguration) /p:CollectCoverage=true /p:CoverletOutputFormat=cobertura'
arguments: '--configuration $(buildConfiguration)'
env:
SPARK_HOME: $(Build.BinariesDirectory)\spark-2.3.2-bin-hadoop2.7
HADOOP_HOME: $(Build.BinariesDirectory)\hadoop
Expand All @@ -95,7 +95,7 @@ jobs:
inputs:
command: test
projects: '**/Microsoft.Spark.E2ETest/*.csproj'
arguments: '--configuration $(buildConfiguration) /p:CollectCoverage=true /p:CoverletOutputFormat=cobertura'
arguments: '--configuration $(buildConfiguration)'
env:
SPARK_HOME: $(Build.BinariesDirectory)\spark-2.3.3-bin-hadoop2.7
HADOOP_HOME: $(Build.BinariesDirectory)\hadoop
Expand All @@ -106,7 +106,7 @@ jobs:
inputs:
command: test
projects: '**/Microsoft.Spark.E2ETest/*.csproj'
arguments: '--configuration $(buildConfiguration) /p:CollectCoverage=true /p:CoverletOutputFormat=cobertura'
arguments: '--configuration $(buildConfiguration)'
env:
SPARK_HOME: $(Build.BinariesDirectory)\spark-2.4.0-bin-hadoop2.7
HADOOP_HOME: $(Build.BinariesDirectory)\hadoop
Expand All @@ -117,7 +117,7 @@ jobs:
inputs:
command: test
projects: '**/Microsoft.Spark.E2ETest/*.csproj'
arguments: '--configuration $(buildConfiguration) /p:CollectCoverage=true /p:CoverletOutputFormat=cobertura'
arguments: '--configuration $(buildConfiguration)'
env:
SPARK_HOME: $(Build.BinariesDirectory)\spark-2.4.1-bin-hadoop2.7
HADOOP_HOME: $(Build.BinariesDirectory)\hadoop
Expand All @@ -128,7 +128,7 @@ jobs:
inputs:
command: test
projects: '**/Microsoft.Spark.E2ETest/*.csproj'
arguments: '--configuration $(buildConfiguration) /p:CollectCoverage=true /p:CoverletOutputFormat=cobertura'
arguments: '--configuration $(buildConfiguration)'
env:
SPARK_HOME: $(Build.BinariesDirectory)\spark-2.4.3-bin-hadoop2.7
HADOOP_HOME: $(Build.BinariesDirectory)\hadoop
Expand Down
173 changes: 173 additions & 0 deletions src/csharp/Microsoft.Spark.UnitTest/UdfSerDeTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System;
using System.IO;
using System.Reflection;
using System.Runtime.Serialization.Formatters.Binary;
using Microsoft.Spark.Utils;
using Xunit;

namespace Microsoft.Spark.UnitTest
{
public class UdfSerDeTests
{
    // Simple serializable type used as a UDF closure target in the tests below.
    [Serializable]
    private class TestClass
    {
        private string _str;

        public TestClass(string s)
        {
            _str = s;
        }

        // Returns _str + s, or s + s when _str is null, so tests can observe
        // whether the (possibly null) field survived serialization round-trips.
        public string Concat(string s)
        {
            if (_str == null)
            {
                return s + s;
            }

            return _str + s;
        }

        public override bool Equals(object obj)
        {
            var that = obj as TestClass;

            if (that == null)
            {
                return false;
            }

            return _str == that._str;
        }

        // Derive the hash from _str so that instances equal under Equals()
        // produce equal hash codes (previously returned base.GetHashCode(),
        // which violates the Equals/GetHashCode contract).
        public override int GetHashCode()
        {
            return _str?.GetHashCode() ?? 0;
        }
    }

    // Round-trips UDF delegates (with and without closures, with null and
    // non-null captured state) through UdfSerDe + BinaryFormatter and checks
    // both the serialized metadata and the behavior of the revived delegate.
    [Fact]
    public void TestUdfSerDe()
    {
        {
            // Without closure.
            Func<int, int> expectedUdf = i => 10 * i;
            Delegate actualUdf = SerDe(expectedUdf);

            VerifyUdfSerDe(expectedUdf, actualUdf, false);
            Assert.Equal(100, ((Func<int, int>)actualUdf)(10));
        }

        {
            // With closure where the delegate target is an anonymous class.
            // The target will contain fields ["tc1", "tc2"], where "tc1" is
            // non null and "tc2" is null.
            TestClass tc1 = new TestClass("Test");
            TestClass tc2 = null;
            Func<string, string> expectedUdf =
                (s) =>
                {
                    if (tc2 == null)
                    {
                        return tc1.Concat(s);
                    }
                    return s;
                };
            Delegate actualUdf = SerDe(expectedUdf);

            VerifyUdfSerDe(expectedUdf, actualUdf, true);
            Assert.Equal("TestHelloWorld", ((Func<string, string>)actualUdf)("HelloWorld"));
        }

        {
            // With closure where the delegate target is TestClass
            // and target's field "_str" is set to "Test".
            TestClass tc = new TestClass("Test");
            Func<string, string> expectedUdf = tc.Concat;
            Delegate actualUdf = SerDe(expectedUdf);

            VerifyUdfSerDe(expectedUdf, actualUdf, true);
            Assert.Equal("TestHelloWorld", ((Func<string, string>)actualUdf)("HelloWorld"));
        }

        {
            // With closure where the delegate target is TestClass,
            // and target's field "_str" is set to null.
            TestClass tc = new TestClass(null);
            Func<string, string> expectedUdf = tc.Concat;
            Delegate actualUdf = SerDe(expectedUdf);

            VerifyUdfSerDe(expectedUdf, actualUdf, true);
            Assert.Equal(
                "HelloWorldHelloWorld",
                ((Func<string, string>)actualUdf)("HelloWorld"));
        }
    }

    // Checks that serializing the original and the round-tripped delegate
    // yields equal UdfData, and that the delegates themselves match.
    private void VerifyUdfSerDe(Delegate expectedUdf, Delegate actualUdf, bool hasClosure)
    {
        VerifyUdfData(
            UdfSerDe.Serialize(expectedUdf),
            UdfSerDe.Serialize(actualUdf),
            hasClosure);
        VerifyDelegate(expectedUdf, actualUdf);
    }

    // For closure-free UDFs the serialized target must carry no fields.
    private void VerifyUdfData(
        UdfSerDe.UdfData expectedUdfData,
        UdfSerDe.UdfData actualUdfData,
        bool hasClosure)
    {
        Assert.Equal(expectedUdfData, actualUdfData);

        if (!hasClosure)
        {
            Assert.Null(expectedUdfData.TargetData.Fields);
            Assert.Null(actualUdfData.TargetData.Fields);
        }
    }

    // Compares delegate type, method, and target type/fields.
    // NOTE(review): GetFields() here returns only public fields, so for
    // targets like TestClass (whose state is private) this comparison is
    // vacuous — confirm whether NonPublic fields should be inspected too.
    private void VerifyDelegate(Delegate expectedDelegate, Delegate actualDelegate)
    {
        Assert.Equal(expectedDelegate.GetType(), actualDelegate.GetType());
        Assert.Equal(expectedDelegate.Method, actualDelegate.Method);
        Assert.Equal(expectedDelegate.Target.GetType(), actualDelegate.Target.GetType());

        FieldInfo[] expectedFields = expectedDelegate.Target.GetType().GetFields();
        FieldInfo[] actualFields = actualDelegate.Target.GetType().GetFields();
        Assert.Equal(expectedFields, actualFields);
    }

    // Full round trip: UdfSerDe.Serialize -> BinaryFormatter bytes ->
    // BinaryFormatter -> UdfSerDe.Deserialize.
    private Delegate SerDe(Delegate udf)
    {
        return Deserialize(Serialize(udf));
    }

    private byte[] Serialize(Delegate udf)
    {
        UdfSerDe.UdfData udfData = UdfSerDe.Serialize(udf);

        using (var ms = new MemoryStream())
        {
            var bf = new BinaryFormatter();
            bf.Serialize(ms, udfData);
            return ms.ToArray();
        }
    }

    private Delegate Deserialize(byte[] serializedUdf)
    {
        using (var ms = new MemoryStream(serializedUdf, false))
        {
            var bf = new BinaryFormatter();
            UdfSerDe.UdfData udfData = (UdfSerDe.UdfData)bf.Deserialize(ms);
            return UdfSerDe.Deserialize(udfData);
        }
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
<PackageReference Include="System.Memory" Version="4.5.2" />
</ItemGroup>

<ItemGroup Condition=" '$(TargetFramework)' == 'netcoreapp2.1' ">
<PackageReference Include="System.Runtime.Loader" Version="4.3.0" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Microsoft.Spark\Microsoft.Spark.csproj" />
</ItemGroup>
Expand Down
11 changes: 11 additions & 0 deletions src/csharp/Microsoft.Spark.Worker/Processor/CommandProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,23 @@
using Microsoft.Spark.Utils;
using static Microsoft.Spark.Utils.UdfUtils;

#if NETCOREAPP
using System.Runtime.Loader;
#endif

namespace Microsoft.Spark.Worker.Processor
{
internal sealed class CommandProcessor
{
private readonly Version _version;

#if NETCOREAPP
static CommandProcessor()
{
UdfSerDe.AssemblyLoader = AssemblyLoadContext.Default.LoadFromAssemblyPath;
}
#endif

internal CommandProcessor(Version version)
{
_version = version;
Expand Down
2 changes: 1 addition & 1 deletion src/csharp/Microsoft.Spark/Utils/CommandSerDe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ private static void SerializeUdfs(

foreach (UdfSerDe.FieldData field in fields)
{
SerializeUdfs((Delegate)field.Value, curNode, udfWrapperNodes, udfs);
SerializeUdfs((Delegate)field.ValueData.Value, curNode, udfWrapperNodes, udfs);
}
}

Expand Down
Loading

0 comments on commit eb9cffb

Please sign in to comment.