from pyspark.sql import SparkSession

# Create or reuse a SparkSession
spark = SparkSession.builder.getOrCreate()

# Sample data
data = [(1, "Alice", 25), (2, "Bob", 30), (3, "Alice", 25)]
df = spark.createDataFrame(data, ["id", "name", "age"])
df.createOrReplaceTempView("people")

# Select distinct names
unique_names_df = spark.sql(
    "SELECT DISTINCT name "
    "FROM people"
)

# Show result
unique_names_df.show()
# Count the number of rows in the DataFrame
count_df = spark.sql(
    "SELECT COUNT(*) as total_count "
    "FROM people"
)

# Show result
count_df.show()
# Group by name and count occurrences
group_by_count_df = spark.sql(
    "SELECT name, COUNT(*) as name_count "
    "FROM people "
    "GROUP BY name"
)

# Show result
group_by_count_df.show()
# Sample data data = [("Alice", 1000), ("Bob", 1500), ("Alice", 2000)] df2 = spark.createDataFrame(data, ["name", "salary"]) df2.createOrReplaceTempView("salaries") # Group by name and sum salaries sum_salaries_df = spark.sql( "SELECT name, SUM(salary) as total_salary" "FROM salaries" "GROUP BY name" ) # Show result sum_salaries_df.show()
# Group by name and calculate average salary
avg_salaries_df = spark.sql(
    "SELECT name, AVG(salary) as avg_salary "
    "FROM salaries "
    "GROUP BY name"
)

# Show result
avg_salaries_df.show()
# Filter records where salary is greater than 1200
filter_df = spark.sql(
    "SELECT * "
    "FROM salaries "
    "WHERE salary > 1200"
)

# Show result
filter_df.show()
# Order records by salary in descending order
order_by_df = spark.sql(
    "SELECT * "
    "FROM salaries "
    "ORDER BY salary DESC"
)

# Show result
order_by_df.show()
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, col

# Sample data
data = [("Hello world",), ("Hello PySpark",), ("Spark is great",)]
df = spark.createDataFrame(data, ["text"])

# Split the text into words
words_df = df.select(explode(split(col("text"), " ")).alias("word"))

# Count occurrences of each word
word_count_df = words_df.groupBy("word").count()

# Show result
word_count_df.show()
# Sample data
data = [(1, "Alice", 25), (2, "Bob", 30), (3, "Cathy", 22)]
df = spark.createDataFrame(data, ["id", "name", "age"])

# Filter rows where age > 25
filtered_df = df.filter(col("age") > 25)

# Show result
filtered_df.show()
# Sample data for DataFrame 1
data1 = [(1, "Alice"), (2, "Bob"), (3, "Cathy")]
df1 = spark.createDataFrame(data1, ["id", "name"])

# Sample data for DataFrame 2
data2 = [(1, "HR"), (2, "Engineering"), (4, "Marketing")]
df2 = spark.createDataFrame(data2, ["id", "department"])

# Inner join on 'id'
joined_df = df1.join(df2, on="id", how="inner")

# Show result
joined_df.show()
from pyspark.sql.functions import avg

# Sample data
data = [("Alice", "HR", 25), ("Bob", "Engineering", 30), ("Cathy", "HR", 28)]
df = spark.createDataFrame(data, ["name", "department", "age"])

# Group by department and calculate average age
avg_age_df = df.groupBy("department").agg(avg("age").alias("avg_age"))

# Show result
avg_age_df.show()
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Sample data
data = [(1, "Alice"), (2, "Bob"), (3, "Cathy")]
df = spark.createDataFrame(data, ["id", "name"])

# Define a UDF to add a prefix to a name
def add_prefix(name):
    return "Mr./Ms. " + name

add_prefix_udf = udf(add_prefix, StringType())

# Apply the UDF
df_with_prefix = df.withColumn("name_with_prefix", add_prefix_udf(col("name")))

# Show result
df_with_prefix.show()
# Sample data with missing values
data = [(1, "Alice", 25), (2, "Bob", None), (3, "Cathy", 28)]
df = spark.createDataFrame(data, ["id", "name", "age"])

# Fill missing values in 'age' with a default value of 0
filled_df = df.na.fill({"age": 0})

# Show result
filled_df.show()
# Sample data
data = [(1, "Alice", 25), (2, "Bob", 30), (3, "Cathy", 22)]
df = spark.createDataFrame(data, ["id", "name", "age"])

# Write DataFrame to CSV
df.write.csv("/path/to/output", header=True)
In this guide, we will take a deep dive into managing data lakes on Databricks using SQL within PySpark. A data lake is a centralized repository that lets you store all of your structured and unstructured data at any scale.
Let's start by querying a file that lives in the data lake directly with a PySpark SQL query, using the csv. file-path syntax.
spark.sql("SELECT * FROM '/mnt/data/sample.csv'").show()
With the sales and customers views in place, you can perform transformations using SQL queries in PySpark. For example, let's perform a group by operation.
spark.sql("SELECT category, SUM(price) FROM sales GROUP BY category").show()
Once you have processed the data, you can write it back to your data lake in various formats such as Parquet using PySpark.
df_grouped.write.format("parquet").save("/mnt/data/output/")
This query finds duplicate records based on a specific column (here, email) using SQL in PySpark.
spark.sql("SELECT email, COUNT(email) FROM customers GROUP BY email HAVING COUNT(email) > 1").show()
This query retrieves the top 10 categories by total price using SQL in PySpark.
spark.sql("SELECT category, SUM(price) FROM sales GROUP BY category ORDER BY SUM(price) DESC LIMIT 10").show()
Now let's cover the same data lake operations on Databricks using the PySpark DataFrame API instead of SQL.
Let's start by loading data from the data lake into a DataFrame using PySpark on Databricks.
df = spark.read.format("csv").option("header", "true").load("/mnt/data/sample.csv")
df.show()
Once the data is loaded, you can perform transformations using PySpark. For example, let's perform a group by operation.
df_grouped = df.groupBy("category").sum("price")
df_grouped.show()
Once you have processed the data, you can write it back to your data lake in various formats such as Parquet or Delta.
df_grouped.write.format("parquet").save("/mnt/data/output/")
This code finds duplicate records based on a specific column (here, email).
df_duplicates = df.groupBy("email").count().filter("count > 1")
df_duplicates.show()
This code retrieves the top categories by total price, similar to SQL's GROUP BY combined with ORDER BY and LIMIT.
df_top_categories = df.groupBy("category").sum("price").orderBy("sum(price)", ascending=False)
df_top_categories.show(10)