Practice Set 1
Install everything necessary to make Spark work
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.0.3/spark-3.0.3-bin-hadoop2.7.tgz
!tar xf spark-3.0.3-bin-hadoop2.7.tgz
!pip install -q findspark
Set the paths to the installs
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.3-bin-hadoop2.7"
Find the spark installation
import findspark
findspark.init()
Start doing fancy pyspark stuff
from pyspark.sql import SparkSession
spark = (
SparkSession
.builder
.appName("programming")
.master("local")
.config("spark.jars.packages", "io.delta:delta-core_2.12:0.7.0")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
.config('spark.ui.port', '4050')
.getOrCreate()
)
Start Here
Load the data from the files below into dataframes and answer the questions under 'Analyze the data'.
Resources
orders.csv
customers.csv
# TODO
orders_df = None
# TODO
customers_df = None
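If a starting point helps, here is a minimal sketch, assuming both CSV files sit in the working directory and have header rows (adjust the paths for your environment):
# Minimal sketch: header and inferSchema are standard spark.read options; the paths are assumptions.
orders_df = (
    spark.read
    .option('header', 'true')
    .option('inferSchema', 'true')
    .csv('orders.csv')
)
customers_df = (
    spark.read
    .option('header', 'true')
    .option('inferSchema', 'true')
    .csv('customers.csv')
)
orders_df.printSchema()
customers_df.printSchema()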
Analyze the data
Show total orders and revenue.
What's the average order revenue per day?
Which order has the most revenue each day?
How much revenue by customer zip code?
Who has been a customer the longest?
Which customer has the fewest orders?
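A sketch of the first few aggregations is below; the column names (order_id, order_date, revenue, customer_id, zip_code) are assumptions for illustration, so swap in the actual names from the CSV headers.
from pyspark.sql import functions as F
# Total orders and revenue (assumed columns: order_id, revenue).
orders_df.agg(F.count('order_id').alias('total_orders'), F.sum('revenue').alias('total_revenue')).show()
# Average order revenue per day (assumed column: order_date).
orders_df.groupBy('order_date').agg(F.avg('revenue').alias('avg_order_revenue')).show()
# Revenue by customer zip code (assumed join key: customer_id; assumed column: zip_code).
(
    orders_df.join(customers_df, 'customer_id')
    .groupBy('zip_code')
    .agg(F.sum('revenue').alias('revenue'))
    .show()
)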
Practice Set 2
Install everything necessary to make Spark work. :)
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.0.3/spark-3.0.3-bin-hadoop2.7.tgz
!tar xf spark-3.0.3-bin-hadoop2.7.tgz
!pip install -q findspark
Set the paths to the installs
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.3-bin-hadoop2.7"
Find the spark installation
import findspark
findspark.init()
Start doing fancy pyspark stuff
from pyspark.sql import SparkSession
spark = (
SparkSession
.builder
.appName("programming")
.master("local")
.config("spark.jars.packages", "io.delta:delta-core_2.12:0.7.0")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
.config('spark.ui.port', '4050')
.getOrCreate()
)
Create the table
OUTPUT_DELTA_PATH = './output/delta/'
spark.sql('CREATE DATABASE IF NOT EXISTS EXERCISE')
spark.sql('''
CREATE TABLE IF NOT EXISTS EXERCISE.WORKED_HOURS(
worked_date date
, worker_id int
, delete_flag string
, hours_worked double
) USING DELTA
PARTITIONED BY (worked_date)
LOCATION "{0}"
'''.format(OUTPUT_DELTA_PATH)
)
Start Here
Load all the files into the table created above, EXERCISE.WORKED_HOURS. Assume each file is received daily, to simulate a daily load process; that is, only one file should be read and loaded at a time. A sketch of one possible daily merge follows the load steps below. Rules:
A worker can only work once per day.
delete_flag = Y means the record should be removed from the table, if it exists.
Resources:
20210609_worked_hours.csv
20210610_worked_hours.csv
20210611_worked_hours.csv
Load 2021-06-09 file and output table contents
Load 2021-06-10 file and output table contents
Load 2021-06-11 file and output table contents
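One possible shape for the daily load is sketched below as a hypothetical helper. It assumes the CSV columns match the table (worked_date in yyyy-MM-dd format, worker_id, delete_flag, hours_worked), that each file has a header row, and that the delta Python module bundled with the delta-core package is importable in this environment.
from delta.tables import DeltaTable
from pyspark.sql import functions as F

def load_daily_file(path: str) -> None:
    # Read one day's file and keep a single record per (worked_date, worker_id),
    # since a worker can only work once per day.
    daily_df = (
        spark.read
        .option('header', 'true')
        .csv(path)
        .select(
            F.col('worked_date').cast('date'),
            F.col('worker_id').cast('int'),
            F.col('delete_flag'),
            F.col('hours_worked').cast('double'),
        )
        .dropDuplicates(['worked_date', 'worker_id'])
    )
    target = DeltaTable.forPath(spark, OUTPUT_DELTA_PATH)
    (
        target.alias('t')
        .merge(daily_df.alias('s'), 't.worked_date = s.worked_date AND t.worker_id = s.worker_id')
        .whenMatchedDelete(condition="s.delete_flag = 'Y'")          # delete_flag = Y removes an existing record
        .whenMatchedUpdateAll()                                      # otherwise keep the latest values
        .whenNotMatchedInsertAll(condition="s.delete_flag != 'Y'")   # insert new records, unless flagged for delete
        .execute()
    )
    spark.table('EXERCISE.WORKED_HOURS').orderBy('worked_date', 'worker_id').show()

# One call per day, e.g. load_daily_file('20210609_worked_hours.csv')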
Practice Set 3
Install everything necessary to make Spark work.
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.0.3/spark-3.0.3-bin-hadoop2.7.tgz
!tar xf spark-3.0.3-bin-hadoop2.7.tgz
!pip install -q findspark
Set the paths to the installs
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.3-bin-hadoop2.7"
Find the spark installation
import findspark
findspark.init()
Start doing fancy pyspark stuff
from pyspark.sql import SparkSession, DataFrame
spark = (
SparkSession
.builder
.appName("programming")
.master("local")
.config("spark.jars.packages", "io.delta:delta-core_2.12:0.7.0")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
.config('spark.ui.port', '4050')
.getOrCreate()
)
from pyspark.sql.functions import expr, rand, col, unix_timestamp
# This will generate 30 rows per emp_count requested.
def get_sample_data(emp_count: int) -> DataFrame:
    start_date = '2021-04-01'
    end_date = '2021-04-30'
    theseed = 42
    return (
        spark.sql("select explode(sequence({0}, {1})) as emp_id".format(1, emp_count))
        .withColumn('start_datetime', expr("explode(sequence(cast('{0}' as timestamp), cast('{1}' as timestamp), interval 1 day))".format(start_date, end_date)))
        .withColumn('rand_time', (rand(theseed) * 86401).cast('int'))  # find a time for the shift to start
        .withColumn('start_shift_datetime', (unix_timestamp(col('start_datetime')) + col('rand_time')).cast('timestamp'))
        .withColumn('rand_shift_length', (rand(theseed) * 21600 + 7200).cast('int'))  # assume a shift is between 2-8 hrs in length
        .withColumn('end_shift_datetime', (unix_timestamp(col('start_shift_datetime')) + col('rand_shift_length')).cast('timestamp'))
        .select(col('emp_id'), col('start_shift_datetime'), col('end_shift_datetime'))
    )
Start Here
Find what day part (AM, PM, LUNCH) a shift mostly occurs in. If there is a tie, indicate as such.
AM is before 1100
LUNCH is between 1100 (inclusive) and 1400 (exclusive)
PM is 1400 or later
Each hour a shift occurs in counts toward that day part.
For example, if a shift starts at 1045 and ends at 1530, the breakdown is as follows.
AM: 10
LUNCH: 11, 12, 13
PM: 14, 15
This shift occurs the most during the LUNCH day part. Sample output below.
Start with an outline of how the problem could be solved. It is expected that this will not be fully completed within the self-imposed time limit of the exercise; the outline is used to gauge how you think about and approach complex problems.
How does your solution scale as the data quantity increases? Keep in mind the resource limitations that Google Colab has.
def get_results(df: DataFrame) -> DataFrame:
    result = (
        df
        # TODO: This is where the code goes to generate the result being returned (see the sketch after this function).
    )
    # Make it actually do all the calculations but not save anywhere
    result.write.format('noop').mode('overwrite').save()
    return (
        result
        .select(
            col('emp_id')
            , col('start_shift_datetime')
            , col('end_shift_datetime')
            , col('day_part')
        )
    )
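One way the TODO above could be filled in is sketched below as a hypothetical helper (with_day_part is not part of the skeleton). It counts the clock hours a shift touches using Spark SQL higher-order functions and assumes a shift does not cross midnight.
from pyspark.sql import DataFrame
from pyspark.sql.functions import expr

def with_day_part(df: DataFrame) -> DataFrame:
    return (
        df
        # every clock hour the shift touches, e.g. 10:45-15:30 -> [10, 11, 12, 13, 14, 15]
        .withColumn('hours', expr('sequence(hour(start_shift_datetime), hour(end_shift_datetime))'))
        .withColumn('am_hrs', expr('size(filter(hours, h -> h < 11))'))
        .withColumn('lunch_hrs', expr('size(filter(hours, h -> h >= 11 and h < 14))'))
        .withColumn('pm_hrs', expr('size(filter(hours, h -> h >= 14))'))
        .withColumn('day_part', expr("""
            case when am_hrs > lunch_hrs and am_hrs > pm_hrs then 'AM'
                 when lunch_hrs > am_hrs and lunch_hrs > pm_hrs then 'LUNCH'
                 when pm_hrs > am_hrs and pm_hrs > lunch_hrs then 'PM'
                 else 'TIE' end"""))
        .drop('hours', 'am_hrs', 'lunch_hrs', 'pm_hrs')
    )

get_results could then call with_day_part(df) where the TODO sits; since everything stays as narrow, per-row column expressions, the work should grow roughly linearly with the number of shift records.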
Use this section to do some light performance testing on your solution. Take note of how the time changes as the record count increases. Is the change linear, constant, etc.?
import time
sample_data = get_sample_data(1000) # 1000 = 30000 records
s = time.time()
rsp = get_results(df = sample_data)
print("Took {0} seconds\n".format((time.time() - s)))
Practice Set 4
Install everything necessary to make Spark work. :)
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.0.3/spark-3.0.3-bin-hadoop2.7.tgz
!tar xf spark-3.0.3-bin-hadoop2.7.tgz
!pip install -q findspark
Set the paths to the installs
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.3-bin-hadoop2.7"
Find the spark installation
import findspark
findspark.init()
Start doing fancy pyspark stuff
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, ArrayType
INPUT_FILE = './resources/Order.json' # TODO: Change this based on actual location for your environment setup
OUTPUT_CSV_FILE = './output/files/output.csv'
OUTPUT_DELTA_PATH = './output/delta/'
spark = (
SparkSession
.builder
.appName("programming")
.master("local")
.config("spark.jars.packages", "io.delta:delta-core_2.12:0.7.0")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
.config('spark.ui.port', '4050')
.getOrCreate()
)
def read_json(file_path: str, schema: StructType) -> DataFrame:
    """
    The goal of this method is to parse the input json data using the schema from another method.
    We are only interested in data starting at the orderPaid attribute.
    :param file_path: Order.json will be provided
    :param schema: schema that needs to be passed to this method
    :return: Dataframe containing records from Order.json
    """
    return None
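A sketch of one possible completion is below. It assumes Order.json is a multi-line JSON document whose top level matches the body schema built in get_struct_type, so the orderPaid payload sits at data.message.orderPaid; both are assumptions to verify against the actual file.
# Hypothetical completion -- the multiLine option and the data.message.orderPaid path
# are assumptions about how Order.json is laid out.
def read_json(file_path: str, schema: StructType) -> DataFrame:
    return (
        spark.read
        .option('multiLine', 'true')
        .schema(schema)
        .json(file_path)
        .select('data.message.orderPaid.*')
    )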
def get_struct_type() -> StructType:
    """
    Build a schema based on the file Order.json
    :return: StructType of equivalent JSON schema
    """
    discount_type = StructType([StructField("amount", IntegerType(), True),
                                StructField("description", StringType(), True)
                                ])
    child_item_type = StructType([StructField("lineItemNumber", StringType(), True),
                                  StructField("itemLabel", StringType(), True),
                                  StructField("quantity", DoubleType(), True),
                                  StructField("price", IntegerType(), True),
                                  StructField("discounts", "TODO", True),
                                  ])
    item_type = StructType([StructField("lineItemNumber", StringType(), True),
                            StructField("itemLabel", StringType(), True),
                            StructField("quantity", DoubleType(), True),
                            StructField("price", IntegerType(), True),
                            StructField("discounts", "TODO", True),
                            StructField("childItems", "TODO", True),
                            ])
    order_paid_type = StructType([StructField("orderToken", StringType(), True),
                                  StructField("preparation", StringType(), True),
                                  StructField("items", "TODO", True),
                                  ])
    message_type = StructType([StructField("orderPaid", "TODO", True)])
    data_type = StructType([StructField("message", "TODO", True)])
    body_type = StructType([StructField("id", StringType(), True),
                            StructField("subject", StringType(), True),
                            StructField("data", "TODO", True),
                            StructField("eventTime", StringType(), True),
                            ])
    return None
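For reference, one plausible way to fill in the "TODO" placeholders above; the ArrayType wrappers are assumptions about how Order.json nests its lists and should be verified against the file.
# Hypothetical completion of the placeholders above:
#   child_item_type / item_type : StructField("discounts", ArrayType(discount_type), True)
#   item_type                   : StructField("childItems", ArrayType(child_item_type), True)
#   order_paid_type             : StructField("items", ArrayType(item_type), True)
#   message_type                : StructField("orderPaid", order_paid_type, True)
#   data_type                   : StructField("message", message_type, True)
#   body_type                   : StructField("data", data_type, True)
# and the method would end with:  return body_type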
def get_rows_from_array(df: DataFrame) -> DataFrame:
    """
    Input data frame contains columns of type array. Identify those columns and convert them to rows.
    :param df: Contains column with data type of type array.
    :return: The dataframe should not contain any columns of type array
    """
    return None
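One approach is sketched below with explode_outer. It assumes the frame produced by read_json holds orderToken, preparation and an items array (per the schema above); note that exploding childItems and both discount arrays this way cross-multiplies them within an item, which is tolerable for exercise-sized data.
from pyspark.sql.functions import explode_outer, col

def get_rows_from_array(df: DataFrame) -> DataFrame:
    return (
        df
        .withColumn('item', explode_outer(col('items')))
        .withColumn('itemDiscount', explode_outer(col('item.discounts')))
        .withColumn('childItem', explode_outer(col('item.childItems')))
        .withColumn('childItemDiscount', explode_outer(col('childItem.discounts')))
        .drop('items')
    )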
def get_unwrapped_nested_structure(df: DataFrame) -> DataFrame:
    """
    Convert columns that contain multiple attributes to columns of their own
    :param df: Contains columns that have multiple attributes
    :return: Dataframe should not contain any nested structures
    """
    return None
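Continuing the sketch above, the nested structs can be flattened into the column names used by EXERCISE.ORDERS; the intermediate names item, itemDiscount, childItem and childItemDiscount come from the previous sketch and are assumptions.
from pyspark.sql.functions import col

def get_unwrapped_nested_structure(df: DataFrame) -> DataFrame:
    return df.select(
        col('orderToken').alias('OrderToken'),
        col('preparation').alias('Preparation'),
        col('item.lineItemNumber').alias('ItemLineNumber'),
        col('item.itemLabel').alias('ItemLabel'),
        col('item.quantity').alias('ItemQuantity'),
        col('item.price').alias('ItemPrice'),
        col('itemDiscount.amount').alias('ItemDiscountAmount'),
        col('itemDiscount.description').alias('ItemDiscountDescription'),
        col('childItem.lineItemNumber').alias('ChildItemLineNumber'),
        col('childItem.itemLabel').alias('ChildItemLabel'),
        col('childItem.quantity').alias('ChildItemQuantity'),
        col('childItem.price').alias('ChildItemPrice'),
        col('childItemDiscount.amount').alias('ChildItemDiscountAmount'),
        col('childItemDiscount.description').alias('ChildItemDiscountDescription'),
    )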
def write_df_as_csv(df: DataFrame) -> None:
    """
    Write the data frame to a local destination of your choice with headers
    :param df: Contains flattened order data
    """
    return None
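A minimal sketch: write a single headered CSV under OUTPUT_CSV_FILE (Spark treats that path as a directory); coalesce(1) is only sensible for small outputs.
def write_df_as_csv(df: DataFrame) -> None:
    (
        df.coalesce(1)
        .write
        .mode('overwrite')
        .option('header', 'true')
        .csv(OUTPUT_CSV_FILE)
    )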
def create_delta_table(spark: SparkSession) -> None:
    spark.sql('CREATE DATABASE IF NOT EXISTS EXERCISE')
    spark.sql('''
        CREATE TABLE IF NOT EXISTS EXERCISE.ORDERS(
            OrderToken String,
            Preparation String,
            ItemLineNumber String,
            ItemLabel String,
            ItemQuantity Double,
            ItemPrice Integer,
            ItemDiscountAmount Integer,
            ItemDiscountDescription String,
            ChildItemLineNumber String,
            ChildItemLabel String,
            ChildItemQuantity Double,
            ChildItemPrice Integer,
            ChildItemDiscountAmount Integer,
            ChildItemDiscountDescription String
        ) USING DELTA
        LOCATION "{0}"
    '''.format(OUTPUT_DELTA_PATH))
    return None
def write_df_as_delta(df: DataFrame) -> None:
    """
    Write the dataframe output to the table created, overwrite mode can be used
    :param df: flattened data
    :return: Data from the orders table
    """
    pass
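A minimal sketch that overwrites the Delta files at the table's LOCATION so EXERCISE.ORDERS picks them up; it assumes the dataframe columns line up with the table definition above.
def write_df_as_delta(df: DataFrame) -> None:
    (
        df.write
        .format('delta')
        .mode('overwrite')
        .save(OUTPUT_DELTA_PATH)
    )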
def read_data_delta(spark: SparkSession) -> DataFrame:
    """
    Read data from the table created
    :param spark:
    :return:
    """
    return None
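A minimal sketch reading back through the metastore table created earlier:
def read_data_delta(spark: SparkSession) -> DataFrame:
    return spark.table('EXERCISE.ORDERS')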
if __name__ == '__main__':
    input_schema = get_struct_type()
    input_df = read_json(INPUT_FILE, input_schema)
    arrays_to_rows_df = get_rows_from_array(input_df)
    unwrap_struct_df = get_unwrapped_nested_structure(arrays_to_rows_df)
    write_df_as_csv(unwrap_struct_df)
    create_delta_table(spark)
    write_df_as_delta(unwrap_struct_df)
    result_df = read_data_delta(spark)
    result_df.show(truncate=False)
Contact us at realcode4you@gmail.com to get solutions to the above problems or for any other help related to Big Data and Spark.