Study Notes 5.3.1-2 First Look at Spark/PySpark & Spark DataFrames
Study Notes 5.3.1 – First Look at Spark/PySpark

These notes cover the basics and some intermediate topics discussed in DE Zoomcamp 5.3.1 – First Look at Spark/PySpark, with extra detail on schema definition, partitioning, and best practices for a well-rounded understanding.


1. Introduction to Spark and PySpark

  • Apache Spark Overview:
    • Spark is a fast, in-memory distributed computing framework designed for large-scale data processing.
    • PySpark is the Python API for Spark, providing a way to work with Spark’s distributed DataFrame and SQL functionalities using Python.
  • Why Spark?
    • It is designed to handle high volume data efficiently.
    • Supports a range of operations—from data ingestion and processing to machine learning.
    • Emphasizes parallel execution through clusters (executors).
  • Key Concepts:
    • SparkSession: The main entry point to interact with Spark. It is used for reading data, processing, and writing output.
    • Lazy Evaluation: Transformations (e.g., reading files, repartitioning) are not executed until an action (e.g., show(), write()) is called; see the sketch below.
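
A minimal sketch of these two ideas, assuming a local setup (the app name, the local[*] master, and the file path are illustrative placeholders):

  from pyspark.sql import SparkSession

  # Build (or reuse) the session; "local[*]" runs Spark locally on all cores
  spark = SparkSession.builder \
      .master("local[*]") \
      .appName("first-look") \
      .getOrCreate()

  # Reading is a transformation: nothing is executed yet
  df = spark.read.csv("path/to/file.csv", header=True)

  # show() is an action, so this line triggers the actual work
  df.show(5)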

2. Setting Up the Environment

  • Remote Execution:
    • The transcript demonstrates connecting via SSH to a remote machine.
    • Jupyter Notebooks are used to interact with PySpark code. Remember to configure port forwarding (e.g., port 4040 for Spark UI) correctly.
  • Development Tools:
    • Terminal commands are used to manage files and run Jupyter.
    • For Windows users, alternatives like Git Bash or MinGW can help mimic Linux command behavior.

3. Reading CSV Files with PySpark

  • Reading Data:

    • Use the spark.read.csv() method to load CSV files into a Spark DataFrame.
    • Example code snippet:

      df = spark.read.csv("path/to/file.csv", header=True)
      df.show()
      
      
  • Type Inference in Spark vs. Pandas:

    • Spark: Does not infer data types by default; every column is read as a string unless you opt in with inferSchema=True (see the sketch at the end of this section).
    • Pandas: Has basic type inference but might not recognize date/time fields accurately.
    • Implication: When accuracy is critical, explicitly defining the schema is recommended.
  • Defining Schema:

    • Use pyspark.sql.types (e.g., StructType, StructField, StringType, IntegerType, TimestampType) to create a schema.
    • Example schema definition:

      from pyspark.sql import types as T
      
      schema = T.StructType([
          T.StructField("pickup_datetime", T.TimestampType(), True),
          T.StructField("dropoff_datetime", T.TimestampType(), True),
          T.StructField("pickup_location_id", T.IntegerType(), True),
          T.StructField("dropoff_location_id", T.IntegerType(), True),
          T.StructField("sr_flag", T.StringType(), True),
          # Add more fields as needed
      ])
      df = spark.read.csv("path/to/file.csv", header=True, schema=schema)
      df.show(10)
      
      
    • Tip: Always check your data and use Pandas or manual inspection if needed to decide on the correct types.
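
If a hand-written schema feels heavy for quick exploration, Spark can also infer types on request via the inferSchema option; this costs an extra pass over the data, so an explicit schema remains the better choice for pipelines. A minimal sketch with the same placeholder path:

  # Opt-in inference: Spark scans the file to guess column types
  df = spark.read.csv("path/to/file.csv", header=True, inferSchema=True)
  df.printSchema()  # verify what Spark inferred before relying on it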


4. Converting Pandas DataFrame to Spark DataFrame

  • When and Why:
    • Sometimes you might load a small sample with Pandas (which infers types) and then convert to a Spark DataFrame for distributed processing.
    • Use spark.createDataFrame(pandas_df, schema) to convert, ensuring the Spark DataFrame has the desired data types (see the sketch below).
  • Benefits:
    • Allows leveraging Pandas’ flexibility for type detection on smaller subsets.
    • Transitioning to Spark helps when the dataset size exceeds the memory limits of a single machine.
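
A minimal sketch of this workflow, assuming the sample fits comfortably in memory and reusing the schema defined in the previous section (the 1000-row sample size and the path are illustrative):

  import pandas as pd

  # Let Pandas infer types on a small sample
  pdf = pd.read_csv("path/to/file.csv", nrows=1000)
  print(pdf.dtypes)  # helps decide which Spark types to enforce

  # Convert to a distributed Spark DataFrame with the explicit schema
  df = spark.createDataFrame(pdf, schema=schema)
  df.printSchema()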

5. Partitioning in Spark

  • Understanding Partitions:
    • In Spark, a partition is a chunk of data that can be processed independently on an executor.
    • The number of partitions affects parallelism; more partitions allow for better utilization of available executors.
  • Practical Example:
    • A large CSV file might be read as a single partition, causing only one executor to process it. This creates a bottleneck.
  • Repartitioning:

    • The repartition(n) method can be used to change the number of partitions.
    • Example:

      df_repartitioned = df.repartition(24)
      
      
    • Note: Repartitioning is a lazy operation. It does not occur until an action (such as writing data) is executed.

  • Cluster and File Distribution:

    • Files stored in a data lake (or cloud storage) might be split into multiple partitions automatically if there are many files.
    • Balancing the number of partitions with available executor resources is key for optimal performance; the sketch below shows how to check and change the partition count.
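
A minimal sketch for checking and changing the partition count (24 is just the example number used above):

  # Current number of partitions backing the DataFrame
  print(df.rdd.getNumPartitions())

  # repartition() is lazy; the shuffle only happens once an action runs
  df_repartitioned = df.repartition(24)
  print(df_repartitioned.rdd.getNumPartitions())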

6. Writing Data in Parquet Format

  • Why Parquet?
    • Parquet is a columnar storage format that offers efficient data compression and encoding schemes.
    • It is optimized for both storage size and query performance.
  • Process Overview:

    • After reading and processing the data, the DataFrame is written out in Parquet format:

      df_repartitioned.write.parquet("path/to/output_folder")
      
      
    • During the write operation, each partition is written out as its own file, so the number of output files reflects the number of partitions (this is also the point where the earlier lazy repartition actually runs).

  • Benefits:

    • Smaller file size (often 4x reduction compared to CSV).
    • Faster read times due to the columnar structure.
    • Better integration with many big data tools.
  • Additional Note:

    • Spark writes a _SUCCESS file (or similar marker) to indicate job completion. If rerunning, ensure you manage overwrites appropriately (e.g., using .mode("overwrite")).

7. Monitoring with the Spark Master UI

  • Overview of the UI:
    • The Spark UI (typically served on port 4040 while an application is running) provides real-time monitoring of jobs, stages, and tasks.
    • It shows information about:
      • Active and completed jobs.
      • Task execution details, including the number of partitions processed.
      • Executor usage and data shuffling (exchange) during repartitioning.
  • Usage:
    • Refreshing the UI can help you observe the progress of a job, see how many tasks are running, and troubleshoot potential performance issues.

8. Additional Information and Best Practices

8.1 Lazy Evaluation

  • Concept:
    • Operations like reading data, filtering, and repartitioning are lazily evaluated.
    • Actions such as show(), collect(), or writing data trigger the actual execution.
  • Why It Matters:
    • Optimizes the execution plan and can combine multiple transformations into a single job.
    • Helps in debugging and performance tuning (see the sketch below).
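
One way to observe this without triggering any work is explain(), which prints the plan Spark has built so far; a minimal sketch, assuming the sr_flag column from the earlier schema:

  # Transformations only: Spark records these steps, nothing runs yet
  df_lazy = df.filter(df.sr_flag.isNotNull()).repartition(24)

  # Print the physical plan without executing a job
  df_lazy.explain()

  # An action such as count() finally executes the whole chain
  print(df_lazy.count())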

8.2 Memory Management and Type Selection

  • Choosing Data Types:
    • Be mindful of using the most efficient data types (e.g., IntegerType over LongType if the range allows) to save memory.
  • Schema Enforcement:
    • Explicitly defining the schema not only speeds up reading but also avoids errors later in the processing pipeline.

8.3 Differences Between Spark DataFrame and Pandas DataFrame

  • Spark DataFrame:
    • Distributed, supports lazy evaluation, and is ideal for processing large datasets.
    • Operations are optimized across a cluster of machines.
  • Pandas DataFrame:
    • In-memory and more suitable for smaller datasets.
    • Rich API for data manipulation but not optimized for scale.

8.4 Troubleshooting Common Issues

  • Port Forwarding:
    • Ensure correct configuration (e.g., port 4040 for the Spark UI).
  • File Overwrite Errors:

    • If a write operation complains about existing paths, consider using an overwrite mode:

      df.write.mode("overwrite").parquet("path/to/output_folder")
      
      
  • Data Type Discrepancies:

    • Use Pandas to inspect a sample when unsure about the correct schema, then enforce it in Spark.

9. Conclusion

  • Summary:
    • You’ve learned how to read large CSV files using PySpark, define schemas explicitly, and convert data between Pandas and Spark DataFrames.
    • The importance of partitioning for parallel processing and the efficiency gains from writing in Parquet format were emphasized.
    • The Spark Master UI is a valuable tool for monitoring the execution of your jobs.
  • Next Steps:
    • Explore further topics such as Spark DataFrame operations, advanced optimizations, and cluster configuration.
    • Practice by running sample jobs and observing how partitioning and lazy evaluation affect performance.

Study Notes 5.3.2 – Spark DataFrames

1. Introduction to Spark DataFrames

  • Definition:

    A Spark DataFrame is a distributed collection of data organized into named columns, similar to a table in a relational database or a Pandas DataFrame. DataFrames are part of the higher-level Spark SQL API, which makes data processing more expressive and concise.

  • Why Use DataFrames?

    • Ease of Use: Offers an intuitive API for data manipulation using both SQL-like operations and Python/Scala/Java code.
    • Optimizations: Leverages Spark’s Catalyst optimizer to optimize query plans automatically.
    • Scalability: Works well on large-scale data processing tasks by distributing data and computation across clusters.

2. Reading and Writing Data

  • File Formats:
    • CSV:
      • Easy to read but does not store schema information.
      • Typically larger in size since values are stored as text.
    • Parquet:
      • A columnar storage format that embeds schema information.
      • More efficient storage (e.g., uses 4 bytes per value for integers) and allows for faster querying due to predicate pushdown.
  • Example (from Transcript):

    The lesson reuses a previously saved Parquet file; because Parquet embeds the schema, there is no need to specify it again when reading the data.

  • Key Takeaway:

    Parquet files often yield better performance and reduced storage compared to CSV files because they store metadata (the schema) and allow for efficient compression and encoding; see the read-back sketch below.
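
A minimal sketch of reading such a Parquet output back, reusing the placeholder output folder from the earlier notes; no schema argument is needed because Parquet carries it:

  # The schema travels with the Parquet files, so no schema= is required
  df = spark.read.parquet("path/to/output_folder")
  df.printSchema()
  df.show(5)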


3. Transformations vs. Actions

  • Transformations:
    • Operations that define a new DataFrame from an existing one (e.g., select, filter, withColumn).
    • Lazy Evaluation: They are not executed immediately. Spark builds up a DAG (Directed Acyclic Graph) of transformations.
    • Examples:
      • df.select("column1", "column2")
      • df.filter(df.license_number == "XYZ")
  • Actions:
    • Operations that trigger the execution of the DAG and return a result to the driver program.
    • Eager Execution: When an action is called, all the preceding transformations are computed.
    • Examples:
      • show(), collect(), take(n), write.csv(), write.parquet()
  • Visualizing the Concept:

    Imagine building a recipe (transformations) that isn't cooked until you actually serve it (action). This lazy evaluation allows Spark to optimize the whole chain of operations before executing them; a short sketch follows.
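
A short sketch of the recipe-versus-serving idea, using columns from the dataset above (the filter value 1 is purely illustrative):

  # Transformations: Spark only records these steps in the DAG
  trips = df \
      .filter(df.pickup_location_id == 1) \
      .select("pickup_datetime", "dropoff_datetime", "pickup_location_id")

  # Action: the whole chain above is optimized and executed now
  trips.show(5)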


4. Basic DataFrame Operations

  • Selecting Columns:

    Use select() to pick specific columns from a DataFrame.

    df_selected = df.select("pickup_datetime", "dropoff_datetime", "pickup_location_id", "dropoff_location_id")
    
    
  • Filtering Rows:

    Use filter() (or where()) to return rows that meet specific conditions.

    df_filtered = df.filter(df.license_number == "XYZ")
    
    
  • Adding or Modifying Columns:

    Use withColumn() along with built-in functions to add new columns or modify existing ones.

    • Example: Adding a date column by converting a datetime:

      from pyspark.sql import functions as f
      df = df.withColumn("pickup_date", f.to_date("pickup_datetime"))
      
      
    • Note on Overwriting:
      If you provide a column name that already exists, Spark will replace the old column with the new one. This can change semantics (e.g., from datetime to date).

  • Grouping and Aggregation:

    Although not detailed in the transcript, you can use groupBy() to perform aggregations like sum, count, or average on DataFrame columns; a sketch follows below.
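
A minimal sketch of a grouped aggregation, assuming the pickup_date column added in the example above:

  # Count rides per pickup date; execution happens only when show() is called
  df.groupBy("pickup_date") \
      .count() \
      .orderBy("pickup_date") \
      .show()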


5. Spark SQL Functions

  • Built-In Functions:
    Spark comes with a rich library of SQL functions that simplify data manipulation. These functions can be imported using:

    from pyspark.sql import functions as f
    
    
  • Usage Example:
    The function to_date() is used to extract the date part from a datetime column:

    df = df.withColumn("pickup_date", f.to_date("pickup_datetime"))
    
    
  • Exploring Functions:
    Typing f. and pressing Tab in an interactive environment (like the PySpark shell or a Jupyter notebook) shows the list of available functions via autocompletion.


6. User Defined Functions (UDFs)

  • When to Use UDFs:

    Sometimes you need to implement custom business logic that isn’t available through built-in functions or is too complex to express in SQL. UDFs allow you to write custom Python functions and apply them to DataFrame columns.

  • Creating a UDF:

    • Step 1: Define a standard Python function that encapsulates your custom logic.

      def crazy_stuff(dispatching_base):
          # Custom logic: for example, check divisibility and return a formatted string
          base = int(dispatching_base[1:])  # drop the non-numeric prefix (e.g., a leading "B")
          if base % 7 == 0:
              return f"s/{base}"
          else:
              return f"e/{base}"
      
      
    • Step 2: Convert the Python function to a Spark UDF and specify the return type (default is StringType unless specified otherwise).

      from pyspark.sql import functions as f
      from pyspark.sql.types import StringType

      crazy_stuff_udf = f.udf(crazy_stuff, StringType())
      
      
    • Step 3: Apply the UDF to a DataFrame column:

      df = df.withColumn("waste_id", crazy_stuff_udf("dispatching_base"))
      
      
  • Benefits:

    • Testability: You can test your Python functions separately (unit tests) before integrating them into Spark.
    • Flexibility: Complex business rules can be easily implemented in Python rather than wrestling with complex SQL expressions.
  • Considerations:

    UDFs may introduce performance overhead compared to native Spark SQL functions. When possible, try to use built-in functions.


7. SQL vs. DataFrame API

  • Using SQL in Spark:

    • Spark allows you to run SQL queries directly on DataFrames by creating temporary views.
    • Example:

      df.createOrReplaceTempView("rides")
      spark.sql("SELECT pickup_date, COUNT(*) FROM rides GROUP BY pickup_date").show()
      
      
  • When to Use Which:

    • SQL:
      • Preferred for simple aggregations, joins, or when you need a familiar SQL interface.
      • Useful for analysts who are comfortable with SQL syntax.
    • DataFrame API (with Python/Scala/Java):
      • Offers greater flexibility for complex transformations.
      • Easier to integrate with other programming logic and unit tests (especially in Python).
  • Integration Benefits:

    Spark’s architecture allows you to mix SQL and DataFrame API operations in the same workflow, giving you the best of both worlds.


8. Additional Concepts and Best Practices

  • Lazy Evaluation and DAG Optimization:
    • Spark delays execution of transformations until an action is called. This allows Spark to optimize the entire chain of operations (using the Catalyst optimizer) before executing them.
    • Understanding this helps in debugging performance issues and ensuring that you’re not inadvertently triggering multiple jobs.
  • Partitioning:
    • Data is split into partitions across the cluster.
    • Good partitioning strategy is key to performance. Over-partitioning or under-partitioning can affect job speed.
  • Caching and Persistence:
    • Use caching (df.cache()) to store frequently accessed DataFrames in memory, reducing computation time for iterative operations (see the sketch after this list).
  • Testing and Code Quality:
    • Unit test your transformation logic and UDFs independently.
    • Keep business logic modular so that it can be maintained, reused, and easily tested.
  • Monitoring:
    • Use the Spark UI (typically on port 4040 while an application is running) to monitor job execution, inspect DAGs, and diagnose performance issues.
  • Performance Considerations:
    • Prefer native Spark functions over UDFs when possible.
    • Be mindful of data serialization, shuffling, and network I/O as these can become performance bottlenecks.
  • Use Cases in Data Engineering:
    • ETL processes: Reading from various sources, transforming data, and writing to efficient storage formats.
    • Data Cleaning: Applying transformations and filtering to cleanse raw data.
    • Machine Learning: Preprocessing data before feeding it into ML models, often mixing SQL with Python-based UDFs for feature engineering.
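
For the caching point above, a minimal sketch (the filter condition is only an illustrative reuse pattern):

  # Keep a frequently reused DataFrame in memory
  frequent = df.filter(df.pickup_location_id == 1).cache()

  frequent.count()   # the first action materializes and caches the data
  frequent.show(5)   # later actions are served from the cache

  frequent.unpersist()  # free the memory when the DataFrame is no longer needed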

9. Summary

  • Spark DataFrames provide a powerful and flexible way to process large-scale data.
  • Lazy evaluation allows Spark to optimize the execution plan, differentiating between transformations (lazy) and actions (eager).
  • Built-in SQL functions and DataFrame API offer a rich toolset for common data manipulation tasks.
  • UDFs extend Spark’s functionality by letting you apply custom Python logic when necessary.
  • Both SQL and DataFrame API have their strengths, and Spark lets you combine them in a seamless workflow.

10. Further Reading and Resources

  • Official Apache Spark Documentation: Spark SQL, DataFrames and Datasets Guide
  • DE Zoomcamp Materials: Additional videos and tutorials on Spark and data engineering best practices.
  • Community Tutorials: Blogs, courses, and GitHub repositories related to Spark best practices for data engineers.
