Guide to User-Defined Functions (UDFs)

This guide shows how to use UDFs in .NET for Apache Spark.

What are UDFs

User-Defined Functions (UDFs) are a feature of Spark that allows developers to use custom functions to extend the system's built-in functionality. A UDF transforms the values of a single row of a table into a single corresponding output value for that row, based on the logic defined in the UDF.

Let's take the following UDF definition as an example:

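```csharp
using System;
using Microsoft.Spark.Sql;                  // Column, DataFrame
using static Microsoft.Spark.Sql.Functions; // Udf<T, TResult>
string s1 = "hello";
Func<Column, Column> udf = Udf<string, string>(
    str => $"{s1} {str}");
```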

The UDF defined above takes a string as input (in the form of a DataFrame `Column`) and returns that string with `hello` and a space prepended to it.

As a sample, let's take the following DataFrame `df`:

```text
+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+
```

Now let's apply the UDF defined above to `df`:

```csharp
DataFrame udfResult = df.Select(udf(df["name"]));
```

This returns the following DataFrame, `udfResult`:

```text
+-------------+
|         name|
+-------------+
|hello Michael|
|   hello Andy|
| hello Justin|
+-------------+
```
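To make the row-by-row semantics concrete without a Spark cluster, here is a plain C# sketch that applies the same lambda to each value of an in-memory list. The list `names` is an assumption standing in for the `name` column; in Spark, the engine performs this per-row application on the workers.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Plain C# sketch (no Spark involved): the UDF body is an ordinary
// Func<string, string> that is applied once per row value.
string s1 = "hello";
Func<string, string> greet = str => $"{s1} {str}";

// Hypothetical stand-in for the "name" column of the sample DataFrame.
List<string> names = new List<string> { "Michael", "Andy", "Justin" };

// One input value per row in, one output value per row out.
List<string> results = names.Select(greet).ToList();

foreach (string r in results)
{
    Console.WriteLine(r); // prints "hello Michael", "hello Andy", "hello Justin" on separate lines
}
```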

To get a better understanding of how to implement UDFs, please take a look at the UDF helper functions and some test examples.

UDF serialization

Since UDFs are executed on the workers, they have to be serialized and sent to the workers as part of the payload from the driver. This involves serializing the delegate, which is a reference to the method, along with its target, which is the class instance on which the delegate invokes the instance method. Please take a look at this code to get a better understanding of how UDF serialization is done.
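The method/target split can be observed in plain C#. The following sketch, independent of Spark, reads the standard `Delegate.Method` and `Delegate.Target` properties of a capturing lambda like the UDF body above; the variable names are illustrative, and the closure layout it reveals is a compiler implementation detail of the current C# compiler.

```csharp
using System;
using System.Reflection;

// Every delegate pairs a method with an optional target object. For a lambda
// that captures a local, the compiler emits a hidden closure class: the lambda
// body becomes an instance method on it, and the closure instance becomes the
// delegate's Target.
string s1 = "hello";
Func<string, string> udfBody = str => $"{s1} {str}";

MethodInfo method = udfBody.Method;  // reference to the compiler-generated method
object? target = udfBody.Target;     // the closure instance the method runs on

Console.WriteLine($"Method:      {method.Name}");
Console.WriteLine($"Declared on: {method.DeclaringType?.Name}");
Console.WriteLine($"Target type: {target?.GetType().Name}");

// The method is declared on the closure type, so the method reference alone is
// not enough to run the UDF elsewhere: the target instance must travel with it.
bool methodLivesOnTarget = target != null && method.DeclaringType == target.GetType();
Console.WriteLine($"Method is declared on the target's type: {methodLivesOnTarget}");
```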

Good to know while implementing UDFs

One behavior to be aware of when implementing UDFs in .NET for Apache Spark is how the target of a UDF gets serialized. .NET for Apache Spark uses .NET 6, which does not support serializing delegates, so serialization is instead done by using reflection to serialize the target on which the delegate is defined. When multiple delegates are defined in a common scope, they share a closure, and that shared closure becomes the target that reflection serializes. Let's look at an example to illustrate what that means.
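A minimal sketch of what reflection-based target serialization has to visit, assuming a hypothetical `SnapshotTarget` helper that is not the library's actual serializer: it reads every instance field on a delegate's target with `Type.GetFields` and `FieldInfo.GetValue`.

```csharp
using System;
using System.Collections.Generic;
using System.Reflection;

// Hypothetical helper (not part of .NET for Apache Spark): copies the instance
// fields of a delegate's target into a dictionary via reflection, the kind of
// field-by-field walk a reflection-based serializer performs because the
// delegate itself cannot be serialized directly.
Dictionary<string, object?> SnapshotTarget(Delegate d)
{
    var snapshot = new Dictionary<string, object?>();
    object? closure = d.Target;
    if (closure is null)
    {
        return snapshot; // delegates bound to static methods have no target
    }

    const BindingFlags flags = BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic;
    foreach (FieldInfo field in closure.GetType().GetFields(flags))
    {
        snapshot[field.Name] = field.GetValue(closure);
    }
    return snapshot;
}

string s1 = "hello";
Func<string, string> udfBody = str => $"{s1} {str}";

// Everything the lambda captured shows up as a field on its target.
Dictionary<string, object?> captured = SnapshotTarget(udfBody);
foreach (KeyValuePair<string, object?> entry in captured)
{
    Console.WriteLine($"{entry.Key} = {entry.Value}");
}
```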

The following code snippet defines two string variables, each referenced by one of two delegates that returns that string as its result:

```csharp
using System;

public class C {
    public void M() {
        string s1 = "s1";
        string s2 = "s2";
        Func<string, string> a = str => s1;
        Func<string, string> b = str => s2;
    }
}
```

For the snippet above, the compiler generates the following code, shown here as decompiled C# (credit source: sharplab.io):

```csharp
public class C
{
    [CompilerGenerated]
    private sealed class <>c__DisplayClass0_0
    {
        public string s1;

        public string s2;

        internal string <M>b__0(string str)
        {
            return s1;
        }

        internal string <M>b__1(string str)
        {
            return s2;
        }
    }

    public void M()
    {
        <>c__DisplayClass0_0 <>c__DisplayClass0_ = new <>c__DisplayClass0_0();
        <>c__DisplayClass0_.s1 = "s1";
        <>c__DisplayClass0_.s2 = "s2";
        Func<string, string> func = new Func<string, string>(<>c__DisplayClass0_.<M>b__0);
        Func<string, string> func2 = new Func<string, string>(<>c__DisplayClass0_.<M>b__1);
    }
}
```

As the decompiled code shows, `func` and `func2` share the same closure, `<>c__DisplayClass0_0`, which is the target that gets serialized when either `func` or `func2` is serialized. Hence, even though `Func<string, string> a` references only `s1`, `s2` also gets serialized when the bytes are sent to the workers.
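The shared closure can also be confirmed at runtime. This sketch mirrors `class C` using top-level statements (an assumption for brevity; they compile into a method just like `M`) and uses reflection to check that `a` and `b` have the same target and that `a`'s target carries both `s1` and `s2`. The closure layout is a compiler implementation detail, so treat this as an observation about the current C# compiler rather than a guarantee.

```csharp
using System;
using System.Linq;
using System.Reflection;

// Two lambdas in one scope, each using a different local, as in class C.
string s1 = "s1";
string s2 = "s2";
Func<string, string> a = str => s1;
Func<string, string> b = str => s2;

// Both delegates point at the same closure instance.
bool sameTarget = ReferenceEquals(a.Target, b.Target);

// Field names on a's target: what a reflection-based serializer would visit.
string[] fieldsOnA = a.Target!.GetType()
    .GetFields(BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic)
    .Select(f => f.Name)
    .OrderBy(n => n)
    .ToArray();

Console.WriteLine($"a and b share a target: {sameTarget}");
Console.WriteLine($"Fields reachable from a: {string.Join(", ", fieldsOnA)}");
```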

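Serializing variables that a delegate never uses can lead to unexpected behavior at runtime (for example with broadcast variables, where a broadcast variable referenced by one UDF can be serialized along with another UDF that shares its closure), which is why we recommend restricting the visibility of the variables used in a function to that function's scope.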

Going back to the `class C` example, the following is the recommended way to implement the behavior of that snippet:

```csharp
using System;

public class C {
    public void M() {
        {
            string s1 = "s1";
            Func<string, string> a = str => s1;
        }
        {
            string s2 = "s2";
            Func<string, string> b = str => s2;
        }
    }
}
```

For this version, the compiler generates the following code, shown here as decompiled C# (credit source: sharplab.io):

```csharp
public class C
{
    [CompilerGenerated]
    private sealed class <>c__DisplayClass0_0
    {
        public string s1;

        internal string <M>b__0(string str)
        {
            return s1;
        }
    }

    [CompilerGenerated]
    private sealed class <>c__DisplayClass0_1
    {
        public string s2;

        internal string <M>b__1(string str)
        {
            return s2;
        }
    }

    public void M()
    {
        <>c__DisplayClass0_0 <>c__DisplayClass0_ = new <>c__DisplayClass0_0();
        <>c__DisplayClass0_.s1 = "s1";
        Func<string, string> func = new Func<string, string>(<>c__DisplayClass0_.<M>b__0);
        <>c__DisplayClass0_1 <>c__DisplayClass0_2 = new <>c__DisplayClass0_1();
        <>c__DisplayClass0_2.s2 = "s2";
        Func<string, string> func2 = new Func<string, string>(<>c__DisplayClass0_2.<M>b__1);
    }
}
```

Here `func` and `func2` no longer share a closure; each has its own, `<>c__DisplayClass0_0` and `<>c__DisplayClass0_1` respectively. When a closure is used as the target for serialization, only the variables its delegate references get serialized.
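Applying the same reflection check to the block-scoped version sketches the effect of the recommendation: the two lambdas get distinct targets, each carrying only the variable it uses. As before, top-level statements stand in for method `M`, and the observed layout reflects the current C# compiler.

```csharp
using System;
using System.Linq;
using System.Reflection;

// Each variable and the lambda that uses it live in their own block,
// so the compiler gives each lambda its own closure.
Func<string, string> a;
Func<string, string> b;
{
    string s1 = "s1";
    a = str => s1;
}
{
    string s2 = "s2";
    b = str => s2;
}

// Lists the instance field names a reflection-based serializer would visit.
string[] FieldNames(object? target) =>
    target is null
        ? Array.Empty<string>()
        : target.GetType()
            .GetFields(BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic)
            .Select(f => f.Name)
            .OrderBy(n => n)
            .ToArray();

bool sameTarget = ReferenceEquals(a.Target, b.Target);
string[] fieldsOnA = FieldNames(a.Target);
string[] fieldsOnB = FieldNames(b.Target);

Console.WriteLine($"a and b share a target: {sameTarget}");
Console.WriteLine($"Fields reachable from a: {string.Join(", ", fieldsOnA)}");
Console.WriteLine($"Fields reachable from b: {string.Join(", ", fieldsOnB)}");
```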

Closure sharing is important to keep in mind when implementing multiple UDFs in a common scope. To learn more about UDFs in general, please review the following articles that explain UDFs and how to use them: UDFs in Databricks (Scala), Spark UDFs and some gotchas.