1. Read a CSV file and calculate the average salary by department.
To read a CSV file and calculate the average salary by department using PySpark, follow these steps:
Initialize SparkSession: Create a SparkSession, which is the entry point for using Spark functionality.
Python
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg
spark = SparkSession.builder \
.appName("Average Salary by Department") \
.getOrCreate()
Read the CSV File: Load the CSV file into a PySpark DataFrame. Specify header=True if your CSV has a header row and inferSchema=True to automatically detect data types.
Python
df = spark.read.csv("path/to/your/salary_data.csv", header=True, inferSchema=True)
Replace "path/to/your/salary_data.csv" with the actual path to your CSV file. Ensure your CSV file contains columns like 'Department' and 'Salary'.
Calculate Average Salary by Department: Group the DataFrame by the 'Department' column and then calculate the average of the 'Salary' column.
Python
average_salary_by_department = df.groupBy("Department").agg(avg("Salary").alias("AverageSalary"))
Display the Result: Show the resulting DataFrame containing the average salary for each department.
Python
average_salary_by_department.show()
Stop SparkSession: It is good practice to stop the SparkSession when you are finished.
Python
spark.stop()
This sequence of operations will read your CSV data, group it by department, and compute the average salary for each department, displaying the result in a new DataFrame.
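For readers who want to try this without a CSV file, here is a minimal end-to-end sketch using made-up in-memory data (the departments and salaries below are hypothetical):
Python
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg

spark = SparkSession.builder.appName("AverageSalarySketch").getOrCreate()

# Hypothetical rows standing in for the CSV contents
data = [("Sales", 50000), ("Sales", 60000), ("IT", 70000), ("IT", 90000)]
df = spark.createDataFrame(data, ["Department", "Salary"])

df.groupBy("Department").agg(avg("Salary").alias("AverageSalary")).show()

spark.stop()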
2. Find the top 3 highest-paid employees in each department using window functions.
To find the top 3 highest-paid employees in each department, use a Common Table Expression (CTE) with a window function like ROW_NUMBER(), RANK(), or DENSE_RANK() partitioned by department and ordered by salary in descending order. The outer query then filters the results to include only rows with a rank of 3 or less, such as by adding WHERE rank <= 3.
Example SQL Query
SQL
WITH RankedEmployees AS (
SELECT
employee_id,
name,
salary,
department_id,
ROW_NUMBER() OVER (PARTITION BY department_id ORDER BY salary DESC) AS rank
FROM
employees
)
SELECT
employee_id,
name,
salary,
department_id
FROM
RankedEmployees
WHERE
rank <= 3
ORDER BY
department_id, rank;
Explanation of the Query
WITH RankedEmployees (Common Table Expression, CTE): Defines a temporary result set with the added rank column.
ROW_NUMBER() OVER (PARTITION BY department_id ORDER BY salary DESC):
PARTITION BY department_id: Divides the employees into separate groups (partitions) based on their department_id.
ORDER BY salary DESC: Within each department, it sorts the employees by their salary in descending order, placing the highest earners first.
ROW_NUMBER(): Assigns a unique, sequential rank to each employee within their department, starting from 1 for the highest salary.
Outer SELECT statement:
Selects the desired columns from the CTE.
WHERE rank <= 3:
Filters the results to include only those employees whose rank is 1, 2, or 3, effectively selecting the top three highest-paid individuals in each department.
ORDER BY department_id, rank:
Sorts the final output by department and then by rank within each department for clarity.
Alternative Window Functions
RANK():
Similar to ROW_NUMBER() but assigns the same rank to employees with identical salaries and leaves gaps in the ranking (e.g., 1, 2, 2, 4).
DENSE_RANK():
Also assigns the same rank to employees with identical salaries but does not leave gaps in the ranking (e.g., 1, 2, 2, 3).
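Since the rest of this post uses PySpark, here is a hedged DataFrame-API sketch of the same top-3-per-department query (the employee rows below are made up for illustration):
Python
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number

spark = SparkSession.builder.appName("Top3PerDept").getOrCreate()

# Hypothetical employees data standing in for the employees table above
employees_df = spark.createDataFrame(
    [(1, "Alice", 9000, 10), (2, "Bob", 8000, 10), (3, "Carol", 7000, 10),
     (4, "Dan", 6000, 10), (5, "Eve", 9500, 20), (6, "Frank", 9100, 20)],
    ["employee_id", "name", "salary", "department_id"],
)

# Rank within each department by salary, highest first, then keep rank <= 3
dept_window = Window.partitionBy("department_id").orderBy(col("salary").desc())
top3_df = (
    employees_df
    .withColumn("rank", row_number().over(dept_window))
    .filter(col("rank") <= 3)
    .orderBy("department_id", "rank")
)
top3_df.show()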
3. Remove duplicates from a DataFrame based on multiple columns.
To remove duplicate rows from a PySpark DataFrame based on multiple columns, use the dropDuplicates() method and provide a list of the columns to consider.
Here is an example:
Python
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder.appName("RemoveDuplicates").getOrCreate()
# Sample data
data = [
("Alice", "New York", 25),
("Bob", "London", 30),
("Alice", "New York", 25), # Duplicate based on all columns
("Charlie", "Paris", 35),
("Bob", "London", 40), # Duplicate based on name and city, different age
("Alice", "London", 28)
]
# Define schema
schema = ["name", "city", "age"]
# Create DataFrame
df = spark.createDataFrame(data, schema)
# Show original DataFrame
print("Original DataFrame:")
df.show()
# Remove duplicates based on 'name' and 'city' columns
df_no_duplicates = df.dropDuplicates(["name", "city"])
# Show DataFrame after removing duplicates
print("DataFrame after removing duplicates based on 'name' and 'city':")
df_no_duplicates.show()
# Stop SparkSession
spark.stop()
Explanation:
spark.createDataFrame(data, schema): This creates a PySpark DataFrame from the provided sample data and schema.
df.dropDuplicates(["name", "city"]): This is the core operation. The dropDuplicates() method is called on the DataFrame df.
The argument ["name", "city"] is a list of column names. PySpark will consider a row a duplicate if the values in both the 'name' and 'city' columns are identical to another row.
dropDuplicates() keeps one row per unique combination and discards the rest; in a distributed job there is no guarantee which of the duplicate rows survives (on a small local example it is typically the first one encountered).
The output will contain only rows where the combination of 'name' and 'city' is unique. For example, only one of ("Bob", "London", 30) and ("Bob", "London", 40) will remain, because they share the same 'name' and 'city' values.
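Note that dropDuplicates() does not let you choose which of the duplicate rows survives. If you need a deterministic rule, for example keeping the lowest age per (name, city) pair, one sketch uses a window function (this assumes the df from the example above, before spark.stop() is called):
Python
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number

# Keep exactly one row per (name, city): the one with the lowest age
keep_window = Window.partitionBy("name", "city").orderBy(col("age").asc())
df_deterministic = (
    df.withColumn("rn", row_number().over(keep_window))
      .filter(col("rn") == 1)
      .drop("rn")
)
df_deterministic.show()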
4. Count word frequency in a text file using RDD and DataFrame API.
✅ Using RDD API
Python
from pyspark import SparkContext
sc = SparkContext("local", "WordCountRDD")
text_file = sc.textFile("path/to/your/textfile.txt")
word_counts = (
text_file.flatMap(lambda line: line.split())
.map(lambda word: (word, 1))
.reduceByKey(lambda a, b: a + b)
)
# Bring the (word, count) pairs back to the driver and print them
print(word_counts.collect())
✅ Using DataFrame API
Python
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split
spark = SparkSession.builder.appName("WordCountDF").getOrCreate()
df = spark.read.text("path/to/your/textfile.txt")
words_df = df.select(explode(split(df.value, " ")).alias("word"))
word_counts_df = words_df.groupBy("word").count()
word_counts_df.show()
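To see the most frequent words from the DataFrame result, the counts can be sorted before showing them (a small addition that assumes the word_counts_df defined above):
Python
from pyspark.sql.functions import col

# Show the 10 most frequent words
word_counts_df.orderBy(col("count").desc()).show(10)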
Key Differences
Feature      | RDD API                    | DataFrame API
Abstraction  | Low-level                  | High-level
Performance  | Less optimized             | Catalyst optimizer improves speed
Ease of Use  | More control, more verbose | Concise and SQL-like
5. Write PySpark code to join two DataFrames and handle null values.
Here’s a simple example of how to join two DataFrames in PySpark and handle null values using the DataFrame API:
Sample DataFrames
Python
from pyspark.sql import SparkSession
from pyspark.sql.functions import coalesce, lit
spark = SparkSession.builder.appName("JoinHandleNulls").getOrCreate()
# First DataFrame
data1 = [("1", "Alice"), ("2", "Bob"), ("3", "Charlie")]
df1 = spark.createDataFrame(data1, ["id", "name"])
# Second DataFrame
data2 = [("1", "HR"), ("2", None), ("4", "Finance")]
df2 = spark.createDataFrame(data2, ["id", "department"])
Join the DataFrames
Python
joined_df = df1.join(df2, on="id", how="left")
Handle Null Values
You can replace nulls using coalesce() or fillna():
Python
# Replace nulls in 'department' column with 'Unknown'
cleaned_df = joined_df.withColumn("department", coalesce(joined_df["department"], lit("Unknown")))
# Alternatively using fillna
# cleaned_df = joined_df.fillna({"department": "Unknown"})
Show Result
Python
cleaned_df.show()
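With the sample data above, the left join keeps ids 1 to 3 (id 4 exists only in df2 and is dropped), and the null departments become "Unknown". The expected output, with row order possibly varying:
+---+-------+----------+
| id|   name|department|
+---+-------+----------+
|  1|  Alice|        HR|
|  2|    Bob|   Unknown|
|  3|Charlie|   Unknown|
+---+-------+----------+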
6. Find the nth highest salary using PySpark DataFrame API.
To find the nth highest salary using the PySpark DataFrame API, you can use the dense_rank() or row_number() window function. Here's a complete example:
✅ PySpark Code to Find the nth Highest Salary
Python
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import col, dense_rank
# Initialize Spark session
spark = SparkSession.builder.appName("NthHighestSalary").getOrCreate()
# Sample employee salary data
data = [("Alice", 5000), ("Bob", 6000), ("Charlie", 7000),
("David", 6000), ("Eve", 8000), ("Frank", 7000)]
df = spark.createDataFrame(data, ["name", "salary"])
# Define window specification
window_spec = Window.orderBy(col("salary").desc())
# Add rank column
ranked_df = df.withColumn("rank", dense_rank().over(window_spec))
# Set the value of n
n = 3
# Filter for nth highest salary
nth_highest_salary_df = ranked_df.filter(col("rank") == n)
nth_highest_salary_df.show()
Explanation
dense_rank() ensures that if two people have the same salary, they get the same rank.
You can change n = 3 to any other number to get the nth highest salary.
If you want at most one row even when salaries tie, use row_number() instead of dense_rank().
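With the sample data above and n = 3, dense_rank() assigns 8000 -> 1, 7000 -> 2, 6000 -> 3, and 5000 -> 4, so the filter returns both employees earning 6000 (row order may vary):
+-----+------+----+
| name|salary|rank|
+-----+------+----+
|  Bob|  6000|   3|
|David|  6000|   3|
+-----+------+----+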
7. Flatten a nested JSON column in PySpark DataFrame.
To flatten a nested JSON column in a PySpark DataFrame, you can use the explode() function for arrays and selectExpr() or withColumn() for nested structs. Here's a step-by-step example:
✅ Sample Nested JSON
Assume you have a DataFrame like this:
Python
import json
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
spark = SparkSession.builder.appName("FlattenJSON").getOrCreate()
# Sample nested JSON data
data = [
{
"id": 1,
"name": "Alice",
"address": {
"city": "Hyderabad",
"state": "Telangana"
},
"phones": ["12345", "67890"]
}
]
# Serialize the records as JSON strings so spark.read.json can parse them reliably
df = spark.read.json(spark.sparkContext.parallelize([json.dumps(record) for record in data]))
df.printSchema()
Flatten Struct Columns
Python
flat_df = df.select(
"id",
"name",
"address.city",
"address.state",
"phones"
)
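The phones array is still nested inside flat_df. As mentioned above, explode() (already imported) turns each array element into its own row; a sketch that continues from the df created earlier:
Python
flat_exploded_df = df.select(
    "id",
    "name",
    "address.city",
    "address.state",
    explode("phones").alias("phone")
)
flat_exploded_df.show()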
8. Write a PySpark job to calculate moving average of stock prices.
Here’s a PySpark job to calculate the moving average of stock prices using the DataFrame API and window functions:
✅ PySpark Code: Moving Average of Stock Prices
Python
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import col, avg
# Initialize Spark session
spark = SparkSession.builder.appName("MovingAverageStockPrices").getOrCreate()
# Sample stock price data: (date, stock, price)
data = [
("2025-09-01", "AAPL", 180),
("2025-09-02", "AAPL", 182),
("2025-09-03", "AAPL", 185),
("2025-09-04", "AAPL", 183),
("2025-09-05", "AAPL", 187),
]
df = spark.createDataFrame(data, ["date", "stock", "price"])
# Define window spec for moving average (e.g., 3-day window)
window_spec = Window.partitionBy("stock").orderBy("date").rowsBetween(-2, 0)
# Calculate moving average
df_with_ma = df.withColumn("moving_avg", avg("price").over(window_spec))
df_with_ma.show()
Explanation
rowsBetween(-2, 0) defines a 3-row moving window (the current row and the two preceding rows), which equals a 3-day window when there is one row per trading day.
You can adjust the window size by changing the bounds.
partitionBy("stock") ensures the moving average is calculated per stock.
Sample Output (averages rounded for readability)
+----------+-----+-----+----------+
|      date|stock|price|moving_avg|
+----------+-----+-----+----------+
|2025-09-01| AAPL|  180|     180.0|
|2025-09-02| AAPL|  182|     181.0|
|2025-09-03| AAPL|  185|    182.33|
|2025-09-04| AAPL|  183|    183.33|
|2025-09-05| AAPL|  187|     185.0|
+----------+-----+-----+----------+
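One caveat: rowsBetween(-2, 0) counts rows, not calendar days, so gaps in the trading dates shift what the window covers. If a true 3-day time window is needed, a hedged sketch using rangeBetween over epoch seconds (continuing from the same df, with Window and avg already imported above):
Python
from pyspark.sql.functions import unix_timestamp

# Convert the date string to epoch seconds so rangeBetween can express a time span
df_ts = df.withColumn("ts", unix_timestamp("date", "yyyy-MM-dd"))

# Window covering the current row and everything up to 2 days (in seconds) before it
time_window = (
    Window.partitionBy("stock")
    .orderBy("ts")
    .rangeBetween(-2 * 86400, 0)
)

df_time_ma = df_ts.withColumn("moving_avg_3d", avg("price").over(time_window))
df_time_ma.show()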
9. Explain how to implement a custom UDF (User Defined Function) in PySpark.
To implement a custom UDF (User Defined Function) in PySpark, follow these steps:
✅ Step-by-Step: Creating a UDF in PySpark
1. Import Required Modules
Python
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
2. Initialize Spark Session
Python
spark = SparkSession.builder.appName("CustomUDFExample").getOrCreate()
3. Create Sample DataFrame
Python
data = [("Alice", 25), ("Bob", 30), ("Charlie", 35)]
df = spark.createDataFrame(data, ["name", "age"])
4. Define a Python Function
Python
def age_category(age):
if age < 30:
return "Young"
elif age < 40:
return "Adult"
else:
return "Senior"
5. Register the Function as a UDF
Python
age_category_udf = udf(age_category, StringType())
6. Apply the UDF to the DataFrame
Python
df_with_category = df.withColumn("category", age_category_udf(df["age"]))
df_with_category.show()
Output
+-------+---+--------+
|   name|age|category|
+-------+---+--------+
|  Alice| 25|   Young|
|    Bob| 30|   Adult|
|Charlie| 35|   Adult|
+-------+---+--------+
Notes
UDFs are usually slower than built-in functions: Catalyst cannot optimize the Python code inside them, and each row has to be serialized between the JVM and the Python worker.
Prefer using Spark SQL functions when possible for better performance.
You can also use Pandas UDFs for vectorized operations if you're working with large datasets.
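As a follow-up to the last note, here is a hedged Pandas UDF sketch of the same age logic (assumes Spark 3.x with pandas and pyarrow installed, and reuses the df from above):
Python
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType

@pandas_udf(StringType())
def age_category_vec(ages: pd.Series) -> pd.Series:
    # Vectorized version of age_category: processes a whole batch of ages at once
    return ages.apply(lambda a: "Young" if a < 30 else ("Adult" if a < 40 else "Senior"))

df.withColumn("category", age_category_vec(df["age"])).show()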
10. Write a PySpark streaming job that reads from Kafka and counts events per minute.
Here’s a complete example of a PySpark Structured Streaming job that reads data from Kafka and counts events per minute:
✅ PySpark Streaming Job: Count Events Per Minute from Kafka
Python
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, col
# Initialize Spark session
spark = SparkSession.builder \
.appName("KafkaEventCountPerMinute") \
.getOrCreate()
spark.sparkContext.setLogLevel("WARN")
# Read from Kafka topic
kafka_df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "your_topic_name") \
.option("startingOffsets", "latest") \
.load()
# Convert Kafka value to string and add timestamp
events_df = kafka_df.selectExpr("CAST(value AS STRING)", "timestamp")
# Count events per minute using window
events_per_minute = events_df \
.groupBy(window(col("timestamp"), "1 minute")) \
.count()
# Write output to console (can be changed to other sinks like HDFS, Kafka, etc.)
query = events_per_minute.writeStream \
.outputMode("complete") \
.format("console") \
.option("truncate", "false") \
.start()
query.awaitTermination()
Show less
Key Notes
Kafka Setup: Ensure Kafka is running and the topic (your_topic_name) exists. The job also needs the Spark Kafka connector on the classpath, e.g. spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:<your Spark version>.
Windowing: window(timestamp, "1 minute") groups events into 1-minute buckets.
Output Mode: "complete" outputs the full aggregation result each time.
Sink Options: For production use, replace console with a file sink such as Parquet or with a Kafka sink; the memory sink is intended for testing only.
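Because "complete" mode keeps every window's count in state indefinitely, long-running jobs usually add a watermark so that old windows can be dropped. A hedged variant of the aggregation above:
Python
# Bound the streaming state: ignore events more than 2 minutes behind the latest event time
events_per_minute = (
    events_df
    .withWatermark("timestamp", "2 minutes")
    .groupBy(window(col("timestamp"), "1 minute"))
    .count()
)

query = (
    events_per_minute.writeStream
    .outputMode("update")  # with a watermark, "update" (or "append") avoids unbounded state
    .format("console")
    .option("truncate", "false")
    .start()
)
query.awaitTermination()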
How do you partition a file's data into four output files in PySpark?
To partition a file's data into four separate files using PySpark, you can follow these general steps:
✅ Step-by-Step Guide
Initialize SparkSession
Read the input file into a DataFrame
Repartition the DataFrame into 4 partitions
Write each partition to a separate file
Here’s a sample code snippet to demonstrate this:
Python
from pyspark.sql import SparkSession
# Step 1: Create SparkSession
spark = SparkSession.builder \
.appName("Partition File into Four") \
.getOrCreate()
# Step 2: Read the input file (e.g., CSV)
df = spark.read.csv("path/to/input_file.csv", header=True, inferSchema=True)
# Step 3: Repartition into 4 partitions
df_repartitioned = df.repartition(4)
# Step 4: Write each partition to separate files
df_repartitioned.write \
.mode("overwrite") \
.option("header", True) \
.csv("path/to/output_folder")
Notes:
The output will be saved as four part files (e.g., part-00000, part-00001, etc.) inside the output_folder.
You can change the format from CSV to Parquet, JSON, etc., by replacing .csv() with .parquet() or .json().
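A quick sanity check before writing (assuming the df_repartitioned from above) confirms that the DataFrame really has four partitions:
Python
# Should print 4
print(df_repartitioned.rdd.getNumPartitions())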
How do you partition a file's data into four output files by column and date in PySpark?
To programmatically partition a CSV file into four files based on a column and date using PySpark, you can follow this structured approach:
✅ Goal
Split the data into 4 separate files:
Based on date ranges (e.g., quarterly or custom ranges).
Each file is partitioned by a column (e.g., category, region, etc.).
Sample PySpark Code
Python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
# Step 1: Create SparkSession
spark = SparkSession.builder.appName("Partition CSV by Column and Date").getOrCreate()
# Step 2: Read the CSV file
df = spark.read.csv("path/to/input.csv", header=True, inferSchema=True)
# Step 3: Define your date ranges (customize as needed)
df1 = df.filter(col("date") < "2023-01-01")
df2 = df.filter((col("date") >= "2023-01-01") & (col("date") < "2023-06-01"))
df3 = df.filter((col("date") >= "2023-06-01") & (col("date") < "2024-01-01"))
df4 = df.filter(col("date") >= "2024-01-01")
# Step 4: Write each filtered DataFrame partitioned by a column (e.g., 'category')
df1.write.partitionBy("category").csv("output/part1", mode="overwrite", header=True)
df2.write.partitionBy("category").csv("output/part2", mode="overwrite", header=True)
df3.write.partitionBy("category").csv("output/part3", mode="overwrite", header=True)
df4.write.partitionBy("category").csv("output/part4", mode="overwrite", header=True)
Show less
Output Structure
Each folder (part1, part2, etc.) will contain subfolders like:
output/part1/
├── category=A/
│ └── part-00000.csv
├── category=B/
│ └── part-00000.csv
...
Customization Options
You can change the partition column to any other column like region, type, etc.
You can adjust the date ranges to suit your data (monthly, quarterly, yearly).
You can also write in Parquet or JSON by replacing .csv() with .parquet() or .json().
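An alternative worth noting: instead of four separate filter-and-write passes, you can derive a bucket column from the date ranges and let a single partitioned write produce the same split (a sketch under the same assumed date ranges and 'category' column):
Python
from pyspark.sql.functions import when, col

# Label each row with its date bucket, then write once, partitioned by bucket and category
df_bucketed = df.withColumn(
    "date_bucket",
    when(col("date") < "2023-01-01", "part1")
    .when(col("date") < "2023-06-01", "part2")
    .when(col("date") < "2024-01-01", "part3")
    .otherwise("part4"),
)

df_bucketed.write.partitionBy("date_bucket", "category").csv(
    "output/by_bucket", mode="overwrite", header=True
)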

