Analyze using SQL
Create PerfGazer views
PerfGazer exposes SQL queries (called snippets) that create temporary views over the PerfGazer data produced by the Spark application.
You can run those snippets and then perform analytics on the recorded SQL queries, jobs, stages, and tasks.
Within the Spark application, you can retrieve the snippets as follows:
import com.amadeus.perfgazer.PerfGazer
val perfGazer = PerfGazer.instance.getOrElse(throw new RuntimeException("Oops"))
val snippets: Set[String] = perfGazer.getSnippets
// snippets.foreach(println) // print them
// snippets.foreach(spark.sql) // launch them
Additionally, at Spark application shutdown, PerfGazer prints those snippets in the logs (info log level). You can copy and paste them into a notebook to start investigating.
-- Copy and paste the snippets shown in the logs by PerfGazer during shutdown (info level)
CREATE OR REPLACE TEMPORARY VIEW sql ...
CREATE OR REPLACE TEMPORARY VIEW job ...
CREATE OR REPLACE TEMPORARY VIEW stage ...
CREATE OR REPLACE TEMPORARY VIEW task ...
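Once the views are created, you can run a minimal sanity check before diving deeper. The sketch below relies only on the view names defined by the snippets above:
-- Quick sanity check: count the records exposed by each view
SELECT (SELECT COUNT(*) FROM sql)   AS sqlCount,
       (SELECT COUNT(*) FROM job)   AS jobCount,
       (SELECT COUNT(*) FROM stage) AS stageCount,
       (SELECT COUNT(*) FROM task)  AS taskCount;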
Query across all runs
The snippets above point to the current run. To create views spanning all available runs, you can use a ** glob together with the basePath option.
For example:
CREATE OR REPLACE TEMPORARY VIEW [sql|job|stage|...]
USING json
OPTIONS (
path "<base_path>/**/[sql|job|stage|...]-reports-*.json",
basePath "<base_path>/"
);
Replace <base_path> with your actual base destination.
The basePath option tells Spark from which point to start auto-discovering partition columns (e.g. applicationId).
Keep in mind that if you use basePath and partition columns are discovered, joins between the views must also include those partition columns where meaningful, so that jobs, stages, etc. from different runs are associated correctly.
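For example, a cross-run variant of the job/stage join can carry the applicationId partition column mentioned above. This is a sketch; adapt the column list to the partition columns Spark actually discovers under your base path:
-- Associate each stage with its parent job within the same run only
SELECT j.applicationId,
       j.jobId,
       j.jobName,
       s.stageId
FROM job j
JOIN stage s
  ON j.applicationId = s.applicationId
 AND ARRAY_CONTAINS(j.stages, s.stageId);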
Analyze PerfGazer data
The SQL queries below are available as constants in com.amadeus.perfgazer.AnalysisQueries. That class is the definitive source of truth for these queries and is tested in the integration test suite.
You can start by drilling down into all tasks together with their parent stage and job using a query like the following:
SELECT *
FROM job j
JOIN stage s ON ARRAY_CONTAINS(j.stages, s.stageId)
JOIN task t ON t.stageId = s.stageId;
Below we provide a collection of queries you can run to explore various performance aspects of your Spark application.
Jobs by CPU usage
Aggregates executor CPU time across all stages of each job, converted from nanoseconds to seconds.
Code reference: AnalysisQueries.JobsByCpuUsage
SELECT j.jobId,
j.jobName,
ROUND(SUM(s.execCpuNs) / 1e9, 2) AS cpuTimeSec
FROM job j
JOIN stage s ON ARRAY_CONTAINS(j.stages, s.stageId)
GROUP BY j.jobId, j.jobName
ORDER BY cpuTimeSec DESC;
Sample output
| jobId | jobName | cpuTimeSec |
|---|---|---|
| 2 | save at MyApp.scala:42 | 1782.43 |
| 1 | count at MyApp.scala:28 | 624.18 |
| 0 | read at MyApp.scala:15 | 84.92 |
Jobs by I/O volumes
Shows input, output, shuffle read/write and total I/O per job, all in MB.
Code reference: AnalysisQueries.JobsByIoVolumes
SELECT j.jobId,
j.jobName,
ROUND(SUM(s.readBytes) / 1048576, 2) AS inputMb,
ROUND(SUM(s.writeBytes) / 1048576, 2) AS outputMb,
ROUND(SUM(s.shuffleReadBytes) / 1048576, 2) AS shuffleReadMb,
ROUND(SUM(s.shuffleWriteBytes) / 1048576, 2) AS shuffleWriteMb,
ROUND(SUM(s.readBytes + s.writeBytes
+ s.shuffleReadBytes + s.shuffleWriteBytes) / 1048576, 2) AS totalIoMb
FROM job j
JOIN stage s ON ARRAY_CONTAINS(j.stages, s.stageId)
GROUP BY j.jobId, j.jobName
ORDER BY totalIoMb DESC;
Sample output
| jobId | jobName | inputMb | outputMb | shuffleReadMb | shuffleWriteMb | totalIoMb |
|---|---|---|---|---|---|---|
| 2 | save at MyApp.scala:42 | 1024.00 | 512.34 | 256.78 | 248.91 | 2042.03 |
| 1 | count at MyApp.scala:28 | 512.00 | 0.00 | 128.45 | 130.12 | 770.57 |
| 0 | read at MyApp.scala:15 | 256.00 | 0.00 | 0.00 | 0.00 | 256.00 |
Jobs with spill
Lists only jobs where memory or disk spill occurred, in MB.
Code reference: AnalysisQueries.JobsWithSpill
SELECT j.jobId,
j.jobName,
ROUND(SUM(s.memoryBytesSpilled) / 1048576, 2) AS memorySpillMb,
ROUND(SUM(s.diskBytesSpilled) / 1048576, 2) AS diskSpillMb
FROM job j
JOIN stage s ON ARRAY_CONTAINS(j.stages, s.stageId)
GROUP BY j.jobId, j.jobName
HAVING SUM(s.memoryBytesSpilled) > 0
OR SUM(s.diskBytesSpilled) > 0
ORDER BY diskSpillMb DESC;
Sample output
| jobId | jobName | memorySpillMb | diskSpillMb |
|---|---|---|---|
| 2 | save at MyApp.scala:42 | 2048.00 | 384.56 |
| 1 | count at MyApp.scala:28 | 512.00 | 64.12 |
All joins with CPU and I/O from their job
Explodes the SQL plan nodes to find join operators, then enriches them with the aggregated CPU time and I/O volumes of the parent job.
WITH job_stats AS (
SELECT j.jobId,
j.jobName,
j.sqlId,
ROUND(SUM(s.execCpuNs) / 1e9, 2) AS cpuTimeSec,
ROUND(SUM(s.readBytes + s.writeBytes
+ s.shuffleReadBytes + s.shuffleWriteBytes) / 1048576, 2) AS totalIoMb
FROM job j
JOIN stage s ON ARRAY_CONTAINS(j.stages, s.stageId)
GROUP BY j.jobId, j.jobName, j.sqlId
)
SELECT sq.sqlId,
sq.description,
n.nodeName AS joinNode,
js.jobId,
js.cpuTimeSec,
js.totalIoMb
FROM sql sq
JOIN job_stats js ON js.sqlId = CAST(sq.sqlId AS STRING)
LATERAL VIEW EXPLODE(sq.nodes) AS n
WHERE n.nodeName LIKE '%Join%'
ORDER BY js.cpuTimeSec DESC;
Sample output
| sqlId | description | joinNode | jobId | cpuTimeSec | totalIoMb |
|---|---|---|---|---|---|
| 2 | Join orders with customers | SortMergeJoin | 2 | 124.57 | 2042.03 |
| 1 | Enrich transactions | BroadcastHashJoin | 1 | 58.23 | 770.57 |
| 0 | Aggregate daily totals | ShuffledHashJoin | 0 | 32.11 | 256.00 |
Join node metrics
Explodes SQL plan nodes and returns metrics for join operators. Useful for inspecting the number of output rows produced by each join.
Code reference: AnalysisQueries.JoinNodeMetrics
SELECT sqlId,
node.nodeName,
node.jobName,
FROM_JSON(TO_JSON(node.metrics), 'MAP<STRING, STRING>') AS metrics
FROM (SELECT sqlId, EXPLODE(nodes) AS node FROM sql) subquery
WHERE node.nodeName LIKE '%Join%';
Sample output
| sqlId | nodeName | jobName | metrics |
|---|---|---|---|
| 1 | BroadcastHashJoin | jobjoin | {number of output rows -> 2} |
| 2 | SortMergeJoin | bigjoin | {number of output rows -> 10000} |
Scan node metrics
Explodes SQL plan nodes and returns metrics for scan parquet operators. Useful for checking how many files and rows were read by each scan.
Code reference: AnalysisQueries.ScanNodeMetrics
SELECT sqlId,
node.nodeName,
node.jobName,
FROM_JSON(TO_JSON(node.metrics), 'MAP<STRING, STRING>') AS metrics
FROM (SELECT sqlId, EXPLODE(nodes) AS node FROM sql) subquery
WHERE node.nodeName LIKE '%Scan parquet%';
Sample output
| sqlId | nodeName | jobName | metrics |
|---|---|---|---|
| 1 | Scan parquet delta./path | jobjoin | {number of files read -> 1, number of output rows -> 252} |
| 1 | Scan parquet delta./path2 | jobjoin | {number of files read -> 4, number of output rows -> 9000} |
Wall clock duration of jobs
Computes the elapsed wall-clock time of each job in seconds.
Code reference: AnalysisQueries.WallClockDurationOfJobs
SELECT j.jobId,
j.jobName,
ROUND((j.jobEndTime - j.jobStartTime) / 1000, 2) AS wallClockSec
FROM job j
ORDER BY wallClockSec DESC;
Sample output
| jobId | jobName | wallClockSec |
|---|---|---|
| 2 | save at MyApp.scala:42 | 245.67 |
| 1 | count at MyApp.scala:28 | 98.34 |
| 0 | read at MyApp.scala:15 | 15.21 |
Skew detection
Detects task-level skew per job/stage using statistical thresholds. Reports stages where the maximum task duration exceeds 1.5x the 75th percentile, indicating that a few tasks are significantly slower than the rest. The skewFactor column quantifies how much the slowest task deviates from the pack.
Code reference: AnalysisQueries.SkewDetection
SELECT j.jobId,
j.jobName,
t.stageId,
COUNT(1) AS taskCount,
ROUND(MAX(t.executorRunTime) / 1000, 2) AS maxDurationSec,
ROUND(PERCENTILE(t.executorRunTime, 0.5) / 1000, 2) AS medianDurationSec,
ROUND(PERCENTILE(t.executorRunTime, 0.75) / 1000, 2) AS p75DurationSec,
ROUND(STDDEV(t.executorRunTime) / 1000, 2) AS stddevDurationSec,
ROUND(MAX(t.executorRunTime) / PERCENTILE(t.executorRunTime, 0.75), 2) AS skewFactor
FROM job j
JOIN stage s ON ARRAY_CONTAINS(j.stages, s.stageId)
JOIN task t ON t.stageId = s.stageId
GROUP BY j.jobId, j.jobName, t.stageId
HAVING MAX(t.executorRunTime) > 1.5 * PERCENTILE(t.executorRunTime, 0.75)
ORDER BY skewFactor DESC;
Sample output
| jobId | jobName | stageId | taskCount | maxDurationSec | medianDurationSec | p75DurationSec | stddevDurationSec | skewFactor |
|---|---|---|---|---|---|---|---|---|
| 2 | save at MyApp.scala:42 | 3 | 200 | 45.20 | 2.10 | 3.50 | 8.42 | 12.91 |
| 1 | count at MyApp.scala:28 | 1 | 100 | 12.80 | 1.50 | 2.00 | 3.21 | 6.40 |