diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 4d89c4f079f29..aee986230d4b5 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -22,7 +22,7 @@ import java.util.{Properties, UUID} import scala.collection.JavaConverters._ import scala.collection.Map -import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper} import com.fasterxml.jackson.module.scala.DefaultScalaModule import org.json4s.DefaultFormats import org.json4s.JsonAST._ @@ -59,6 +59,7 @@ private[spark] object JsonProtocol { private implicit val format = DefaultFormats private val mapper = new ObjectMapper().registerModule(DefaultScalaModule) + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) /** ------------------------------------------------- * * JSON serialization methods for SparkListenerEvents | diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index a2a4b3aa974fc..f2b9f97d6cf7d 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -481,6 +481,28 @@ class JsonProtocolSuite extends SparkFunSuite { testAccumValue(Some("anything"), blocks, JString(blocks.toString)) testAccumValue(Some("anything"), 123, JString("123")) } + + test("SPARK-30936: forwards compatibility - ignore unknown fields") { + val expected = TestListenerEvent("foo", 123) + val unknownFieldsJson = + """{ + | "Event" : "org.apache.spark.util.TestListenerEvent", + | "foo" : "foo", + | "bar" : 123, + | "unknown" : "unknown" + |}""".stripMargin + assert(JsonProtocol.sparkEventFromJson(parse(unknownFieldsJson)) === expected) + } + + test("SPARK-30936: backwards compatibility - set default values for missing fields") { + val expected = TestListenerEvent("foo", 0) + val unknownFieldsJson = + """{ + | "Event" : "org.apache.spark.util.TestListenerEvent", + | "foo" : "foo" + |}""".stripMargin + assert(JsonProtocol.sparkEventFromJson(parse(unknownFieldsJson)) === expected) + } } @@ -2310,3 +2332,5 @@ private[spark] object JsonProtocolSuite extends Assertions { |} """.stripMargin } + +case class TestListenerEvent(foo: String, bar: Int) extends SparkListenerEvent diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala index 9d0f829ac9684..6bb1646becf22 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala @@ -382,28 +382,27 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { } } - testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2.0.0") { + testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2_0_0") { // query-event-logs-version-2.0.0.txt has all types of events generated by - // Structured Streaming in Spark 2.0.0. + // Structured Streaming in Spark 2.0.0. Because we renamed the classes, // SparkListenerApplicationEnd is the only valid event and it's the last event. We use it // to verify that we can skip broken jsons generated by Structured Streaming. - testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.0.txt") + testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.0.txt", 1) } - testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2.0.1") { + testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2_0_1") { // query-event-logs-version-2.0.1.txt has all types of events generated by - // Structured Streaming in Spark 2.0.1. + // Structured Streaming in Spark 2.0.1. Because we renamed the classes, // SparkListenerApplicationEnd is the only valid event and it's the last event. We use it // to verify that we can skip broken jsons generated by Structured Streaming. - testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.1.txt") + testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.1.txt", 1) } - testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2.0.2") { + testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2_0_2") { // query-event-logs-version-2.0.2.txt has all types of events generated by - // Structured Streaming in Spark 2.0.2. - // SparkListenerApplicationEnd is the only valid event and it's the last event. We use it - // to verify that we can skip broken jsons generated by Structured Streaming. - testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.2.txt") + // Structured Streaming in Spark 2.0.2. SPARK-18516 refactored Structured Streaming query events + // in 2.1.0. This test is to verify we are able to load events generated by Spark 2.0.2. + testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.2.txt", 5) } test("listener propagates observable metrics") { @@ -463,7 +462,9 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { } } - private def testReplayListenerBusWithBorkenEventJsons(fileName: String): Unit = { + private def testReplayListenerBusWithBorkenEventJsons( + fileName: String, + expectedEventSize: Int): Unit = { val input = getClass.getResourceAsStream(s"/structured-streaming/$fileName") val events = mutable.ArrayBuffer[SparkListenerEvent]() try { @@ -479,8 +480,8 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { replayer.addListener(new SparkListener {}) replayer.replay(input, fileName) // SparkListenerApplicationEnd is the only valid event - assert(events.size === 1) - assert(events(0).isInstanceOf[SparkListenerApplicationEnd]) + assert(events.size === expectedEventSize) + assert(events.last.isInstanceOf[SparkListenerApplicationEnd]) } finally { input.close() }