
Coding/Practical PySpark Interview Questions


 

1. Read a CSV file and calculate the average salary by department.

 

To read a CSV file and calculate the average salary by department using PySpark, follow these steps:

  • Initialize SparkSession: Create a SparkSession, which is the entry point for using Spark functionality.

Python

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import avg

    spark = SparkSession.builder \
        .appName("Average Salary by Department") \
        .getOrCreate()

  • Read the CSV File: Load the CSV file into a PySpark DataFrame. Specify header=True if your CSV has a header row and inferSchema=True to automatically detect data types.

Python

    df = spark.read.csv("path/to/your/salary_data.csv", header=True, inferSchema=True)

Replace "path/to/your/salary_data.csv" with the actual path to your CSV file. Ensure your CSV file contains columns like 'Department' and 'Salary'.

  • Calculate Average Salary by Department: Group the DataFrame by the 'Department' column and then calculate the average of the 'Salary' column.

Python

    average_salary_by_department = df.groupBy("Department").agg(avg("Salary").alias("AverageSalary"))

  • Display the Result: Show the resulting DataFrame containing the average salary for each department.

Python

    average_salary_by_department.show()

  • Stop SparkSession: It is good practice to stop the SparkSession when you are finished.

Python

    spark.stop()

This sequence of operations will read your CSV data, group it by department, and compute the average salary for each department, displaying the result in a new DataFrame.

 

 

2. Find the top 3 highest-paid employees in each department using window functions.

 

To find the top 3 highest-paid employees in each department, use a Common Table Expression (CTE) with a window function such as ROW_NUMBER(), RANK(), or DENSE_RANK(), partitioned by department and ordered by salary in descending order. The outer query then filters the results to rows with a rank of 3 or less, for example with WHERE rank <= 3. A PySpark DataFrame API equivalent is sketched at the end of this answer.

Example SQL Query

SQL

WITH RankedEmployees AS (
    SELECT
        employee_id,
        name,
        salary,
        department_id,
        ROW_NUMBER() OVER (PARTITION BY department_id ORDER BY salary DESC) AS rank
    FROM
        employees
)
SELECT
    employee_id,
    name,
    salary,
    department_id
FROM
    RankedEmployees
WHERE
    rank <= 3
ORDER BY
    department_id, rank;

Explanation of the Query

  1. WITH clause (Common Table Expression - CTE): The CTE RankedEmployees creates a temporary result set with the added rank column.

  2. ROW_NUMBER() OVER (PARTITION BY department_id ORDER BY salary DESC):

  • PARTITION BY department_id: Divides the employees into separate groups (partitions) based on their department_id.

  • ORDER BY salary DESC: Within each department, sorts the employees by salary in descending order, placing the highest earners first.

  • ROW_NUMBER(): Assigns a unique, sequential rank to each employee within their department, starting from 1 for the highest salary.

  3. Outer SELECT statement: Selects the desired columns from the CTE.

  4. WHERE rank <= 3: Filters the results to only those employees whose rank is 1, 2, or 3, i.e. the top three highest-paid individuals in each department.

  5. ORDER BY department_id, rank: Sorts the final output by department and then by rank within each department for clarity.

Alternative Window Functions

  • RANK(): 

Similar to ROW_NUMBER() but assigns the same rank to employees with identical salaries and leaves gaps in the ranking (e.g., 1, 2, 2, 4). 

  • DENSE_RANK(): 

Also assigns the same rank to employees with identical salaries but does not leave gaps in the ranking (e.g., 1, 2, 2, 3). 
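
PySpark DataFrame API Equivalent

Since this is a PySpark interview question, the same result can also be produced with the DataFrame API. Below is a minimal sketch, assuming a DataFrame named employees_df with employee_id, name, salary, and department_id columns:

Python

from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number

# Rank employees within each department, highest salary first
dept_window = Window.partitionBy("department_id").orderBy(col("salary").desc())

top3_df = (employees_df
           .withColumn("rank", row_number().over(dept_window))
           .filter(col("rank") <= 3)
           .orderBy("department_id", "rank"))

top3_df.show()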

 

3. Remove duplicates from a DataFrame based on multiple columns.

 To remove duplicate rows from a PySpark DataFrame based on multiple columns, use the dropDuplicates() method and provide a list of the columns to consider.

Here is an example:

Python

from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.appName("RemoveDuplicates").getOrCreate()

# Sample data
data = [
    ("Alice", "New York", 25),
    ("Bob", "London", 30),
    ("Alice", "New York", 25),   # Duplicate based on all columns
    ("Charlie", "Paris", 35),
    ("Bob", "London", 40),       # Duplicate based on name and city, different age
    ("Alice", "London", 28)
]

# Define schema
schema = ["name", "city", "age"]

# Create DataFrame
df = spark.createDataFrame(data, schema)

# Show original DataFrame
print("Original DataFrame:")
df.show()

# Remove duplicates based on 'name' and 'city' columns
df_no_duplicates = df.dropDuplicates(["name", "city"])

# Show DataFrame after removing duplicates
print("DataFrame after removing duplicates based on 'name' and 'city':")
df_no_duplicates.show()

# Stop SparkSession
spark.stop()

Explanation:

  • spark.createDataFrame(data, schema): This creates a PySpark DataFrame from the provided sample data and schema.

  • df.dropDuplicates(["name", "city"]): This is the core operation. The dropDuplicates() method is called on the DataFrame df.

  • The argument ["name", "city"] is a list of column names. PySpark will consider a row a duplicate if the values in both the 'name' and 'city' columns are identical to another row.

  • dropDuplicates() keeps one row per unique combination of the specified columns (effectively the first occurrence encountered) and discards the rest. In a distributed job, which row is retained is not guaranteed unless you impose an ordering yourself.

The output will show the DataFrame with rows where the combination of 'name' and 'city' is unique. For example, only one of ("Bob", "London", 30) and ("Bob", "London", 40) survives, because both share the same 'name' and 'city' values.
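
If you need to control which duplicate survives (for example, keeping the row with the highest age for each name/city pair), a window function makes the choice explicit instead of relying on dropDuplicates(). Below is a minimal sketch using the same sample df (run it before spark.stop()); the "keep highest age" rule is just an illustrative choice:

Python

from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number

# Keep the row with the highest age for each (name, city) combination
w = Window.partitionBy("name", "city").orderBy(col("age").desc())

df_kept = (df.withColumn("rn", row_number().over(w))
             .filter(col("rn") == 1)
             .drop("rn"))

df_kept.show()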

 

 

4. Count word frequency in a text file using RDD and DataFrame API.

 Using RDD API

Python

from pyspark import SparkContext

 

sc = SparkContext("local", "WordCountRDD")

text_file = sc.textFile("path/to/your/textfile.txt")

 

word_counts = (

    text_file.flatMap(lambda line: line.split())

             .map(lambda word: (word, 1))

             .reduceByKey(lambda a, b: a + b)

)

 

# Print the word counts
for word, count in word_counts.collect():
    print(word, count)

 

Using DataFrame API

Python

from pyspark.sql import SparkSession

from pyspark.sql.functions import explode, split

 

spark = SparkSession.builder.appName("WordCountDF").getOrCreate()

df = spark.read.text("path/to/your/textfile.txt")

 

words_df = df.select(explode(split(df.value, " ")).alias("word"))

word_counts_df = words_df.groupBy("word").count()

 

word_counts_df.show()
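
To present the most frequent words first, the counts can be ordered before display. A small follow-on step using the word_counts_df from above:

Python

from pyspark.sql.functions import desc

# Show the 20 most frequent words, highest count first
word_counts_df.orderBy(desc("count")).show(20, truncate=False)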

 

Key Differences

| Feature     | RDD API                    | DataFrame API                     |
|-------------|----------------------------|-----------------------------------|
| Abstraction | Low-level                  | High-level                        |
| Performance | Less optimized             | Catalyst optimizer improves speed |
| Ease of Use | More control, more verbose | Concise and SQL-like              |

 

 

5. Write PySpark code to join two DataFrames and handle null values.

 Here’s a simple example of how to join two DataFrames in PySpark and handle null values using the DataFrame API:

 

Sample DataFrames

Python

from pyspark.sql import SparkSession

from pyspark.sql.functions import coalesce, lit

 

spark = SparkSession.builder.appName("JoinHandleNulls").getOrCreate()

 

# First DataFrame

data1 = [("1", "Alice"), ("2", "Bob"), ("3", "Charlie")]

df1 = spark.createDataFrame(data1, ["id", "name"])

 

# Second DataFrame

data2 = [("1", "HR"), ("2", None), ("4", "Finance")]

df2 = spark.createDataFrame(data2, ["id", "department"])

 

 

Join the DataFrames

Python

joined_df = df1.join(df2, on="id", how="left")

 

 

Handle Null Values

You can replace nulls using coalesce() or fillna():

Python

# Replace nulls in 'department' column with 'Unknown'

cleaned_df = joined_df.withColumn("department", coalesce(joined_df["department"], lit("Unknown")))

 

# Alternatively using fillna

# cleaned_df = joined_df.fillna({"department": "Unknown"})

 

Show Result

Python

cleaned_df.show()

 


 

 

6. Find the nth highest salary using PySpark DataFrame API.

 To find the nth highest salary using the PySpark DataFrame API, you can use the dense_rank() or row_number() window function. Here's a complete example:

 

PySpark Code to Find the nth Highest Salary

Python

from pyspark.sql import SparkSession

from pyspark.sql.window import Window

from pyspark.sql.functions import col, dense_rank

 

# Initialize Spark session

spark = SparkSession.builder.appName("NthHighestSalary").getOrCreate()

 

# Sample employee salary data

data = [("Alice", 5000), ("Bob", 6000), ("Charlie", 7000),

        ("David", 6000), ("Eve", 8000), ("Frank", 7000)]

df = spark.createDataFrame(data, ["name", "salary"])

 

# Define window specification

window_spec = Window.orderBy(col("salary").desc())

 

# Add rank column

ranked_df = df.withColumn("rank", dense_rank().over(window_spec))

 

# Set the value of n

n = 3

 

# Filter for nth highest salary

nth_highest_salary_df = ranked_df.filter(col("rank") == n)

 

nth_highest_salary_df.show()

 


Explanation

  • dense_rank() ensures that if two people have the same salary, they get the same rank.

  • You can change n = 3 to any other number to get the nth highest salary.

  • If salaries can tie and you want exactly one row back, use row_number() instead of dense_rank(); with dense_rank(), rank n corresponds to the nth distinct salary and may match several employees.
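
If only the salary amount is needed (rather than the full employee rows), one more step on the filtered DataFrame returns it as a plain Python value. A small sketch; note also that Window.orderBy without partitionBy pulls all rows into a single partition, which is fine for examples but worth remembering on large data:

Python

# Extract the nth highest distinct salary as a plain Python value
rows = nth_highest_salary_df.select("salary").distinct().collect()
nth_salary = rows[0]["salary"] if rows else None
print(f"Rank {n} salary:", nth_salary)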


 

 

7. Flatten a nested JSON column in PySpark DataFrame.

 To flatten a nested JSON column in a PySpark DataFrame, you can use the explode() function for arrays and selectExpr() or withColumn() for nested structs. Here's a step-by-step example:

Sample Nested JSON

Assume you have a DataFrame like this:

Python

import json

from pyspark.sql import SparkSession

from pyspark.sql.functions import explode

 

spark = SparkSession.builder.appName("FlattenJSON").getOrCreate()

 

# Sample nested JSON data

data = [

    {

        "id": 1,

        "name": "Alice",

        "address": {

            "city": "Hyderabad",

            "state": "Telangana"

        },

        "phones": ["12345", "67890"]

    }

]

 

df = spark.read.json(spark.sparkContext.parallelize([json.dumps(row) for row in data]))

df.printSchema()

 


Flatten Struct Columns

Python

flat_df = df.select(

    "id",

    "name",

    "address.city",

    "address.state",

    "phones"

)
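
The phones column is still an array after the select above. To get one row per phone number, explode() (already imported) can be applied as a final step; a short continuation of the example:

Python

# Expand the phones array so each phone number gets its own row
exploded_df = flat_df.withColumn("phone", explode("phones")).drop("phones")
exploded_df.show()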


 

 

8. Write a PySpark job to calculate moving average of stock prices.

 Here’s a PySpark job to calculate the moving average of stock prices using the DataFrame API and window functions:

PySpark Code: Moving Average of Stock Prices

Python

from pyspark.sql import SparkSession

from pyspark.sql.window import Window

from pyspark.sql.functions import col, avg

 

# Initialize Spark session

spark = SparkSession.builder.appName("MovingAverageStockPrices").getOrCreate()

 

# Sample stock price data: (date, stock, price)

data = [

    ("2025-09-01", "AAPL", 180),

    ("2025-09-02", "AAPL", 182),

    ("2025-09-03", "AAPL", 185),

    ("2025-09-04", "AAPL", 183),

    ("2025-09-05", "AAPL", 187),

]

 

df = spark.createDataFrame(data, ["date", "stock", "price"])

 

# Define window spec for moving average (e.g., 3-day window)

window_spec = Window.partitionBy("stock").orderBy("date").rowsBetween(-2, 0)

 

# Calculate moving average

df_with_ma = df.withColumn("moving_avg", avg("price").over(window_spec))

 

df_with_ma.show()

 


Explanation

  • rowsBetween(-2, 0) defines a 3-day moving window ending at the current row.

  • You can adjust the window size by changing the bounds.

  • partitionBy("stock") ensures the moving average is calculated per stock.

Sample Output

+----------+-----+-----+----------+
|      date|stock|price|moving_avg|
+----------+-----+-----+----------+
|2025-09-01| AAPL|  180|     180.0|
|2025-09-02| AAPL|  182|     181.0|
|2025-09-03| AAPL|  185|    182.33|
|2025-09-04| AAPL|  183|    183.33|
|2025-09-05| AAPL|  187|     185.0|
+----------+-----+-----+----------+

(Moving averages shown rounded to two decimals; show() prints full precision.)


 

 

9. Explain how to implement a custom UDF (User Defined Function) in PySpark.

 To implement a custom UDF (User Defined Function) in PySpark, follow these steps:

 

Step-by-Step: Creating a UDF in PySpark

1. Import Required Modules

Python

from pyspark.sql import SparkSession

from pyspark.sql.functions import udf

from pyspark.sql.types import StringType

 

2. Initialize Spark Session

Python

spark = SparkSession.builder.appName("CustomUDFExample").getOrCreate()

 

3. Create Sample DataFrame

Python

data = [("Alice", 25), ("Bob", 30), ("Charlie", 35)]

df = spark.createDataFrame(data, ["name", "age"])

 

4. Define a Python Function

Python

def age_category(age):

    if age < 30:

        return "Young"

    elif age < 40:

        return "Adult"

    else:

        return "Senior"

 

5. Register the Function as a UDF

Python

age_category_udf = udf(age_category, StringType())

 

6. Apply the UDF to the DataFrame

Python

df_with_category = df.withColumn("category", age_category_udf(df["age"]))

df_with_category.show()

 

Output

+-------+---+--------+

|   name|age|category|

+-------+---+--------+

|  Alice| 25|   Young|

|    Bob| 30|   Adult|

|Charlie| 35|   Adult|

+-------+---+--------+

Notes

  • UDFs are less efficient than built-in functions because they bypass Spark’s Catalyst optimizer.

  • Prefer using Spark SQL functions when possible for better performance.

  • You can also use Pandas UDFs for vectorized operations if you're working with large datasets.
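
Building on the last note, the same age-bucketing logic can be written as a Pandas UDF, which processes data in vectorized batches. A minimal sketch, assuming pandas and PyArrow are installed (Spark 3.x type-hint style):

Python

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType

@pandas_udf(StringType())
def age_category_pandas(age: pd.Series) -> pd.Series:
    # Vectorized version of the same bucketing rules
    return age.apply(lambda a: "Young" if a < 30 else ("Adult" if a < 40 else "Senior"))

df.withColumn("category", age_category_pandas(df["age"])).show()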


 

 

10. Write a PySpark streaming job that reads from Kafka and counts events per minute.

 Here’s a complete example of a PySpark Structured Streaming job that reads data from Kafka and counts events per minute:

PySpark Streaming Job: Count Events Per Minute from Kafka

Python

from pyspark.sql import SparkSession

from pyspark.sql.functions import window, col

# Initialize Spark session

spark = SparkSession.builder \

    .appName("KafkaEventCountPerMinute") \

    .getOrCreate()

 

spark.sparkContext.setLogLevel("WARN")

 

# Read from Kafka topic

kafka_df = spark.readStream \

    .format("kafka") \

    .option("kafka.bootstrap.servers", "localhost:9092") \

    .option("subscribe", "your_topic_name") \

    .option("startingOffsets", "latest") \

    .load()

 

# Convert Kafka value to string and add timestamp

events_df = kafka_df.selectExpr("CAST(value AS STRING)", "timestamp")

 

# Count events per minute using window

events_per_minute = events_df \

    .groupBy(window(col("timestamp"), "1 minute")) \

    .count()

 

# Write output to console (can be changed to other sinks like HDFS, Kafka, etc.)

query = events_per_minute.writeStream \

    .outputMode("complete") \

    .format("console") \

    .option("truncate", "false") \

    .start()

 

query.awaitTermination()

 


Key Notes

  • Kafka Setup: Ensure Kafka is running, the topic (your_topic_name) exists, and the job is submitted with the Kafka connector package (e.g., spark-sql-kafka-0-10) on the classpath.

  • Windowing: window(timestamp, "1 minute") groups events into 1-minute buckets.

  • Output Mode: "complete" outputs the full aggregation result each time.

  • Sink Options: You can replace console with parquet, Kafka, or memory for production use.
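
For long-running jobs, an append-mode variant with a watermark is often preferable, because complete mode keeps the entire aggregation state in memory indefinitely. A sketch of the alternative aggregation and sink, using the same events_df as above:

Python

# Bound the state with a watermark and emit each window once it is finalized
events_per_minute = events_df \
    .withWatermark("timestamp", "2 minutes") \
    .groupBy(window(col("timestamp"), "1 minute")) \
    .count()

query = events_per_minute.writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", "false") \
    .start()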


 

How do you partition a file's data into four output files in PySpark?

 To partition a file's data into four separate files using PySpark, you can follow these general steps:

Step-by-Step Guide

  1. Initialize SparkSession

  2. Read the input file into a DataFrame

  3. Repartition the DataFrame into 4 partitions

  4. Write each partition to a separate file

Here’s a sample code snippet to demonstrate this:

Python

from pyspark.sql import SparkSession

 

# Step 1: Create SparkSession

spark = SparkSession.builder \

    .appName("Partition File into Four") \

    .getOrCreate()

 

# Step 2: Read the input file (e.g., CSV)

df = spark.read.csv("path/to/input_file.csv", header=True, inferSchema=True)

 

# Step 3: Repartition into 4 partitions

df_repartitioned = df.repartition(4)

 

# Step 4: Write each partition to separate files

df_repartitioned.write \

    .mode("overwrite") \

    .option("header", True) \

    .csv("path/to/output_folder")

 


Notes:

  • The output will be saved as four part files (e.g., part-00000, part-00001, etc.) inside the output_folder.

  • You can change the format from CSV to Parquet, JSON, etc., by replacing .csv() with .parquet() or .json().
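
A related note: repartition(4) performs a full shuffle to produce roughly even partitions, while coalesce(4) merges existing partitions without a full shuffle (it can only reduce the partition count). A quick comparison using the df from above; the output paths are placeholders:

Python

# Full shuffle: four roughly even output files
df.repartition(4).write.mode("overwrite").option("header", True).csv("path/to/output_repartition")

# No full shuffle: merges existing partitions, so file sizes may be uneven
df.coalesce(4).write.mode("overwrite").option("header", True).csv("path/to/output_coalesce")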


 

How do you partition a file's data into four output files by column and date in PySpark?

To programmatically partition a CSV file into four files based on a column and date using PySpark, you can follow this structured approach:

Goal

Split the data into 4 separate files:

  • Based on date ranges (e.g., quarterly or custom ranges).

  • Each file is partitioned by a column (e.g., category, region, etc.).

Sample PySpark Code

Python

from pyspark.sql import SparkSession

from pyspark.sql.functions import col

 

# Step 1: Create SparkSession

spark = SparkSession.builder.appName("Partition CSV by Column and Date").getOrCreate()

 

# Step 2: Read the CSV file

df = spark.read.csv("path/to/input.csv", header=True, inferSchema=True)

 

# Step 3: Define your date ranges (customize as needed)

df1 = df.filter(col("date") < "2023-01-01")

df2 = df.filter((col("date") >= "2023-01-01") & (col("date") < "2023-06-01"))

df3 = df.filter((col("date") >= "2023-06-01") & (col("date") < "2024-01-01"))

df4 = df.filter(col("date") >= "2024-01-01")

 

# Step 4: Write each filtered DataFrame partitioned by a column (e.g., 'category')

df1.write.partitionBy("category").csv("output/part1", mode="overwrite", header=True)

df2.write.partitionBy("category").csv("output/part2", mode="overwrite", header=True)

df3.write.partitionBy("category").csv("output/part3", mode="overwrite", header=True)

df4.write.partitionBy("category").csv("output/part4", mode="overwrite", header=True)

 


Output Structure

Each folder (part1, part2, etc.) will contain subfolders like:

output/part1/

├── category=A/

│   └── part-00000.csv

├── category=B/

│   └── part-00000.csv

...

Customization Options

  • You can change the partition column to any other column like region, type, etc.

  • You can adjust the date ranges to suit your data (monthly, quarterly, yearly).

  • You can also write in Parquet or JSON by replacing .csv() with .parquet() or .json().
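
As an alternative to four separate writes, the date bucket can be derived as a column and written in a single pass with partitionBy(), which is usually more idiomatic. A sketch assuming the same df, date ranges, and 'category' column as above (the date_bucket column name is just an example):

Python

from pyspark.sql.functions import col, when

# Derive a bucket label per row, then let Spark create one folder per bucket/category
df_bucketed = df.withColumn(
    "date_bucket",
    when(col("date") < "2023-01-01", "part1")
    .when(col("date") < "2023-06-01", "part2")
    .when(col("date") < "2024-01-01", "part3")
    .otherwise("part4")
)

df_bucketed.write \
    .partitionBy("date_bucket", "category") \
    .mode("overwrite") \
    .option("header", True) \
    .csv("output/partitioned")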

 

 
