Merge branch 'master' of github.com:holdenk/spark-testing-base · randomf/spark-testing-base@f5d80dc · GitHub

Commit f5d80dc

Merge branch 'master' of github.com:holdenk/spark-testing-base
2 parents 6aac1c5 + 6ea1b36 commit f5d80dc

File tree: 9 files changed, +213 -7 lines changed
  • test/2.2/scala/com/holdenkarau/spark/testing

    README.md

Lines changed: 3 additions & 3 deletions

@@ -12,10 +12,10 @@ This is not my beautiful code.
 
 ## How?
 
-So you include com.holdenkarau.spark-testing-base [spark_version]_0.7.2 and extend one of the classes and write some simple tests instead. For example to include this in a project using Spark 2.2.0:
+So you include com.holdenkarau.spark-testing-base [spark_version]_0.7.4 and extend one of the classes and write some simple tests instead. For example to include this in a project using Spark 2.2.0:
 
 ```scala
-"com.holdenkarau" %% "spark-testing-base" % "2.2.0_0.7.2" % "test"
+"com.holdenkarau" %% "spark-testing-base" % "2.2.0_0.7.4" % "test"
 ```
 
 or
@@ -24,7 +24,7 @@ or
 <dependency>
 <groupId>com.holdenkarau</groupId>
 <artifactId>spark-testing-base_2.11</artifactId>
-<version>${spark.version}_0.7.2</version>
+<version>${spark.version}_0.7.4</version>
 <scope>test</scope>
 </dependency>
 ```
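
As a rough sketch of the "extend one of the classes" step described above, a suite can mix in the library's SharedSparkContext trait (the same trait used by the new test file later in this commit). The suite name and test body below are hypothetical, not part of this commit:

```scala
import com.holdenkarau.spark.testing.SharedSparkContext
import org.scalatest.FunSuite

// Hypothetical example suite: SharedSparkContext supplies a SparkContext (`sc`)
// shared by every test in the suite and cleaned up after the suite finishes.
class WordCountSketchTest extends FunSuite with SharedSparkContext {
  test("counts words") {
    val counts = sc.parallelize(Seq("a", "b", "a"))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .collectAsMap()
    assert(counts("a") === 2)
  }
}
```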

    RELEASE_NOTES.md

Lines changed: 3 additions & 3 deletions

@@ -4,17 +4,17 @@
 - Re-add Scala 2.10 support up to and including Spark 2.2.X series
 - Attempt to make it so that users doing SQL tests without Hive don't need the hive jars.
 - Don't reset the SparkSession provider when in reuse mode.
-- Add workaround for inaccessiable active context info in Spark 2.0
+- Add workaround for inaccessible active context info in Spark 2.0
 - Upgrade to Hadoop 2.8.1 for mini cluster
 - Change build env after travis changes
 # 0.7.2
-- Add expiremental support to for reusing a SparkContext/Session accross multiple suites. For Spark 2.0+ only.
+- Add experimental support to for reusing a SparkContext/Session across multiple suites. For Spark 2.0+ only.
 # 0.7.1
 - Upgrade mini cluster hadoop dependencies
 - Add support for Spark 2.2.0
 - YARNCluster now requires SPARK_HOME to be set so as to configure spark.yarn.jars (workaround for YARN bug from deprecated code in Spark 2.2).
 # 0.7
-- Add Python RDD comparisions
+- Add Python RDD comparisons
 - Switch to JDK8 for Spark 2.1.1+
 - Add back Kafka tests
 - Make it easier to disable Hive support when running tests
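
The 0.7.2 note above refers to the experimental context reuse that the new StructuredStreamingTests file further down in this commit enables by overriding reuseContextIfPossible. A minimal sketch of opting in; the suite name and test body are hypothetical, and which trait declares reuseContextIfPossible is an assumption based on the test file below:

```scala
import com.holdenkarau.spark.testing.SharedSparkContext
import org.scalatest.FunSuite

// Hypothetical example: overriding reuseContextIfPossible asks the provider to keep
// the SparkContext/Session alive across suites instead of recreating it (Spark 2.0+ only).
class ReusedContextSketchTest extends FunSuite with SharedSparkContext {
  override implicit def reuseContextIfPossible: Boolean = true

  test("context is shared and usable") {
    assert(sc.parallelize(1 to 3).count() === 3)
  }
}
```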

    build.sbt

Lines changed: 9 additions & 1 deletion

@@ -123,7 +123,15 @@ unmanagedSourceDirectories in Compile := {
 }
 
 unmanagedSourceDirectories in Test := {
-  if (sparkVersion.value >= "2.0.0") Seq(
+  if (sparkVersion.value >= "2.2.0") Seq(
+    (sourceDirectory in Test)(_ / "2.2/scala"),
+    (sourceDirectory in Test)(_ / "2.0/scala"),
+    (sourceDirectory in Test)(_ / "1.6/scala"), (sourceDirectory in Test)(_ / "1.6/java"),
+    (sourceDirectory in Test)(_ / "1.4/scala"),
+    (sourceDirectory in Test)(_ / "kafka/scala"),
+    (sourceDirectory in Test)(_ / "1.3/scala"), (sourceDirectory in Test)(_ / "1.3/java")
+  ).join.value
+  else if (sparkVersion.value >= "2.0.0") Seq(
     (sourceDirectory in Test)(_ / "2.0/scala"),
     (sourceDirectory in Test)(_ / "1.6/scala"), (sourceDirectory in Test)(_ / "1.6/java"),
     (sourceDirectory in Test)(_ / "1.4/scala"),

    python/sparktestingbase/sqltestcase.py

Lines changed: 1 addition & 0 deletions

@@ -66,6 +66,7 @@ def assertDataFrameEqual(self, expected, result, tol=0):
         try:
             expectedRDD = expected.rdd.cache()
             resultRDD = result.rdd.cache()
+            self.assertEqual(expectedRDD.count(), resultRDD.count())
 
             def zipWithIndex(rdd):
                 """Zip with index (idx, data)"""

    python/sparktestingbase/test/simple_sql_test.py

Lines changed: 18 additions & 0 deletions

@@ -18,13 +18,19 @@
 
 from datetime import datetime
 from pyspark.sql import Row
+from pyspark.sql.types import StructType
 from sparktestingbase.sqltestcase import SQLTestCase
 import unittest2
 
 
 class SimpleSQLTest(SQLTestCase):
     """A simple test."""
 
+    def test_empty_expected_equal(self):
+        allTypes = self.sc.parallelize([])
+        df = self.sqlCtx.createDataFrame(allTypes, StructType([]))
+        self.assertDataFrameEqual(df, df)
+
     def test_simple_expected_equal(self):
         allTypes = self.sc.parallelize([Row(
             i=1, s="string", d=1.0, l=1,
@@ -68,5 +74,17 @@ def test_dif_schemas_unequal(self):
         allTypes2 = self.sc.parallelize([Row(d="1.0")])
         self.assertDataFrameEqual(allTypes1.toDF(), allTypes2.toDF(), 0.0001)
 
+    @unittest2.expectedFailure
+    def test_empty_dataframe_unequal(self):
+        allTypes = self.sc.parallelize([Row(
+            i=1, s="string", d=1.001, l=1,
+            b=True, list=[1, 2, 3], dict={"s": 0}, row=Row(a=1),
+            time=datetime(2014, 8, 1, 14, 1, 5))])
+        empty = self.sc.parallelize([])
+        self.assertDataFrameEqual(
+            allTypes.toDF(),
+            self.sqlCtx.createDataFrame(empty, allTypes.toDF().schema), 0.1)
+
+
 if __name__ == "__main__":
     unittest2.main()

Lines changed: 72 additions & 0 deletions

@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.holdenkarau.spark.testing
+
+import org.apache.spark.{SparkConf, SparkContext, EvilSparkContext}
+import org.scalatest.{BeforeAndAfterAll, Suite}
+
+/**
+ * Shares an HDFS MiniCluster based `SparkContext` between all tests in a suite and
+ * closes it at the end. This requires that the env variable SPARK_HOME is set.
+ * Further more if this is used in Spark versions prior to 1.6.3,
+ * all Spark tests must run against the yarn mini cluster.
+ *
+ * (see https://issues.apache.org/jira/browse/SPARK-10812 for details).
+ */
+trait SharedMiniCluster extends BeforeAndAfterAll
+    with HDFSClusterLike
+    with YARNClusterLike
+    with SparkContextProvider {
+  self: Suite =>
+  @transient private var _sc: SparkContext = _
+
+  def sc: SparkContext = _sc
+
+  val master = "yarn-client"
+
+  override def beforeAll() {
+    // Try and do setup, and in-case we fail shutdown
+    try {
+      super.startHDFS()
+      super.startYARN()
+
+      // Stop the spark context if already running
+      EvilSparkContext.stopActiveSparkContext()
+      // Create the new context
+      val sparkConf = new SparkConf().setMaster(master).setAppName("test")
+      _sc = new SparkContext(sparkConf)
+      setup(_sc)
+    } catch {
+      case e: Throwable =>
+        super.shutdownYARN()
+        super.shutdownHDFS()
+        throw e
+    }
+    super.beforeAll()
+  }
+
+  override def afterAll() {
+    Option(sc).foreach(_.stop())
+    _sc = null
+
+    super.shutdownYARN()
+    super.shutdownHDFS()
+
+    super.afterAll()
+  }
+}
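
A hedged usage sketch for the SharedMiniCluster trait added above, assuming SPARK_HOME is set as its doc comment requires; the suite name and job are hypothetical:

```scala
import com.holdenkarau.spark.testing.SharedMiniCluster
import org.scalatest.FunSuite

// Hypothetical example: SharedMiniCluster starts HDFS and YARN mini clusters in
// beforeAll, exposes `sc` bound to the "yarn-client" master, and tears both down in afterAll.
class MiniClusterSketchTest extends FunSuite with SharedMiniCluster {
  test("runs a small job on the YARN mini cluster") {
    val counts = sc.parallelize(Seq("spark", "testing", "spark"))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .collectAsMap()
    assert(counts("spark") === 2)
  }
}
```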

Lines changed: 75 additions & 0 deletions

@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.holdenkarau.spark.testing
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.execution.streaming._
+
+import org.scalatest.Suite
+
+import scala.reflect.ClassTag
+
+/**
+ * Early Experimental Structured Streaming Base.
+ */
+trait StructuredStreamingBase extends DataFrameSuiteBase
+    with StructuredStreamingBaseLike { self: Suite =>
+  /**
+   * Test a simple streams end state
+   */
+  def testSimpleStreamEndState[T: Encoder, R: Encoder](
+    spark: SparkSession,
+    input: Seq[Seq[T]],
+    expected: Seq[R],
+    mode: String,
+    queryFunction: Dataset[T] => Dataset[R]) = {
+    val result = runSimpleStreamEndState(spark, input, mode, queryFunction)
+    assert(result === expected)
+  }
+}
+
+trait StructuredStreamingBaseLike extends SparkContextProvider
+    with TestSuiteLike with Serializable {
+  var count = 0
+  /**
+   * Run a simple streams end state
+   */
+  private[holdenkarau] def runSimpleStreamEndState[T: Encoder, R: Encoder](
+    spark: SparkSession,
+    input: Seq[Seq[T]],
+    mode: String,
+    queryFunction: Dataset[T] => Dataset[R]) = {
+    import spark.implicits._
+    implicit val sqlContext = spark.sqlContext
+    val inputStream = MemoryStream[T]
+    val transformed = queryFunction(inputStream.toDS())
+    val queryName = s"${this.getClass.getSimpleName}TestSimpleStreamEndState${count}"
+    count = count + 1
+    val query = transformed.writeStream.
+      format("memory").
+      outputMode(mode).
+      queryName(queryName).
+      start()
+    input.foreach(batch => inputStream.addData(batch))
+    // Block until all processed
+    query.processAllAvailable()
+    val table = spark.table(queryName).as[R]
+    val resultRows = table.collect()
+    resultRows.toSeq
+  }
+}

    src/main/1.3/scala/com/holdenkarau/spark/testing/SharedMiniCluster.scala renamed to src/main/pre-2.0/scala/com/holdenkarau/spark/testing/SharedMiniCluster.scala

Lines changed: 1 addition & 0 deletions

@@ -45,6 +45,7 @@ trait SharedMiniCluster extends BeforeAndAfterAll
       super.startHDFS()
       super.startYARN()
 
+      // Create the new context
       val sparkConf = new SparkConf().setMaster(master).setAppName("test")
       _sc = new SparkContext(sparkConf)
       setup(_sc)

Lines changed: 31 additions & 0 deletions

@@ -0,0 +1,31 @@
+package com.holdenkarau.spark.testing
+
+import org.apache.spark.sql._
+
+import org.scalatest.FunSuite
+
+class StructuredStreamingTests
+    extends FunSuite with SharedSparkContext with StructuredStreamingBase {
+  // re-use the spark context
+  override implicit def reuseContextIfPossible: Boolean = true
+
+  test("add 3") {
+    import spark.implicits._
+    val input = List(List(1), List(2, 3))
+    val expected = List(4, 5, 6)
+    def compute(input: Dataset[Int]): Dataset[Int] = {
+      input.map(elem => elem + 3)
+    }
+    testSimpleStreamEndState(spark, input, expected, "append", compute)
+  }
+
+  test("stringify") {
+    import spark.implicits._
+    val input = List(List(1), List(2, 3))
+    val expected = List("1", "2", "3")
+    def compute(input: Dataset[Int]): Dataset[String] = {
+      input.map(elem => elem.toString)
+    }
+    testSimpleStreamEndState(spark, input, expected, "append", compute)
+  }
+}

0 commit comments