Skip to content

Commit

Permalink
[SPARK-30936][CORE] Set FAIL_ON_UNKNOWN_PROPERTIES to false by defaul…
Browse files Browse the repository at this point in the history
…t to parse Spark events

### What changes were proposed in this pull request?

Set `FAIL_ON_UNKNOWN_PROPERTIES` to `false` in `JsonProtocol` to allow ignore unknown fields in a Spark event. After this change, if we add new fields to a Spark event parsed by `ObjectMapper`, the event json string generated by a new Spark version can still be read by an old Spark History Server.

Since Spark History Server is an extra service, it usually takes time to upgrade, and it's possible that a Spark application is upgraded before SHS. Forwards-compatibility will allow an old SHS to support new Spark applications (may lose some new features but most of functions should still work).

### Why are the changes needed?

`JsonProtocol` is supposed to provide strong backwards-compatibility and forwards-compatibility guarantees: any version of Spark should be able to read JSON output written by any other version, including newer versions.

However, the forwards-compatibility guarantee is broken for events parsed by `ObjectMapper`. If a new field is added to an event parsed by `ObjectMapper` (e.g., 6dc5921#diff-dc5c7a41fbb7479cef48b67eb41ad254R33), the event json string generated by a new Spark version cannot be parsed by an old version of SHS right now.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

The new added tests.

Closes #27680 from zsxwing/SPARK-30936.

Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 3126557)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
zsxwing authored and cloud-fan committed Feb 25, 2020
1 parent e19f478 commit b37b085
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 15 deletions.
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import java.util.{Properties, UUID}
import scala.collection.JavaConverters._
import scala.collection.Map

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.json4s.DefaultFormats
import org.json4s.JsonAST._
Expand Down Expand Up @@ -59,6 +59,7 @@ private[spark] object JsonProtocol {
private implicit val format = DefaultFormats

private val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)

/** ------------------------------------------------- *
* JSON serialization methods for SparkListenerEvents |
Expand Down
24 changes: 24 additions & 0 deletions core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,28 @@ class JsonProtocolSuite extends SparkFunSuite {
testAccumValue(Some("anything"), blocks, JString(blocks.toString))
testAccumValue(Some("anything"), 123, JString("123"))
}

test("SPARK-30936: forwards compatibility - ignore unknown fields") {
val expected = TestListenerEvent("foo", 123)
val unknownFieldsJson =
"""{
| "Event" : "org.apache.spark.util.TestListenerEvent",
| "foo" : "foo",
| "bar" : 123,
| "unknown" : "unknown"
|}""".stripMargin
assert(JsonProtocol.sparkEventFromJson(parse(unknownFieldsJson)) === expected)
}

test("SPARK-30936: backwards compatibility - set default values for missing fields") {
val expected = TestListenerEvent("foo", 0)
val unknownFieldsJson =
"""{
| "Event" : "org.apache.spark.util.TestListenerEvent",
| "foo" : "foo"
|}""".stripMargin
assert(JsonProtocol.sparkEventFromJson(parse(unknownFieldsJson)) === expected)
}
}


Expand Down Expand Up @@ -2310,3 +2332,5 @@ private[spark] object JsonProtocolSuite extends Assertions {
|}
""".stripMargin
}

case class TestListenerEvent(foo: String, bar: Int) extends SparkListenerEvent
Original file line number Diff line number Diff line change
Expand Up @@ -382,28 +382,27 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
}
}

testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2.0.0") {
testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2_0_0") {
// query-event-logs-version-2.0.0.txt has all types of events generated by
// Structured Streaming in Spark 2.0.0.
// Structured Streaming in Spark 2.0.0. Because we renamed the classes,
// SparkListenerApplicationEnd is the only valid event and it's the last event. We use it
// to verify that we can skip broken jsons generated by Structured Streaming.
testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.0.txt")
testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.0.txt", 1)
}

testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2.0.1") {
testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2_0_1") {
// query-event-logs-version-2.0.1.txt has all types of events generated by
// Structured Streaming in Spark 2.0.1.
// Structured Streaming in Spark 2.0.1. Because we renamed the classes,
// SparkListenerApplicationEnd is the only valid event and it's the last event. We use it
// to verify that we can skip broken jsons generated by Structured Streaming.
testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.1.txt")
testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.1.txt", 1)
}

testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2.0.2") {
testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2_0_2") {
// query-event-logs-version-2.0.2.txt has all types of events generated by
// Structured Streaming in Spark 2.0.2.
// SparkListenerApplicationEnd is the only valid event and it's the last event. We use it
// to verify that we can skip broken jsons generated by Structured Streaming.
testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.2.txt")
// Structured Streaming in Spark 2.0.2. SPARK-18516 refactored Structured Streaming query events
// in 2.1.0. This test is to verify we are able to load events generated by Spark 2.0.2.
testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.2.txt", 5)
}

test("listener propagates observable metrics") {
Expand Down Expand Up @@ -463,7 +462,9 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
}
}

private def testReplayListenerBusWithBorkenEventJsons(fileName: String): Unit = {
private def testReplayListenerBusWithBorkenEventJsons(
fileName: String,
expectedEventSize: Int): Unit = {
val input = getClass.getResourceAsStream(s"/structured-streaming/$fileName")
val events = mutable.ArrayBuffer[SparkListenerEvent]()
try {
Expand All @@ -479,8 +480,8 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
replayer.addListener(new SparkListener {})
replayer.replay(input, fileName)
// SparkListenerApplicationEnd is the only valid event
assert(events.size === 1)
assert(events(0).isInstanceOf[SparkListenerApplicationEnd])
assert(events.size === expectedEventSize)
assert(events.last.isInstanceOf[SparkListenerApplicationEnd])
} finally {
input.close()
}
Expand Down

0 comments on commit b37b085

Please sign in to comment.