This is a guest post from CERN, the European Organization for Nuclear Research. In this post, Luca Canali of CERN uses flame graphs to investigate the performance improvements that whole-stage code generation brings to Apache Spark 2.0. The post was originally published on the CERN website. Luca will be speaking on this topic at Spark Summit Europe, Oct 25 – 27.
The idea for this post comes from a performance troubleshooting case that came up recently at CERN database services. It started with a user reporting slow response times from a query for a custom report in a relational database. After investigation and initial troubleshooting, the query was still slow (it ran for about 12 hours). It turned out that the query was mostly running “on CPU” and spent most of its time evaluating a non-equijoin condition repeated hundreds of millions of times. Most importantly, the query was also found to be easily parallelizable. This was good news, as it meant that we could simply “throw hardware at it” to make it run faster. One way the team (see the acknowledgments section at the end of the post) parallelized the workload, without affecting the production database, was to export the data to a Hadoop cluster and run the query there using Spark SQL (the cluster has 14 nodes, installed with CDH 5.7 and Spark version 1.6). This brought the execution time down to less than 20 minutes, with relatively low effort, as the query could be run basically unchanged.
Apache Spark 2.0 enters the scene
As I write this post, Apache Spark 1.6 is installed in our production clusters and Apache Spark 2.0 is still relatively new (it was released at the end of July 2016). Notably, Spark 2.0 brings very interesting improvements over previous versions, including performance improvements that I was eager to test (see this blog post by Databricks).
My first test was to run the query discussed in the previous paragraph on a test server with Spark 2.0, and I found that it ran considerably faster than in the tests with Spark 1.6. The best result I achieved, this time on a large box with 60 CPU cores using Spark 2.0, was an elapsed time of about 2 minutes (compared with 20 minutes in Spark 1.6). I had previously noticed that Spark 1.6 and Impala 2.5 performed comparably on our workloads, but I was impressed by Spark 2.0’s speedups over Spark 1.6, so I decided to investigate further.
The test case
Rather than using the original query and data, I will report here on a synthetic test case that hopefully illustrates the main points of the original case and at the same time is simple and easy to reproduce on your own test systems, if you wish to do so. This test uses pyspark, the Python interface to Spark (if you are not familiar with how to run Spark, see the hints on how to build a test system later in this post).
The test data is prepared as follows: (1) a DataFrame with 10 million rows is created and registered as table “t0”. (2) Table t0 is used to create the actual test data, which is composed of an “id” column and three additional columns of randomly generated data, all integers. The resulting DataFrame is cached in memory and “registered” as a temporary table called “t1”. The Spark SQL interface for DataFrames makes this preparation task straightforward:
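[code]$ pyspark --driver-memory 2g
test_numrows = 1e7  # 10M rows in the test table; use e.g. 1e6 on a smaller machine
sqlContext.range(0, test_numrows, 1).registerTempTable("t0")  # table t0 with a single "id" column
sqlContext.sql("select id, floor(200*rand()) bucket, floor(1000*rand()) val1, floor(10*rand()) val2 from t0").cache().registerTempTable("t1")[/code]
The following commands are additional checks that table t1 has been created correctly and that it is read into memory for the first time (cache() is lazy, so the count populates the cache). In particular, note that t1 has the required test_numrows (10M) rows, and check the description of its columns in the output of the “desc” command: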
[code]sqlContext.sql("select count(*) from t1").show()
sqlContext.sql("desc t1").show()
+--------+---------+-------+
|col_name|data_type|comment|
+--------+---------+-------+
|      id|   bigint|       |
|  bucket|   bigint|       |
|    val1|   bigint|       |
|    val2|   bigint|       |
+--------+---------+-------+[/code]
The test query is shown below. It is a self-join with two conditions: an equality predicate on the column bucket, which provides an obvious way to execute the query in parallel, and a more resource-intensive non-equality condition. The query also contains an aggregation. Some additional boilerplate code is added to time the query:
[code]sqlContext.sql("select a.bucket, sum(a.val2) tot from t1 a, t1 b where a.bucket=b.bucket and a.val1+b.val1[/code]
Spark 2.0 runs the test query about 7 times faster than Spark 1.6 on the test server (see details below): Spark 2.0 completed the job in about 15 minutes of elapsed time and 390 minutes of CPU time, while Spark 1.6 took about 100 minutes of elapsed time and 2840 minutes of CPU time. Execution times fluctuate between runs, but the fine details matter less than the main finding: the performance difference between Spark 1.6 and Spark 2.0 is striking, with Spark 2.0 much faster. This warrants further investigation into the internals and root causes.
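As a quick check on the “about 7 times” figure, the ratios can be computed directly from the approximate timings quoted above. This is plain arithmetic on the reported numbers, not a new measurement:

```python
# Approximate timings for the test query, in minutes, as reported in the text
timings = {
    "Spark 1.6": {"elapsed": 100, "cpu": 2840},
    "Spark 2.0": {"elapsed": 15, "cpu": 390},
}


def speedup(old_minutes: float, new_minutes: float) -> float:
    """Ratio of old to new time: values above 1 mean the new run was faster."""
    return old_minutes / new_minutes


elapsed_speedup = speedup(timings["Spark 1.6"]["elapsed"], timings["Spark 2.0"]["elapsed"])
cpu_speedup = speedup(timings["Spark 1.6"]["cpu"], timings["Spark 2.0"]["cpu"])

print(f"elapsed-time speedup: {elapsed_speedup:.1f}x")  # 6.7x
print(f"CPU-time speedup:     {cpu_speedup:.1f}x")      # 7.3x
```

Both ratios land close to 7, which suggests that the gain comes from Spark 2.0 doing much less CPU work for the same query, rather than from spreading the work over more cores.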
I ran the tests with Spark in its simplest configuration (local mode) on a standalone (non-clustered) server with 16 cores (2 x E5-2650), 128 GB of RAM (the test workload allocates about 16 GB of virtual memory) and Linux (kernel 2.6.32, RHEL 6.7). To run the test on a smaller machine, scale down the preparation phase by setting test_numrows to a smaller value (for example 1e6). In that case the default driver memory of 1g is probably sufficient.
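Scaling down test_numrows reduces the work by more than the row count alone suggests, because the join pairs up rows within each bucket. A back-of-the-envelope estimate, assuming rows are spread evenly across the 200 bucket values produced by floor(200*rand()), is that the number of row pairs reaching the non-equi condition grows with the square of the number of rows. The helper name below is hypothetical:

```python
NUM_BUCKETS = 200  # from floor(200*rand()) in the preparation query


def estimated_join_pairs(num_rows: int, num_buckets: int = NUM_BUCKETS) -> float:
    """Rough count of (a, b) row pairs that pass a.bucket = b.bucket and must
    then be checked against the non-equi condition.
    Assumes rows are spread uniformly across buckets (a simplifying model)."""
    rows_per_bucket = num_rows / num_buckets
    return num_buckets * rows_per_bucket ** 2


full = estimated_join_pairs(10_000_000)  # test_numrows = 1e7
small = estimated_join_pairs(1_000_000)  # test_numrows = 1e6

print(f"1e7 rows: ~{full:.1e} pairs")      # ~5.0e+11
print(f"1e6 rows: ~{small:.1e} pairs")     # ~5.0e+09
print(f"reduction: ~{full / small:.0f}x")  # ~100x
```

Under this rough model, going from 1e7 to 1e6 rows cuts the join work by about 100x, while the cached table only shrinks by about 10x.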
The tests were performed on a single server, alternating runs with Spark 1.6 and 2.0. In both cases, monitoring with OS tools showed that the jobs were CPU-bound, with 32 threads (16 cores x 2 hardware threads per core) running on CPU and using the available resources on the test box. No other significant workload was running on the box during the tests.
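The reported timings fit this picture. Dividing CPU time by elapsed time gives the average number of threads that were busy during a run. A small sketch of that arithmetic, using the approximate figures quoted for the test query and the 32 hardware threads of the test server:

```python
HW_THREADS = 32  # 16 cores x 2 hardware threads per core, as described above

# (elapsed minutes, CPU minutes) for the test query, as reported in the text
runs = {"Spark 1.6": (100, 2840), "Spark 2.0": (15, 390)}

busy = {}
for version, (elapsed_min, cpu_min) in runs.items():
    # Average number of threads on CPU over the whole run
    busy[version] = cpu_min / elapsed_min
    share = busy[version] / HW_THREADS
    print(f"{version}: ~{busy[version]:.1f} busy threads ({share:.0%} of {HW_THREADS})")
```

Both runs kept roughly 26 to 28 threads busy on average, so the shorter elapsed time of Spark 2.0 is not explained by higher parallelism.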
Drilling down into the execution plans
The physical execution plan generated and executed by Spark (in particular by Catalyst, the optimizer, and Tungsten, the execution engine) differs in important ways between Spark 2.0 and Spark 1.6. The overall strategy, however, is the same in both cases: the query is executed with a sort merge join. In the execution plans reported below, note that in Spark 2.0 several steps are marked with a star (*). These are the steps optimized with whole-stage code generation.
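Whole-stage code generation is easier to picture with a toy model. The sketch below is a loose Python analogy, not Spark’s implementation: Spark 2.0 generates and compiles Java code at runtime, whereas here both versions are written by hand. It contrasts an iterator-style (“Volcano”) plan, where each operator pulls rows from its child one at a time through generic per-row calls, with the same scan, filter and aggregation fused into a single loop, which is roughly the shape of code that whole-stage code generation produces for a chain of starred steps. The row layout, class names and filter threshold are hypothetical and are not the test query’s join condition:

```python
from typing import Callable, Dict, Iterable, Iterator, Tuple

# Hypothetical row layout for illustration: (bucket, val1, val2)
Row = Tuple[int, int, int]


# --- Iterator ("Volcano") model: one object per operator, rows pulled one at a time ---
class Scan:
    def __init__(self, rows: Iterable[Row]):
        self.rows = rows

    def __iter__(self) -> Iterator[Row]:
        return iter(self.rows)


class Filter:
    def __init__(self, child: Iterable[Row], predicate: Callable[[Row], bool]):
        self.child = child
        self.predicate = predicate

    def __iter__(self) -> Iterator[Row]:
        for row in self.child:       # per-row call into the child operator
            if self.predicate(row):  # per-row call to a generic predicate
                yield row


class SumByKey:
    def __init__(self, child: Iterable[Row], key: Callable[[Row], int],
                 value: Callable[[Row], int]):
        self.child, self.key, self.value = child, key, value

    def execute(self) -> Dict[int, int]:
        totals: Dict[int, int] = {}
        for row in self.child:
            k = self.key(row)
            totals[k] = totals.get(k, 0) + self.value(row)
        return totals


def iterator_plan(rows: Iterable[Row], threshold: int) -> Dict[int, int]:
    """Scan -> Filter(val1 < threshold) -> sum(val2) grouped by bucket."""
    plan = SumByKey(Filter(Scan(rows), lambda r: r[1] < threshold),
                    key=lambda r: r[0], value=lambda r: r[2])
    return plan.execute()


# --- Fused ("whole-stage") version: the same pipeline as one tight loop ---
def fused_plan(rows: Iterable[Row], threshold: int) -> Dict[int, int]:
    totals: Dict[int, int] = {}
    for bucket, val1, val2 in rows:  # no operator boundaries, no generic callbacks
        if val1 < threshold:
            totals[bucket] = totals.get(bucket, 0) + val2
    return totals


rows = [(0, 5, 1), (1, 50, 2), (0, 7, 3), (1, 3, 4)]
print(iterator_plan(rows, 10))  # {0: 4, 1: 4}
print(fused_plan(rows, 10))     # {0: 4, 1: 4}
```

Both functions return the same result; what differs is the per-row overhead (operator boundaries and generic function calls) that each row goes through, which is the overhead whole-stage code generation aims to remove. Python timings say little about the JVM, so treat this only as a picture of the transformation. In pyspark, calling explain() on a DataFrame prints its physical plan.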
Physical execution plan in Spark 1.6
Note that a sort merge join operation is central to the execution plan of this query. Another important step, after the join, is the aggregation that computes “sum(a.val2)” from the query text:
Physical execution plan in Spark 2.0
Note in particular the steps marked with (*), which are optimized with whole-stage code generation: