Glue Spark¶
- RDD: dataset that stores data in a distributed format across the cluster
- Transformations: operations that derive a new dataset from an existing one
- Actions: operations that trigger execution of the accumulated transformations
- Output data: the result of the transformations
Spark is lazily evaluated: transformations are not executed immediately; they accumulate in a logical plan that only runs when an action is called.
Examples of actions: take(), collect(), show(), save(), count()
Types of transformations¶
Narrow¶
Each output partition depends on a single input partition (e.g. map, filter)
- Do not require moving data between workers
- Are independent of other partitions
- Are computationally efficient
Wide¶
Each output partition can depend on several input partitions (e.g. join, groupBy), which requires a shuffle
Running locally¶
- Docker Compose file:
- Run the image:
- Connect to the Jupyter server from VS Code:
- Select kernel
- Existing Jupyter server
- https://127.0.1:8889/lab
Import packages
+------+----------+------+
| name|department|salary|
+------+----------+------+
| John| Sales| 3000|
| Bryan| Sales| 4200|
|Selena| Sales| 4600|
| Alice| Sales| 3500|
| Mark| Sales| 3900|
|Sophia| Sales| 4100|
|Daniel| Sales| 3800|
| Emma| Sales| 4400|
| Lucas| Sales| 3600|
|Olivia| Sales| 4700|
+------+----------+------+
Filtering dataset (narrow)
Group By (Wide)
Pushdown Predicate¶
Reduces the total data read by applying conditional filters on columns at scan time.
- Depends on the file format (e.g. Parquet and ORC)
- The query is submitted to Spark
- Planning phase using the Catalyst optimizer
- Filters (predicates) are identified
- Pushdown: the filter is moved as close to the read as possible
In pure Spark a WHERE clause (or filter()) is enough; in AWS Glue, the push_down_predicate argument is needed.
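A sketch of how the predicate is passed at read time in Glue; the database, table, and partition column names below are placeholders, not from this project:

```python
from awsglue.context import GlueContext
from pyspark.context import SparkContext

glue_context = GlueContext(SparkContext.getOrCreate())

# Only partitions matching the predicate are read from S3;
# "sales_db", "orders", and the partition columns are hypothetical names
dyf = glue_context.create_dynamic_frame.from_catalog(
    database="sales_db",
    table_name="orders",
    push_down_predicate="year == '2024' and month == '01'",
)
```

The predicate is evaluated against catalog partition columns before any file is opened, which is what makes it cheaper than filtering after the read.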
Catalyst¶
Catalyst is Spark's query optimizer: a mechanism that applies functional programming principles to find logical-plan improvements (better performance and execution time).
Catalyst has a set of rules (19 in Spark 3+) to improve query execution.
Examples:
- Predicate pushdown
- Filter reordering
- Converting decimal operations to long integer arithmetic
- Rewriting regexes as native Java string operations (startsWith and contains)
- If-else simplification
+-----------+------+----------+-----------+------------+
|employee_id| name|department|base_salary|annual_sales|
+-----------+------+----------+-----------+------------+
| 1| John| Sales| 3000| 120000|
| 2| Bryan| Sales| 4200| 180000|
| 3|Selena| Sales| 4600| 220000|
| 4| Alice| Sales| 3500| 95000|
| 5| Mark| Sales| 3900| 160000|
+-----------+------+----------+-----------+------------+
== Physical Plan ==
AdaptiveSparkPlan (11)
+- Project (10)
+- SortMergeJoin Inner (9)
:- Sort (4)
: +- Exchange (3)
: +- Filter (2)
: +- Scan ExistingRDD (1)
+- Sort (8)
+- Exchange (7)
+- Filter (6)
+- Scan ExistingRDD (5)
(1) Scan ExistingRDD
Output [4]: [employee_id#50L, name#51, department#52, base_salary#53L]
Arguments: [employee_id#50L, name#51, department#52, base_salary#53L], MapPartitionsRDD[18] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0, ExistingRDD, UnknownPartitioning(0)
(2) Filter
Input [4]: [employee_id#50L, name#51, department#52, base_salary#53L]
Condition : isnotnull(employee_id#50L)
(3) Exchange
Input [4]: [employee_id#50L, name#51, department#52, base_salary#53L]
Arguments: hashpartitioning(employee_id#50L, 1000), ENSURE_REQUIREMENTS, [id=#259]
(4) Sort
Input [4]: [employee_id#50L, name#51, department#52, base_salary#53L]
Arguments: [employee_id#50L ASC NULLS FIRST], false, 0
(5) Scan ExistingRDD
Output [2]: [employee_id#58L, annual_sales#59L]
Arguments: [employee_id#58L, annual_sales#59L], MapPartitionsRDD[23] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0, ExistingRDD, UnknownPartitioning(0)
(6) Filter
Input [2]: [employee_id#58L, annual_sales#59L]
Condition : isnotnull(employee_id#58L)
(7) Exchange
Input [2]: [employee_id#58L, annual_sales#59L]
Arguments: hashpartitioning(employee_id#58L, 1000), ENSURE_REQUIREMENTS, [id=#260]
(8) Sort
Input [2]: [employee_id#58L, annual_sales#59L]
Arguments: [employee_id#58L ASC NULLS FIRST], false, 0
(9) SortMergeJoin
Left keys [1]: [employee_id#50L]
Right keys [1]: [employee_id#58L]
Join condition: None
(10) Project
Output [5]: [employee_id#50L, name#51, department#52, base_salary#53L, annual_sales#59L]
Input [6]: [employee_id#50L, name#51, department#52, base_salary#53L, employee_id#58L, annual_sales#59L]
(11) AdaptiveSparkPlan
Output [5]: [employee_id#50L, name#51, department#52, base_salary#53L, annual_sales#59L]
Arguments: isFinalPlan=false
Joins¶
flowchart LR
A[Start] --> B{Has user hint?};
B --> |Yes| C[Apply the join requested by the user]
B --> |No| D{Is the dataset small enough to broadcast?}
D --> |Yes| E[Broadcast Join]
D --> |No| F{Does it fit in worker memory?}
F --> |Yes| G[Shuffle Hash Join]
F --> |No| H[Sort Merge Join - Default]
Broadcast Join¶
A broadcast join avoids the shuffle, an expensive network exchange between nodes, which improves time, performance, and cost.
When a dataset is small, replicating it on every node is the most effective way to join. Spark broadcasts automatically up to a default limit of 10 MB.
+----------+-----------+-------------+------------+
|country_id|customer_id|customer_name|country_name|
+----------+-----------+-------------+------------+
| 1| 101| Alice| Brazil|
| 2| 102| Bob| USA|
| 1| 103| Carol| Brazil|
| 3| 104| Dave| Germany|
+----------+-----------+-------------+------------+
Increasing the limit for broadcast join
After BroadcastJoin, ShuffleHashJoin is the next most performant because it does not sort values, unlike SortMergeJoin, which is the Spark default.
SortMergeJoin is the most flexible: the join happens through sorted keys in each partition.
Repartition¶
Partition manipulation method; can be triggered manually or by Adaptive Query Execution
- Performs a full shuffle
- Can increase or decrease the number of partitions
- Produces partitions of roughly equal size
Coalesce¶
Partition manipulation method; can be triggered manually or by Adaptive Query Execution
- Can only decrease the number of partitions
- Avoids a shuffle by merging existing partitions on the same worker
- May produce partitions of different sizes
Shuffle¶
The redistribution of data between executors across the network. It happens between the stages of wide transformations and is one of the most expensive operations in Spark.
Spills¶
Happen when data does not fit in executor memory and must be written (spilled) to disk. Heavy spilling degrades performance, and when even spilling cannot keep up the job can fail with an Out Of Memory error.
Persist¶
Stores already-computed partitions (in memory and/or on disk), avoiding the cost of redoing shuffles and other operations. It should be applied when the same dataset will be used multiple times.
Configs¶
spark.master = local
spark.eventLog.enabled = true
spark.network.crypto.keyLength = 256
spark.network.crypto.enabled = true
spark.repl.local.jars = file:///home/glue_user/livy/rsc-jars/arpack_combined_all-0.1.jar,file:///home/glue_user/livy/rsc-jars/core-1.1.2.jar,file:///home/glue_user/livy/rsc-jars/jniloader-1.1.jar,file:///home/glue_user/livy/rsc-jars/livy-api-0.7.1-incubating.jar,file:///home/glue_user/livy/rsc-jars/livy-rsc-0.7.1-incubating.jar,file:///home/glue_user/livy/rsc-jars/livy-thriftserver-session-0.7.1-incubating.jar,file:///home/glue_user/livy/rsc-jars/native_ref-java-1.1.jar,file:///home/glue_user/livy/rsc-jars/native_system-java-1.1.jar,file:///home/glue_user/livy/rsc-jars/netlib-native_ref-linux-armhf-1.1-natives.jar,file:///home/glue_user/livy/rsc-jars/netlib-native_ref-linux-i686-1.1-natives.jar,file:///home/glue_user/livy/rsc-jars/netlib-native_ref-linux-x86_64-1.1-natives.jar,file:///home/glue_user/livy/rsc-jars/netlib-native_ref-osx-x86_64-1.1-natives.jar,file:///home/glue_user/livy/rsc-jars/netlib-native_ref-win-i686-1.1-natives.jar,file:///home/glue_user/livy/rsc-jars/netlib-native_ref-win-x86_64-1.1-natives.jar,file:///home/glue_user/livy/rsc-jars/netlib-native_system-linux-armhf-1.1-natives.jar,file:///home/glue_user/livy/rsc-jars/netlib-native_system-linux-i686-1.1-natives.jar,file:///home/glue_user/livy/rsc-jars/netlib-native_system-linux-x86_64-1.1-natives.jar,file:///home/glue_user/livy/rsc-jars/netlib-native_system-osx-x86_64-1.1-natives.jar,file:///home/glue_user/livy/rsc-jars/netlib-native_system-win-i686-1.1-natives.jar,file:///home/glue_user/livy/rsc-jars/netlib-native_system-win-x86_64-1.1-natives.jar,file:///home/glue_user/livy/rsc-jars/netty-all-4.1.17.Final.jar,file:///home/glue_user/livy/repl_2.12-jars/commons-codec-1.9.jar,file:///home/glue_user/livy/repl_2.12-jars/livy-core_2.12-0.7.1-incubating.jar,file:///home/glue_user/livy/repl_2.12-jars/livy-repl_2.12-0.7.1-incubating-LIVY-795-patched.jar
spark.app.name = livy-session-1
spark.jars = file:///home/glue_user/livy/rsc-jars/arpack_combined_all-0.1.jar,file:///home/glue_user/livy/rsc-jars/core-1.1.2.jar,file:///home/glue_user/livy/rsc-jars/jniloader-1.1.jar,file:///home/glue_user/livy/rsc-jars/livy-api-0.7.1-incubating.jar,file:///home/glue_user/livy/rsc-jars/livy-rsc-0.7.1-incubating.jar,file:///home/glue_user/livy/rsc-jars/livy-thriftserver-session-0.7.1-incubating.jar,file:///home/glue_user/livy/rsc-jars/native_ref-java-1.1.jar,file:///home/glue_user/livy/rsc-jars/native_system-java-1.1.jar,file:///home/glue_user/livy/rsc-jars/netlib-native_ref-linux-armhf-1.1-natives.jar,file:///home/glue_user/livy/rsc-jars/netlib-native_ref-linux-i686-1.1-natives.jar,file:///home/glue_user/livy/rsc-jars/netlib-native_ref-linux-x86_64-1.1-natives.jar,file:///home/glue_user/livy/rsc-jars/netlib-native_ref-osx-x86_64-1.1-natives.jar,file:///home/glue_user/livy/rsc-jars/netlib-native_ref-win-i686-1.1-natives.jar,file:///home/glue_user/livy/rsc-jars/netlib-native_ref-win-x86_64-1.1-natives.jar,file:///home/glue_user/livy/rsc-jars/netlib-native_system-linux-armhf-1.1-natives.jar,file:///home/glue_user/livy/rsc-jars/netlib-native_system-linux-i686-1.1-natives.jar,file:///home/glue_user/livy/rsc-jars/netlib-native_system-linux-x86_64-1.1-natives.jar,file:///home/glue_user/livy/rsc-jars/netlib-native_system-osx-x86_64-1.1-natives.jar,file:///home/glue_user/livy/rsc-jars/netlib-native_system-win-i686-1.1-natives.jar,file:///home/glue_user/livy/rsc-jars/netlib-native_system-win-x86_64-1.1-natives.jar,file:///home/glue_user/livy/rsc-jars/netty-all-4.1.17.Final.jar,file:///home/glue_user/livy/repl_2.12-jars/commons-codec-1.9.jar,file:///home/glue_user/livy/repl_2.12-jars/livy-core_2.12-0.7.1-incubating.jar,file:///home/glue_user/livy/repl_2.12-jars/livy-repl_2.12-0.7.1-incubating-LIVY-795-patched.jar
spark.network.crypto.keyFactoryAlgorithm = PBKDF2WithHmacSHA256
spark.hadoop.dynamodb.customAWSCredentialsProvider = com.amazonaws.auth.DefaultAWSCredentialsProviderChain
spark.yarn.submit.waitAppCompletion = false
spark.driver.host = 1db0a48ffb16
spark.driver.port = 38725
spark.app.startTime = 1766755739882
spark.app.initial.jar.urls = spark://1db0a48ffb16:38725/jars/livy-repl_2.12-0.7.1-incubating-LIVY-795-patched.jar,spark://1db0a48ffb16:38725/jars/commons-codec-1.9.jar,spark://1db0a48ffb16:38725/jars/netlib-native_ref-win-i686-1.1-natives.jar,spark://1db0a48ffb16:38725/jars/netlib-native_system-win-i686-1.1-natives.jar,spark://1db0a48ffb16:38725/jars/core-1.1.2.jar,spark://1db0a48ffb16:38725/jars/jniloader-1.1.jar,spark://1db0a48ffb16:38725/jars/netlib-native_system-linux-x86_64-1.1-natives.jar,spark://1db0a48ffb16:38725/jars/native_system-java-1.1.jar,spark://1db0a48ffb16:38725/jars/livy-thriftserver-session-0.7.1-incubating.jar,spark://1db0a48ffb16:38725/jars/netty-all-4.1.17.Final.jar,spark://1db0a48ffb16:38725/jars/livy-api-0.7.1-incubating.jar,spark://1db0a48ffb16:38725/jars/netlib-native_ref-linux-i686-1.1-natives.jar,spark://1db0a48ffb16:38725/jars/netlib-native_system-win-x86_64-1.1-natives.jar,spark://1db0a48ffb16:38725/jars/netlib-native_ref-linux-x86_64-1.1-natives.jar,spark://1db0a48ffb16:38725/jars/netlib-native_system-linux-armhf-1.1-natives.jar,spark://1db0a48ffb16:38725/jars/netlib-native_ref-linux-armhf-1.1-natives.jar,spark://1db0a48ffb16:38725/jars/netlib-native_system-osx-x86_64-1.1-natives.jar,spark://1db0a48ffb16:38725/jars/netlib-native_ref-osx-x86_64-1.1-natives.jar,spark://1db0a48ffb16:38725/jars/livy-core_2.12-0.7.1-incubating.jar,spark://1db0a48ffb16:38725/jars/arpack_combined_all-0.1.jar,spark://1db0a48ffb16:38725/jars/livy-rsc-0.7.1-incubating.jar,spark://1db0a48ffb16:38725/jars/native_ref-java-1.1.jar,spark://1db0a48ffb16:38725/jars/netlib-native_system-linux-i686-1.1-natives.jar,spark://1db0a48ffb16:38725/jars/netlib-native_ref-win-x86_64-1.1-natives.jar
spark.io.encryption.enabled = false
spark.yarn.maxAppAttempts = 1
spark.yarn.dist.archives =
spark.sql.dataPrefetch.enabled = false
spark.files = file:///home/glue_user/spark/conf/hive-site.xml
spark.yarn.heterogeneousExecutors.enabled = false
spark.submit.deployMode = client
spark.authenticate.secret = fca10096-9993-458c-b331-d05073d788b5
spark.repl.class.uri = spark://1db0a48ffb16:38725/classes
spark.app.id = local-1766755741527
spark.livy.spark_major_version = 3
spark.app.submitTime = 1766755730934
spark.executor.extraClassPath = /home/glue_user/spark/jars/*:/home/glue_user/aws-glue-libs/jars/*
spark.executor.id = driver
spark.hadoop.mapreduce.fileoutputcommitter.marksuccessfuljobs = false
spark.unsafe.sorter.spill.read.ahead.enabled = false
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version = 2
spark.executor.cores = 2
spark.sql.warehouse.dir = file:/home/glue_user/workspace/spark-warehouse
spark.repl.class.outputDir = /tmp/spark668078796105942909
spark.driver.extraJavaOptions = -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -Djavax.net.ssl.trustStore=/home/glue_user/.certs/aws_tls_certs/InternalAndExternalAndAWSTrustStore.jks -Djavax.net.ssl.trustStoreType=JKS -Djavax.net.ssl.trustStorePassword=amazon
spark.history.fs.logDirectory = file:////tmp/spark-events
spark.sql.catalogImplementation = hive
spark.executor.extraJavaOptions = -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -Djavax.net.ssl.trustStore=/home/glue_user/.certs/aws_tls_certs/InternalAndExternalAndAWSTrustStore.jks -Djavax.net.ssl.trustStoreType=JKS -Djavax.net.ssl.trustStorePassword=amazon
spark.app.initial.file.urls = file:///home/glue_user/spark/conf/hive-site.xml
spark.network.crypto.saslFallback = false
spark.submit.pyFiles =
spark.yarn.isPython = true
spark.driver.extraClassPath = /home/glue_user/spark/jars/*:/home/glue_user/aws-glue-libs/jars/*
spark.authenticate = true
spark.driver.memory = 1000M