In a world where data is generated at an extremely fast rate, analyzing that data correctly and delivering useful, meaningful results at the right time can provide helpful solutions for many domains that deal with data products. This applies to everything from Health Care and Finance to Media, Retail, and Travel Services. Some solid examples include Netflix providing personalized recommendations in real time, Amazon tracking your interaction with different products on its platform and immediately suggesting related products, or any business that needs to stream a large amount of data in real time and run different analyses on it.
One of the amazing frameworks that can handle big data in real time and perform different analyses is Apache Spark. There are many resources that cover the different features of Spark and how popular it is in the big data community, so I will only briefly mention its core features: fast big data processing built on Resilient Distributed Datasets (RDDs), streaming, and machine learning at large scale. In this article, we will go through a hands-on session to solve a business problem using the Spark Streaming component with PySpark.
Nobody can deny the importance of social media in today’s society. A lot of businesses collect their audience’s insights, feedback, and interests through Twitter activity. Many businesses have realized that a thorough analysis of social media activity can help them better understand the patterns in their customers’ behavior. Feedback from social media can often change rapidly, and real-time analysis of that feedback is essential to the success of a business. As a result, there are different ways to get a better understanding of how people respond to a new product, brand, or event. One way is to evaluate the sentiment of the tweets related to a specific topic; this topic can be anything related to a product, brand, or event. I have previously implemented a similar idea in this project. Another smart way is to extract the top #tags related to our desired topic during specific periods of time, such as every few minutes, since tweets with #tags have a higher engagement rate.
We are going to work with PySpark to use the Spark Streaming component, and use TCP sockets in Python to connect to Twitter’s streaming API through the Tweepy library. After streaming the tweets and holding them in RDDs in Spark, we apply some operations on the tweets to extract the top hashtags. Next we save the top hashtags in a temporary SQL table using Spark SQL. Finally, we apply some visualization techniques in Python to show the results in a graph. The image below represents the overall architecture of our program.
It is also good to know some other ways that Spark Streaming can be used:
Our program is going to have two modules:
1- receive-Tweets: this module handles the authentication and the connection to Twitter’s streaming API through the Tweepy library. It is triggered only when it receives a call from the Spark Streaming module, and it sends the tweets to the Spark engine through a TCP socket.
2- top#tags-streaming: this module initiates a StreamingContext in PySpark and receives data through socketTextStream. Next we apply some lambda functions to clean the tweets and extract the #tags. Then we visualize the top 10 #tags related to the topic that we searched for (a new product, brand, or event).
- We will briefly go through the steps of the first module:
We import Tweepy and its required packages. To access Twitter’s API, we need four authentication keys, which we can obtain by visiting developer.twitter.com and registering a new app. As you can see, I have left them blank, since sharing your authentication keys publicly is not safe. Next we create the TweetsListener class, which inherits from the StreamListener class in Tweepy, and we override the on_data() method so we can send the tweets through the socket. We search for the word “Football” as our desired topic since it is quite popular on Twitter and we can get tweets very fast, but we can always change the search word based on our use case. We are not going to run this Python file (receive-Tweets.py) yet, since we need to set up our StreamingContext first.
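The socket side of this module can be tried on its own, without Twitter. The following is a minimal sketch and not the module itself: it uses only Python's standard socket library, a hard-coded list of sample tweets stands in for the Tweepy stream, and the names open_server and send_tweets are hypothetical. It illustrates the one detail that matters to Spark: each tweet is sent as UTF-8 text ending in a newline, because socketTextStream treats the incoming bytes as newline-delimited lines.

```python
import socket


def open_server(host="127.0.0.1", port=5555):
    """Create a TCP socket that listens for a single client (e.g. Spark)."""
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((host, port))
    server.listen(1)
    return server


def send_tweets(server, tweets):
    """Wait for a client to connect, then send one tweet per line.

    `tweets` is any iterable of strings; here it replaces the live stream.
    Returns the number of tweets sent.
    """
    sent = 0
    client, _address = server.accept()  # blocks until the client connects
    with client:
        for text in tweets:
            # Collapse whitespace (including line breaks) inside a tweet so
            # that one tweet always arrives as exactly one line.
            line = " ".join(text.split()) + "\n"
            client.sendall(line.encode("utf-8"))
            sent += 1
    return sent
```

In a Tweepy-based listener, the overridden on_data() method would presumably play the role of the loop body here, sending the text of each incoming tweet over the accepted connection.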
For more information about networking in Python and how Tweepy streaming works, please see the links below:
Streaming With Tweepy - tweepy 3.5.0 documentation (docs.tweepy.org): In Tweepy, an instance of tweepy.Stream establishes a streaming session and routes messages to StreamListener instance…
- For the second module, we use the findspark library to locate Spark on our local machine, and then we import the necessary packages from pyspark.
import findspark
findspark.init('directory that contains Apache Spark')

# import necessary packages
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import desc
Next we initiate SparkContext(). SparkContext is the entry point to any Spark functionality. When we run a Spark application, a driver program starts; it holds the main function, and the SparkContext is initiated there. A SparkContext represents the connection to a Spark cluster and can be used to create RDDs, accumulators, and broadcast variables on that cluster. In PySpark, SparkContext uses Py4J to launch a JVM and create a JavaSparkContext (source). It is important to note that only one SparkContext may be running in each session. After that, we initiate the StreamingContext() with a 10-second batch interval, which means the input stream will be divided into batches every 10 seconds during streaming.
sc = SparkContext()

# we initiate the StreamingContext with a 10-second batch interval
ssc = StreamingContext(sc, 10)
# next we initiate our sqlContext
sqlContext = SQLContext(sc)
The next step is to assign our streaming input source and then put the incoming data in lines:
# initiate streaming text from a TCP (socket) source:
socket_stream = ssc.socketTextStream("127.0.0.1", 5555)

# lines of tweets with a socket_stream window of size 60,
# i.e. 60-second windows of time
lines = socket_stream.window(60)
It is important to note that we use the same port number (5555) that the first module uses to send the tweets, and the same IP address, since everything runs on our local machine. In addition, we use the window() function so that each result covers the tweets received during the last minute (60 seconds), which lets us see the top 10 #tags for that period. Because no slide interval is given, the window advances with every 10-second batch.
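To make the interaction between the batch interval and the window concrete, here is a small plain-Python simulation. It does not use Spark, and the function name windowed_batches and the sample data are hypothetical. It assumes, as with Spark's window() when no slide interval is passed, that the window moves forward by one batch at a time, so with 10-second batches and a 60-second window each result covers the six most recent batches.

```python
def windowed_batches(batches, batch_interval, window_length):
    """Return, for each batch time, the items visible in the window.

    batches: list of lists, one inner list per batch interval.
    The window slides by one batch at a time.
    """
    if window_length % batch_interval != 0:
        raise ValueError("window length must be a multiple of the batch interval")
    size = window_length // batch_interval  # number of batches per window
    windows = []
    for end in range(1, len(batches) + 1):
        start = max(0, end - size)
        # Flatten the batches that fall inside the current window.
        windows.append([item for batch in batches[start:end] for item in batch])
    return windows


# Eight 10-second batches, each holding the hashtags seen in that interval.
batches = [["#a"], ["#b"], [], ["#a"], ["#c"], ["#a"], ["#d"], ["#e"]]
windows = windowed_batches(batches, batch_interval=10, window_length=60)
print(windows[5])  # the window after six batches: everything seen so far
print(windows[7])  # two batches later: the two oldest batches have dropped out
```

The point of the sketch is that a hashtag stops being counted once its batch is more than 60 seconds old, which is why the top 10 can change from one result to the next.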
Here is a useful image to better understand how the window() function works in Spark:
Now that we have covered all the prerequisites, let’s see how we can extract the words that start with # from the tweets and save the top 10 in a temporary SQL table:
# just a tuple to assign names
from collections import namedtuple

fields = ("hashtag", "count")
Tweet = namedtuple('Tweet', fields)

# here we apply different operations on the tweets and save them to
# a temporary sql table
(
    lines.flatMap(lambda text: text.split(" "))  # splits each tweet into a list of words
    .filter(lambda word: word.lower().startswith("#"))  # keeps only the hashtags
    .map(lambda word: (word.lower(), 1))  # lower-cases the word and pairs it with 1
    .reduceByKey(lambda a, b: a + b)  # adds up the counts per hashtag
    .map(lambda rec: Tweet(rec[0], rec[1]))  # stores (hashtag, count) in a Tweet object
    .foreachRDD(
        lambda rdd: rdd.toDF()
        .sort(desc("count"))  # sorts them in a dataframe
        .limit(10)  # registers only the top 10 hashtags to a table
        .registerTempTable("tweets")
    )
)
We created a namedtuple object to hold the tags with their counts. Next we used flatMap() to turn the tweets into a stream of individual words. We use lambda functions because they keep these short, one-off transformations concise. After that, we filter out the words that do not start with #. foreachRDD() is an important output operation in PySpark that applies a function to each RDD generated from the stream. Here we apply it to each RDD to convert it to a dataframe, and finally we save it to a temporary table called “tweets”.
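The same chain of transformations can be tried on a handful of strings without a Spark cluster. This is a plain-Python sketch of the logic only: Counter stands in for reduceByKey, and most_common stands in for the sort and limit. The function name top_hashtags and the sample tweets are made up for illustration.

```python
from collections import Counter, namedtuple

Tweet = namedtuple("Tweet", ("hashtag", "count"))


def top_hashtags(tweets, n=10):
    """Count lower-cased hashtags across tweets and return the n most frequent."""
    # flatMap: split every tweet into words
    words = (word for text in tweets for word in text.split(" "))
    # filter + map: keep words starting with "#" and lower-case them
    tags = (word.lower() for word in words if word.startswith("#"))
    # reduceByKey: count the occurrences of each hashtag
    counts = Counter(tags)
    # sort + limit: keep the n hashtags with the highest counts
    return [Tweet(tag, count) for tag, count in counts.most_common(n)]


sample = [
    "What a game #Football #WorldCup",
    "Love this #football match #goal",
    "#WorldCup fever and more #FOOTBALL",
]
for tweet in top_hashtags(sample, n=2):
    print(tweet.hashtag, tweet.count)
```

Lower-casing before counting is what lets #Football, #football, and #FOOTBALL end up in a single row of the result.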
Now, we can run receive-Tweets.py and after that we can start streaming by running:
# start streaming and wait a couple of minutes to get enough tweets
ssc.start()
As we can see, I have two terminals open in Linux. One is running my Jupyter notebook for PySpark streaming, and in the other one we run our receive-Tweets.py file.
After running receive-Tweets.py, we must wait a couple of minutes so we have enough tweets to work with:
Next we import our visualization libraries and draw the graph for the top 10 #tags in the “Football” topic. Here we check the top 10 #tags only a few (5) times, just for learning purposes; you can see that the #tags do not change that often, but if you want to see better results, you can keep it running for a while:
# import libraries to visualize the results
import time
from IPython import display
import matplotlib.pyplot as plt
import seaborn as sns
import pandas
%matplotlib inline

count = 0
while count < 5:
    time.sleep(5)
    top_10_tags = sqlContext.sql('Select hashtag, count from tweets')
    top_10_df = top_10_tags.toPandas()
    display.clear_output(wait=True)
    plt.figure(figsize=(10, 8))
    sns.barplot(x="count", y="hashtag", data=top_10_df)
    plt.show()
    count = count + 1
    print(count)
And here are the results:
Important Takeaways:
- This was a simple example to understand how the Streaming component of Apache Spark works. There are always more sophisticated ways to apply the same approach to other kinds of input streams, such as user interaction data from a popular website like YouTube or Amazon (media, retail). The stock market is another use case: streaming a massive amount of data and running different analyses in real time is crucial for stock broker companies, and Apache Spark can be a top choice of tool.
Another important component of Apache Spark is MLlib, which enables users to train and deploy useful machine learning models at large scale. In my next article, I go through an interesting project with a detailed explanation of how to train and build a model in Spark MLlib.
As always, the code and Jupyter notebook are available on my GitHub.
Questions and Comments are highly appreciated.
Hands-On Big Data Streaming, Apache Spark at scale