Skip to content

Commit

Permalink
fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
zsxwing committed Feb 24, 2020
1 parent 4172676 commit 7938701
Showing 1 changed file with 15 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -382,28 +382,27 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
}
}

testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2.0.0") {
testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2_0_0") {
// query-event-logs-version-2.0.0.txt has all types of events generated by
// Structured Streaming in Spark 2.0.0.
// Structured Streaming in Spark 2.0.0. Because we renamed the classes,
// SparkListenerApplicationEnd is the only valid event and it's the last event. We use it
// to verify that we can skip broken jsons generated by Structured Streaming.
testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.0.txt")
testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.0.txt", 1)
}

testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2.0.1") {
testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2_0_1") {
// query-event-logs-version-2.0.1.txt has all types of events generated by
// Structured Streaming in Spark 2.0.1.
// Structured Streaming in Spark 2.0.1. Because we renamed the classes,
// SparkListenerApplicationEnd is the only valid event and it's the last event. We use it
// to verify that we can skip broken jsons generated by Structured Streaming.
testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.1.txt")
testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.1.txt", 1)
}

testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2.0.2") {
testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2_0_2") {
// query-event-logs-version-2.0.2.txt has all types of events generated by
// Structured Streaming in Spark 2.0.2.
// SparkListenerApplicationEnd is the only valid event and it's the last event. We use it
// to verify that we can skip broken jsons generated by Structured Streaming.
testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.2.txt")
// Structured Streaming in Spark 2.0.2. We added a new `runId` field in 2.1.0. But we should
// still be able to load events generated by Spark 2.0.2.
testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.2.txt", 5)
}

test("listener propagates observable metrics") {
Expand Down Expand Up @@ -463,7 +462,9 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
}
}

private def testReplayListenerBusWithBorkenEventJsons(fileName: String): Unit = {
private def testReplayListenerBusWithBorkenEventJsons(
fileName: String,
expectedEventSize: Int): Unit = {
val input = getClass.getResourceAsStream(s"/structured-streaming/$fileName")
val events = mutable.ArrayBuffer[SparkListenerEvent]()
try {
Expand All @@ -479,8 +480,8 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
replayer.addListener(new SparkListener {})
replayer.replay(input, fileName)
// SparkListenerApplicationEnd is the only valid event
assert(events.size === 1)
assert(events(0).isInstanceOf[SparkListenerApplicationEnd])
assert(events.size === expectedEventSize)
assert(events.last.isInstanceOf[SparkListenerApplicationEnd])
} finally {
input.close()
}
Expand Down

0 comments on commit 7938701

Please sign in to comment.