JSON
Streaming
→ Batch Processing
→ Stream Processing
→ Socket Processing
So far, we have only worked with CSV files
Spark has great support for JSON data as well!
JSON data can be loaded as either RDDs or DataFrames
The choice depends on the data format and the intended analysis
For RDDs, json.loads
is used to translate JSON strings into Python objects
import json
path = "/FileStore/tables/sample_users.json"
rdd = spark.sparkContext.textFile(path).map(json.loads)
print(rdd.take(3))
filtered_rdd = rdd.filter(lambda x: \
    any(event["type"] == "purchase" for event in x["events"]))
filtered_results = filtered_rdd.collect()
for record in filtered_results:
    print(record)
for entry in rdd.collect():
    print(type(entry))
    print(f'{entry["name"]} with id: {entry["id"]} '
          f'has {len(entry["events"])} events.')
spark.read.json()
translates JSON files into DataFrames
For nested arrays, the explode()
function can be used
from pyspark.sql.functions import col, explode
path = "/FileStore/tables/sample_users.json"
df = spark.read.json(path)
display(df)
df_exploded = df.withColumn("event", explode("events"))
display(df_exploded)
df_exploded2 = df.withColumn("event", explode("events")) \
    .select(
        col("id"),
        col("name"),
        col("event.type").alias("type"),
        col("event.timestamp").alias("timestamp")
    )
display(df_exploded2)
Real-time data processing
Stream processing requires working with data that is constantly being updated
Data can come from multiple sources: sensors, IoT devices, web applications, APIs, etc.
Apache Spark provides a structured streaming architecture
The data stream is treated as an unbounded table that is continuously appended to
New data in the stream results in new rows in the unbounded table
There are three major components for streaming (put together in the sketch after this list):
Data Source - Where the data comes from:
Files, Sockets, or an API
Streaming Engine - What we do with the data:
Filtering, Grouping, Sorting
Data Sink - Where we save our results:
Memory, Console, File, etc.
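Putting the three components together, a streaming job in PySpark generally has the shape below. This is a minimal sketch only; the directory path, schema, and query name are placeholders rather than part of the course dataset.
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

# Placeholder schema and input directory, used only to illustrate the pipeline shape
sketch_schema = StructType([StructField("time", TimestampType(), True),
                            StructField("action", StringType(), True)])

# 1. Data Source: a directory of JSON files read as a stream (hypothetical path)
source_df = spark.readStream.schema(sketch_schema).json("/FileStore/tables/some_stream_dir/")

# 2. Streaming Engine: ordinary DataFrame transformations (here, a grouped count)
counts_df = source_df.groupBy("action").count()

# 3. Data Sink: where the results go (here, an in-memory table named "sketch_counts")
query = (counts_df.writeStream
         .format("memory")
         .queryName("sketch_counts")
         .outputMode("complete")
         .start())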
Batch processing allows us to go over all files in a given directory
Helpful when data is spread over multiple files
from pyspark.sql.types import *
inputPath = "/databricks-datasets/structured-streaming/events/"
jsonSchema = StructType([StructField("time", TimestampType(), True),
                         StructField("action", StringType(), True)])
staticInputDF = spark.read.schema(jsonSchema).json(inputPath)
display(staticInputDF)
from pyspark.sql.functions import window, col
staticCountsDF = staticInputDF.groupBy(col("action"),
                                       window(col("time"), "30 minutes")).count()
display(staticCountsDF)
The window()
function groups together rows that fall within a specified interval
Not to be confused with the Window()
class
It generates a single struct column with two TimestampType fields: one for the start and one for the end of the interval (see the schema printed below)
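As a quick check, printing the schema of staticCountsDF from the previous cell shows the struct produced by window() (the expected output in the comments is approximate; nullability flags may vary):
# Inspect the column produced by window(): a struct with start and end timestamps
staticCountsDF.printSchema()
# root
#  |-- action: string
#  |-- window: struct
#  |    |-- start: timestamp
#  |    |-- end: timestamp
#  |-- count: long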
from pyspark.sql.functions import date_format
display(staticCountsDF.groupBy(staticCountsDF["action"]).sum())
display(staticCountsDF
        .withColumn("time", date_format(staticCountsDF["window"].end, "MMM-dd HH:mm"))
        .drop(staticCountsDF["window"])
        .orderBy("time", "action"))
date_format()
outputs the timestamp formatted as a string according to the given pattern; for example:
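A minimal standalone example; the toy DataFrame below is made up just to show the pattern string:
from pyspark.sql.functions import date_format, to_timestamp

# Hypothetical one-row DataFrame with a timestamp column
demo_df = spark.createDataFrame([("2016-07-26 10:45:00",)], ["ts"]) \
    .withColumn("ts", to_timestamp("ts"))
# "MMM-dd HH:mm" renders the timestamp as, e.g., "Jul-26 10:45"
display(demo_df.select(date_format("ts", "MMM-dd HH:mm").alias("time")))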
For Stream and Socket Processing, a data sink is needed
The results collected in the sink can then be queried using SQL
staticCountsDF.createOrReplaceTempView("static_counts")
%sql
select action, sum(count) as total_count
from static_counts group by action

select action, date_format(window.end, "MMM-dd HH:mm") as time, count
from static_counts order by time, action
Stream processing allows us to continuously go over all files in a given directory
The directory can be updated in real time
Helpful for automating tasks
from pyspark.sql.types import *
inputPath = "/databricks-datasets/structured-streaming/events/"
jsonSchema = StructType([StructField("time", TimestampType(), True),
                         StructField("action", StringType(), True)])
streamingInputDF = spark.readStream \
    .schema(jsonSchema) \
    .option("maxFilesPerTrigger", 1) \
    .json(inputPath)
streamingCountsDF = streamingInputDF.groupBy(col("action"),
                                             window(col("time"), "1 hour")).count()
streamingCountsDF.writeStream \
    .format("memory") \
    .queryName("counts") \
    .outputMode("complete") \
    .trigger(processingTime="5 seconds") \
    .start()
queryName()
sets the name of the query; for the memory sink, this is also the name of the in-memory SQL table
outputMode()
options are append, complete, and update
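As a point of comparison, the same aggregation could be written to a console sink instead of memory; because the query contains an aggregation, complete (or update) mode is required, and append would be rejected. This is a sketch, not part of the original notebook, and consoleQuery is a hypothetical name:
# Hypothetical variant of the query above: print each trigger's full result table to the console
consoleQuery = streamingCountsDF.writeStream \
    .format("console") \
    .outputMode("complete") \
    .trigger(processingTime="5 seconds") \
    .start()
# consoleQuery.stop()  # stop it when finished, so it does not run indefinitely
Either way, the memory sink registered above as counts can be queried directly: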
%sql
select action, sum(count) from counts group by action
select action, date_format(window.end, "MMM-dd HH:mm") as time, count
from counts order by time, action
Socket processing allows us to handle data coming from a socket
Frequently associated with IoT devices, sensors, or APIs
In our example, we will collect messages from a Netcat server
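On the server side, this is typically nothing more than Netcat listening on a port (nc -lk 9999). The snippet below is a toy Python stand-in for such a sender, useful only for local testing; the messages and timing are made up:
# Toy stand-in for the Netcat server: listen on port 9999 and push lines
# to whichever client (e.g., the Spark socket source) connects.
import socket
import time

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(("0.0.0.0", 9999))
server.listen(1)
conn, _ = server.accept()
for line in ["hello spark", "hello streaming", "spark streaming"]:
    conn.sendall((line + "\n").encode("utf-8"))
    time.sleep(1)
conn.close()
server.close()
The Spark side of the connection then reads from that host and port: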
lines = spark \
    .readStream \
    .format("socket") \
    .option("host", "159.203.31.149") \
    .option("port", 9999) \
    .load()
from pyspark.sql.functions import explode, split
words = lines.select(explode(split(lines.value, " ")).alias("word"))
wordCounts = words.groupBy("word").count()
In the previous example, split
is used to break down each message into a list of individual words
explode
is used to break down each list into multiple rows
data = [("Alice", "apple,banana,grape"), ("Bob", "orange,mango")]
df = spark.createDataFrame(data, ["name", "fruits"])
df = df.withColumn("fruits", split(df["fruits"], ","))
display(df)
df_explode = df.select("name", explode(df["fruits"]).alias("fruit"))
display(df_explode)
query = wordCounts \
    .writeStream \
    .outputMode("complete") \
    .format("memory") \
    .queryName("wordCounts") \
    .trigger(processingTime="1 second") \
    .start()
%sql
select * from wordCounts