Apache Spark · Data · ~20 mins

Integration testing pipelines in Apache Spark - Practice Problems & Coding Challenges

Challenge - 5 Problems
Predict Output · intermediate · 2:00
Output of Spark DataFrame after pipeline transformation
Given the following Spark pipeline code, what is the output DataFrame content after applying the transformations?
Apache Spark
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler

spark = SparkSession.builder.master('local').appName('Test').getOrCreate()
data = [(0, 'cat'), (1, 'dog'), (2, 'cat'), (3, 'bird')]
schema = ['id', 'animal']
df = spark.createDataFrame(data, schema)

indexer = StringIndexer(inputCol='animal', outputCol='animalIndex')
assembler = VectorAssembler(inputCols=['animalIndex'], outputCol='features')
pipeline = Pipeline(stages=[indexer, assembler])
model = pipeline.fit(df)
result = model.transform(df).select('id', 'animal', 'animalIndex', 'features')
result.show()
A. [Row(id=0, animal='cat', animalIndex=0.0, features=DenseVector([0.0])), Row(id=1, animal='dog', animalIndex=1.0, features=DenseVector([1.0])), Row(id=2, animal='cat', animalIndex=0.0, features=DenseVector([0.0])), Row(id=3, animal='bird', animalIndex=2.0, features=DenseVector([2.0]))]
B. [Row(id=0, animal='cat', animalIndex=1.0, features=DenseVector([0.0])), Row(id=1, animal='dog', animalIndex=2.0, features=DenseVector([1.0])), Row(id=2, animal='cat', animalIndex=1.0, features=DenseVector([0.0])), Row(id=3, animal='bird', animalIndex=0.0, features=DenseVector([2.0]))]
C. [Row(id=0, animal='cat', animalIndex=2.0, features=DenseVector([2.0])), Row(id=1, animal='dog', animalIndex=0.0, features=DenseVector([0.0])), Row(id=2, animal='cat', animalIndex=2.0, features=DenseVector([2.0])), Row(id=3, animal='bird', animalIndex=1.0, features=DenseVector([1.0]))]
D. [Row(id=0, animal='cat', animalIndex=1.0, features=DenseVector([1.0])), Row(id=1, animal='dog', animalIndex=2.0, features=DenseVector([2.0])), Row(id=2, animal='cat', animalIndex=1.0, features=DenseVector([1.0])), Row(id=3, animal='bird', animalIndex=0.0, features=DenseVector([0.0]))]
💡 Hint
Remember that StringIndexer (default stringOrderType='frequencyDesc') assigns index 0 to the most frequent label, then counts upward as frequency decreases.
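The frequency-based ordering the hint describes can be sketched in plain Python (a stand-in, not pyspark; note that `Counter.most_common` breaks ties by first-seen order, which is not necessarily how Spark's StringIndexer breaks them):

```python
from collections import Counter

# Labels from the quiz data; StringIndexer's default
# stringOrderType='frequencyDesc' gives index 0.0 to the most frequent label.
labels = ['cat', 'dog', 'cat', 'bird']
order = [label for label, _ in Counter(labels).most_common()]
index = {label: float(i) for i, label in enumerate(order)}
print(index)  # 'cat' appears twice, so it gets index 0.0
```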
Data Output · intermediate · 1:30
Number of rows after filtering in Spark pipeline
Consider a Spark DataFrame with 1000 rows. A pipeline stage filters rows where the column 'age' is greater than 30. If 400 rows have age > 30, how many rows remain after the pipeline transformation?
Apache Spark
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline, Transformer
from pyspark.sql.functions import col

spark = SparkSession.builder.master('local').appName('Test').getOrCreate()
data = [(i, 25 + (i % 10)) for i in range(1000)]  # ages cycle 25-34; 400 rows have age > 30
schema = ['id', 'age']
df = spark.createDataFrame(data, schema)

# Custom pipeline stages must subclass Transformer (or Estimator);
# a plain class would make Pipeline.fit raise a TypeError.
class AgeFilter(Transformer):
    def __init__(self, age_limit):
        super().__init__()
        self.age_limit = age_limit
    def _transform(self, df):
        return df.filter(col('age') > self.age_limit)

filter_stage = AgeFilter(30)
pipeline = Pipeline(stages=[filter_stage])
model = pipeline.fit(df)
result = model.transform(df)
count = result.count()
A. 1000
B. 400
C. 600
D. 0
💡 Hint
Count how many rows satisfy the condition age > 30.
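As a Spark-free sanity check, the row generation can be mirrored in plain Python and the predicate counted directly (the age distribution below is an assumption chosen so that exactly 400 of the 1000 rows have age > 30):

```python
# Ages cycle through 25..34; the values 31, 32, 33, 34 satisfy age > 30,
# i.e. 4 of every 10 rows, so 400 of 1000 overall.
data = [(i, 25 + (i % 10)) for i in range(1000)]
count = sum(1 for _, age in data if age > 30)
print(count)  # 400
```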
🔧 Debug · advanced · 2:00
Identify the error in Spark pipeline code
What error does the following Spark pipeline code raise when executed?
Apache Spark
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer

spark = SparkSession.builder.master('local').appName('Test').getOrCreate()
data = [(0, 'apple'), (1, 'banana')]
schema = ['id', 'fruit']
df = spark.createDataFrame(data, schema)

indexer = StringIndexer(inputCol='fruit', outputCol='fruitIndex')
pipeline = Pipeline(stages=[indexer])
model = pipeline.fit(df)
result = model.transform(df).select('id', 'fruitIndex')
result.show()
A. No error, outputs a DataFrame with 'id' and 'fruitIndex' columns
B. TypeError: 'StringIndexer' object is not callable
C. AnalysisException: cannot resolve 'fruitIndex' given input columns: [id, fruit]
D. AttributeError: 'Pipeline' object has no attribute 'fit'
💡 Hint
Check if the pipeline stages are correctly defined and used.
🚀 Application · advanced · 1:30
Best approach to test Spark pipeline integration
Which approach is best to test the integration of multiple Spark pipeline stages in a data science project?
A. Only test the pipeline on production data to ensure real-world correctness
B. Test each pipeline stage separately with unit tests and skip full pipeline testing
C. Run the full pipeline on a small sample dataset and verify the final output matches expected results
D. Manually inspect intermediate DataFrame outputs without automated tests
💡 Hint
Integration testing means testing combined parts together.
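Testing combined parts together can be sketched without a Spark cluster: compose toy stand-ins for the stages, run the whole pipeline on a small sample, and assert on the final output. All function names below are hypothetical stand-ins, not pyspark APIs:

```python
from collections import Counter

def index_labels(rows):
    # Toy stand-in for StringIndexer: 0.0, 1.0, ... by descending frequency
    # (ties here fall back to first-seen order).
    order = [lbl for lbl, _ in Counter(r['animal'] for r in rows).most_common()]
    idx = {lbl: float(i) for i, lbl in enumerate(order)}
    return [{**r, 'animalIndex': idx[r['animal']]} for r in rows]

def assemble_features(rows):
    # Toy stand-in for VectorAssembler: wrap the index in a feature list.
    return [{**r, 'features': [r['animalIndex']]} for r in rows]

def run_pipeline(rows):
    # The integration test exercises the composed stages end to end.
    return assemble_features(index_labels(rows))

sample = [{'id': 0, 'animal': 'cat'}, {'id': 1, 'animal': 'dog'},
          {'id': 2, 'animal': 'cat'}]
result = run_pipeline(sample)
assert result[0]['features'] == [0.0]  # 'cat' is most frequent -> index 0.0
print('integration test passed')
```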
🧠 Conceptual · expert · 2:00
Effect of caching in Spark pipeline integration tests
In Spark pipeline integration tests, what is the main benefit of caching intermediate DataFrames during the pipeline execution?
A. It speeds up repeated access to intermediate results, reducing test runtime
B. It automatically fixes data quality issues in intermediate DataFrames
C. It prevents the pipeline from running any transformations
D. It increases memory usage without any performance benefit
💡 Hint
Think about how caching affects performance in Spark.
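The caching effect can be illustrated with plain-Python memoization (an analogy only: Spark's `DataFrame.cache()` is lazy and cluster-aware, while `functools.lru_cache` simply memoizes a local function):

```python
import functools

calls = {'n': 0}

@functools.lru_cache(maxsize=None)
def intermediate_result(stage_id):
    # Stand-in for an expensive intermediate DataFrame computation;
    # without caching, Spark would recompute this on every action.
    calls['n'] += 1
    return tuple(stage_id * i for i in range(5))

# Two "actions" over the same intermediate result:
first = intermediate_result(1)
second = intermediate_result(1)
assert first == second
assert calls['n'] == 1  # computed once, served from cache the second time
print('recomputations avoided:', 2 - calls['n'])
```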