How-to: Do Scalable Graph Analytics with Apache Spark
Get started with scalable graph analysis via simple examples that utilize GraphFrames and Spark SQL on HDFS.
Graphs—also known as “networks”—are ubiquitous across web applications. As a refresher, a graph consists of nodes and edges. A node can be any object, such as a person or an airport, and an edge is a relation between two nodes, such as a friendship or an airline connection between two cities. Social networks and content networks (which comprise interlinked documents, such as web pages or citation networks) are other very common examples of graphs. Finally, the Internet of Things is basically a huge “graph of graphs”.
Why should one care about graph theory? Because the relations between things, or rather the links in a network, contain information that complements the information about the individual components. One common, relevant use case here is topology analysis, from which one can derive information about graph properties. If you can track changes in topology, you can track changes across the entire system. Thus, topological analysis is a handy tool in multiple fields, including financial risk management and predictive maintenance.
In the remainder of this post, you will get some introductory hands-on exposure to graph analysis using the GraphFrames package in Apache Spark. As you’ll see, there are some powerful advantages to using GraphFrames and GraphX on HDFS in combination with Spark SQL. (Important note: Neither GraphX nor GraphFrames is considered production-ready, nor are they currently supported by Cloudera. The information presented here is for self-educational purposes with respect to graph analytics only.)
Prerequisites

   For both examples, you’ll need to install Cloudera QuickStart, which is available in VM and Docker versions. (CDH 5.5/Spark 1.5/GraphFrames 0.1.0 are used for the examples here, but the current versions—CDH 5.8/Spark 1.6/GraphFrames 0.2.0—should work just as well.) The GraphFrame package is available in the spark-packages repository. In addition to understanding the basic concepts of graph analysis, you’ll also need to know how to use RDDs and DataFrames in Spark.
Some code is skipped here for brevity, but complete exercise scripts, sample data, and the full tutorial as a PDF file are available in this repo.
  Exercise 1: Calculating PageRank using Scalable Graph Analysis

Google’s PageRank—perhaps the most famous graph-processing algorithm in Internet history—is calculated per node (or vertex). A web page’s PageRank is related to the importance of a node in the network and results from the topology (represented by links). In this example, we will use GraphFrames to calculate the PageRank for each node. Furthermore, calculating the clustering coefficient of a graph, which is based on the number of triplets and the number of fully connected triangles, is possible using GraphX.
   First, download this data file from the SNAP website. This file can be downloaded, decompressed, and loaded into HDFS using this bootstrap script; or, you can do all that manually:
[code]$ wget https://snap.stanford.edu/data/facebook_combined.txt.gz
$ gunzip *.gz
$ hdfs dfs -put facebook_combined.txt
[/code]  Start a spark-shell with the optional packages to work with GraphFrame and the CSV processing libraries:
[code]$ spark-shell --packages graphframes:graphframes:0.1.0-spark1.5,\
com.databricks:spark-csv_2.10:1.4.0
[/code]   Note that the \ symbol indicates that input continues on the next line. The --packages option allows you to specify a comma-separated list of Apache Maven coordinates for required third-party packages used in your spark-shell session. Those will automatically be loaded by the Spark cluster from a central repository. Here we need the GraphFrames package and a library for CSV parsing from Databricks.
  Building the Graph

  Next, we’ll define a DataFrame by loading data from a CSV file, which is stored in HDFS.
Our data file facebook_combined.txt contains two columns to represent links between network nodes. The first column is called source (src), and the second is the destination (dst) of the link. (Some other systems, such as Gephi, use “source” and “target” instead.)
First we define a custom schema, and then we load the DataFrame using SQLContext.
[code]scala> import org.apache.spark.sql.types._
scala> val customSchema = StructType(Array(StructField("src", IntegerType, true), StructField("dst", IntegerType, true)))
scala> val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "false").option("delimiter", " ").schema(customSchema).load("facebook_combined.txt")
[/code]   Here we use a space (" ") as the delimiter. More options can be defined as shown in the documentation for the CSV parsing library.
In theory, this edge list contains all the information needed to build a graph, but a GraphFrame needs node data and edge data in separate DataFrames. Let’s transform the edge list to also get a node list.
First, we have to extract all unique node IDs into a column named name, and then we add a column named id to it. Currently, there is no equivalent to a row_number() function available, but in version 1.6 the component called MonotonicallyIncreasingID.scala supports monotonically increasing 64-bit integers. For now, it is fine to use the node ID as the row id as well.
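As a side note, here is a minimal, self-contained sketch of that Spark 1.6 alternative (an illustration only, not code from the original post, and not used in this exercise, where the node name simply doubles as the id). The tiny names DataFrame is just a stand-in for the node-name column built below.
  [code]// Illustration only (Spark 1.6): assign synthetic 64-bit row ids
// instead of reusing the node name as the id.
import org.apache.spark.sql.functions.monotonicallyIncreasingId
import sqlContext.implicits._

val names = Seq("0", "1", "2").toDF("name")   // stand-in for the node-name DataFrame
val nodesWithIds = names.withColumn("id", monotonicallyIncreasingId())
nodesWithIds.show()
[/code]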
[code]// use the DataFrame loaded above as the edge list (it already has src and dst columns)
val edges = df
val n1 = edges.select("src").distinct()
val n2 = edges.select("dst").distinct()
val n = n1.unionAll(n2).withColumnRenamed("src", "name").distinct()
val nodes = n.withColumn("id", n("name"))
edges.show()
nodes.show()
import org.graphframes._
val g1 = GraphFrame(nodes, edges)
[/code]  All this work was preparation to get the graph data into the right shape by using DataFrames. Now, we are ready to analyze the graph.
  Enter GraphFrames

   There are multiple options for doing scalable graph analysis on top of HDFS. Currently, Apache Spark contains GraphX , but in the past, the deprecated Bagel project was used.
  Bagel was a low-level implementation of Google’s Pregel graph-processing framework. According to the docs, “Bagel operates on a graph represented as a distributed dataset of (K, V) pairs, where keys are vertex IDs and values are vertices plus their associated state.” Its core idea was that “In each superstep, Bagel runs a user-specified compute function on each vertex that takes as input the current vertex state and a list of messages sent to that vertex during the previous superstep, and returns the new vertex state and a list of outgoing messages.” (Apache Giraph implements the same concept.) Although this is a low-level abstraction, it was a useful starting point for implementation and development of new graph-analysis algorithms.
In contrast, the GraphX framework uses a property graph model, which is a popular high-level abstraction for representing a graph. In GraphX, the data is managed by RDDs – one for nodes and one for edges. The graph operations needn’t be implemented by a particular compute function, as in Bagel; instead, the GraphX library offers the Graph class, which provides access to nodes, edges, and even to functionality to analyze the graph. The Pregel API is also available at this level in GraphX.
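To make the property-graph idea concrete, here is a minimal GraphX sketch (an illustration of the API, not code from the original post): it builds a tiny graph from a vertex RDD and an edge RDD and then uses the Graph class for simple inspection and analysis.
  [code]// Illustration only: a small property graph in GraphX
import org.apache.spark.graphx.{Edge, Graph}

val vertices = sc.parallelize(Seq((1L, "page-a"), (2L, "page-b"), (3L, "page-c")))
val edgeRdd  = sc.parallelize(Seq(Edge(1L, 2L, "links-to"), Edge(2L, 3L, "links-to")))
val graph    = Graph(vertices, edgeRdd)

println(graph.numVertices)                    // access to nodes
println(graph.numEdges)                       // access to edges
graph.inDegrees.collect().foreach(println)    // simple analysis on the graph
[/code]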
So, what is the relationship between GraphFrames and GraphX? I personally like to answer that question with an analogy: what a DataFrame is to an RDD, a GraphFrame is to the Graph class in GraphX. With GraphFrames, you can simply define a graph from your DataFrames: one holds the node list and the other the edge list. No matter how the data is stored in HDFS, DataFrames do all the preparation—like filtering, grouping, and projections—before the specific graph algorithms, implemented in the GraphFrame class, are applied. Results of such computations can be new columns in the node list. And because this is a DataFrame, we can easily join and finally export results from multiple operations (as demonstrated later).
  Graph Analysis Made Easy

  Next, let’s do some graph analysis on the Facebook ego-net graph.
  We sort the nodes by degree and calculate the PageRank. Finally, we count the triangles in which a node is embedded.
[code]import org.apache.spark.sql.functions.desc

val k = g1.degrees.sort(desc("degree"))
k.show()
val pr2 = g1.pageRank.resetProbability(0.15).maxIter(10).run()
pr2.vertices.show()
[/code]  Which are the nodes with the highest PageRank?
[code]val pr3 = pr2.vertices.sort(desc("pagerank"))
pr3.show()
+----+----+------------------+
|name| id| pagerank|
+----+----+------------------+
|3434|3434|18.184854992469376|
|1911|1911|18.123485211404812|
|2655|2655|17.525235604357597|
|1902|1902| 17.35563914066005|
|1888|1888| 13.34316669756043|
+----+----+------------------+
only showing top 5 rows
[/code]  Into how many triangles are our nodes embedded?
[code]val tcr = g1.triangleCount.run()
[/code]   Because we plan to use this data again later, we can use the persist() function, but this only makes sense if the dataset is not too big.
  [code]tcr.persist()
tcr.sort(desc("count")).limit(4).show()
+-----+----+----+
|count|name| id|
+-----+----+----+
|30025|1912|1912|
|15502|2543|2543|
|15471|2233|2233|
|15213|2464|2464|
+-----+----+----+
only showing top 4 rows
[/code]   Finally, we use these results to evaluate if the number of triangles is related to the PageRank. We join both results and export the table as a CSV file. This allows us to draw the scatter plot in gnuplot or R.
tcr is the triangle-count result; had we not persisted it above, it would be recalculated the next time we use it, such as in the join below. The PageRank was calculated as a per-vertex value in the pr2.vertices DataFrame.
  [code]scala> tcr.printSchema
root
|-- count: long (nullable = true)
|-- name: integer (nullable = true)
|-- id: integer (nullable = true)
scala> pr2.vertices.printSchema
root
|-- name: integer (nullable = true)
|-- id: integer (nullable = true)
|-- pagerank: double (nullable = true)
[/code]  Since both datasets are DataFrames, we can simply use the DataFrame functionality to join both on the id column:
[code]scala> val scatter = pr2.vertices.join(tcr, pr2.vertices.col("id").equalTo(tcr("id")))
scala> scatter.limit(10).show
16/05/10 03:24:53 INFO DAGScheduler: Job 18 finished: show at <console>:51, took 653.309080 s
+----+----+-------------------+-----+----+----+
|name| id| pagerank|count|name| id|
+----+----+-------------------+-----+----+----+
| 31| 31|0.16137995810046757| 110| 31| 31|
| 231| 231|0.35635621183428823| 95| 231| 231|
| 431| 431| 0.2495015120580725| 1248| 431| 431|
| 631| 631|0.23890006729816274| 41| 631| 631|
| 831| 831| 0.3651072380090138| 64| 831| 831|
|1031|1031|0.15013201313406596| 5|1031|1031|
|1231|1231| 0.2566120648129746| 1452|1231|1231|
|1431|1431|0.44670540616673726|10441|1431|1431|
|1631|1631| 0.285116150576581| 35|1631|1631|
|1831|1831| 2.1506290231917156| 226|1831|1831|
+----+----+-------------------+-----+----+----+
[/code]   We export this result as a CSV file but to get only one result file, we use the coalesce function of the DataFrame:
(The export code did not survive in this copy; see the complete exercise scripts in the repo.)   Alternatively, we could also have used the getmerge command from HDFS to combine all the part files into one single file—but in this case, it is important to use the option header=false, which turns the header lines off.
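For reference, a minimal sketch of what that coalesce-based export might look like, assuming the joined DataFrame scatter from above and an HDFS output folder named scatter_1.csv (the folder referenced in the next step):
  [code]// Sketch: merge all partitions into one part file and write it as CSV to HDFS.
// Set header to "false" if you later combine part files with hdfs dfs -getmerge.
scatter.coalesce(1)
  .write
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .save("scatter_1.csv")
[/code]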
Now, let’s plot our previous result data set in gnuplot. It was already exported to HDFS, into a folder named scatter_1.csv. If gnuplot is not available, simply install it (for example, by typing sudo yum install gnuplot on CentOS). We have to copy the result file to a local folder and then use the following gnuplot commands, stored in a script file named plot.cmd:
(The plot.cmd gnuplot script did not survive in this copy; see the repo for the full version.)   We can easily plot the figure as a scatter plot (more about that below) assuming that the data set and the plot commands are in the same directory:
(The gnuplot invocation is likewise omitted in this copy.)   In a previous post, we used the scalaplot library to create charts. You can simply add the Maven coordinates during startup:
(The spark-shell command with the scalaplot Maven coordinates is omitted in this copy.)  And now, those imports:
  import org.sameersingh.scalaplot.Implicits._
allow you to plot charts with gnuplot or with the JFreeChart library from within the Spark shell.
  Plotting the Results

This was a fun ride: starting with some public data, loading the edges, creating the node list, and defining a GraphFrame, followed by analyzing the graph and merging partial results for the final plot. Now we can visualize the results.
This scatter plot (see below) is not yet the ideal way to understand the data, but it’s a good starting point. A 2D histogram (example) would encode the number of (PageRank, triangle count) pairs as well.
   
[Figure 1: scatter plot of PageRank vs. triangle count]

A second approach to inspecting the data, a log-log plot (see below), would reveal a power-law distribution as a straight line, or a stretched power law in the case of a bent curve. The relation between the PageRank of a node and the number of triangles of which a node is part is not a power law.

[Figure 2: log-log plot of PageRank vs. triangle count]

From the scatter plot, we can already conclude that a high number of triangles does not guarantee a high PageRank, and furthermore, that a high PageRank can be found both for nodes that are embedded in a small number of triangles and for nodes embedded in many.
  Now, let’s do the same exercise with a different data set.
  Exercise 2: Topology Analysis of Web Crawl Results

   To prepare the data set for this example, I used the Apache Nutch web crawler engine (which was designed by Doug Cutting and Mike Cafarella before they built Hadoop). Although data ingestion from SQL databases and huge document repositories dominate in commercial use cases, a data scientist should also be aware of the advantages of Nutch for scraping web data, in addition to those of Apache Flume and Apache Sqoop. For example, instead of using APIs or writing clients to load data from specific web pages, portals, or social media applications, you can use Nutch to get the data all at once and then apply specific parsing, such as HTML parsing with JSoup or more advanced triple extraction using Apache Any23 .
Our crawl data is loaded into a staging table (in this case, in Apache HBase). Besides the full HTML content, it contains inlinks and outlinks. Because HBase manages the crawl data (and the state of the crawl dataset), we can add more links, especially inlinks to a particular page, as we crawl more and more data. To distinguish both types of links in visualizations, we use link type 1 for inlinks and link type 2 for outlinks. Finally, to analyze the overall network, we need both lists in one homogeneous collection, where each link is described by source, target, and type.
  Creation of Node- and Link-Lists

  Using Spark SQL and DataFrames allows for inspection and rearrangement of the data in just a few steps. We start with a query that transforms the map of string pairs into individual rows. During the crawl procedure, Nutch aggregates all links in an adjacency list. We have to “explode” the data structure with all links per page to create a link list from the previously optimized representation.
(The Spark SQL query that explodes the inlinks map did not survive in this copy; a consolidated sketch of all the steps in this exercise follows at the end of the section, and the full script is in the repo.)  As revealed above, an exploded map still lacks any useful column name. Let’s change that by defining the names for the two columns, represented by the string pair.
(Code omitted in this copy.)   For our network, we need one column named source, which contains the URL and the page name. This is why we first explode the inlinks map into “virtual columns” called mt and mp. We can now concatenate both of them and create a new source column:
(Code omitted in this copy.)  Now we apply this procedure to the second set of links: our outlinks.
(Code omitted in this copy.)   To merge both link lists into one, we can ignore the temporary columns. We simply select the required columns, use the unionAll operator, and store our page network as an Apache Parquet file in HDFS.
(Code omitted in this copy; see the consolidated sketch below.)   With this intermediate result, we can revisit GraphFrames with the variable g1 representing the graph loaded by Nutch. As homework, you can next apply the same analysis steps as in Exercise 1.
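Because the individual code blocks for this exercise were lost in this copy, here is a minimal consolidated sketch of the transformation described above. It assumes the Nutch crawl data is exposed to Spark SQL as a Hive table named webpage with a baseurl column and inlinks/outlinks map columns; these names are assumptions for illustration, and the exact identifiers depend on your Nutch/HBase schema.
  [code]// Sketch only: build a homogeneous link list (source, target, type) from the crawl table.
import org.apache.spark.sql.functions.{concat_ws, explode, lit}
import sqlContext.implicits._

// Assumed Hive table name and columns; adjust to your Nutch schema.
val pages = sqlContext.table("webpage")

// Explode the inlinks map into one row per link; the key/value pair becomes the
// "virtual columns" mt and mp, which are concatenated into the source column.
// Link type 1 marks inlinks, pointing to the crawled page.
val inlinks = pages
  .select($"baseurl", explode($"inlinks").as(Seq("mt", "mp")))
  .select(concat_ws(" ", $"mt", $"mp").as("source"), $"baseurl".as("target"), lit(1).as("type"))

// The same procedure for the outlinks; type 2 marks outlinks, leaving the crawled page.
val outlinks = pages
  .select($"baseurl", explode($"outlinks").as(Seq("mt", "mp")))
  .select($"baseurl".as("source"), concat_ws(" ", $"mt", $"mp").as("target"), lit(2).as("type"))

// Merge both link lists and store the page network as a Parquet file in HDFS.
val links = inlinks.unionAll(outlinks)
links.write.parquet("page_network.parquet")
[/code]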
  Conclusion

As you can see, Spark SQL provided access to many different data sources, no matter whether we used the Apache Hive table in Parquet format or the HBase table via Hive. And for the many use cases in which the existing table layout cannot be used, Spark SQL made all the filtering, grouping, and projection really easy.
  Using GraphFrames, it is possible to turn data tables into graphs with just a few lines of code. The full power of the Pregel API, implemented in GraphX, is available in combination with Spark SQL. As a result, raw data stored in Hadoop in different flavors can easily be combined into huge multi-layer graphs, with graph analysis all done in place via Spark.
  Mirko Kämpf is a Solutions Architect at Cloudera.