
Glue Spark

  • RDD: datasets that store data in a distributed format
  • Transformations: operations that derive new datasets from existing ones
  • Actions: trigger execution of the transformations on an RDD
  • Output data: the result of the transformations

Spark is lazily evaluated, so transformations are not executed immediately; they stay in a logical plan and only run when an action is called (see the example after the list of actions below).

Actions examples:

  • take()
  • collect()
  • show()
  • save()
  • count()
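
A minimal sketch of lazy evaluation, assuming a local SparkSession: the filter below is a transformation and does nothing until count(), an action, triggers the job.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("lazy-example").getOrCreate()

df = spark.createDataFrame(
    [("John", 3000), ("Bryan", 4200)], ["name", "salary"]
)

# Transformation only: nothing runs yet, Spark just records it in the logical plan
df_high = df.filter(df["salary"] > 4000)

# Action: the plan is optimized and executed now
print(df_high.count())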

Types of transformations

Narrow

Act on a single partition of the data at a time (map, filter)

  • Do not require exchanging data with other workers
  • Are independent of other partitions
  • Are computationally efficient

Wide

Need data from several partitions at once, so they require a shuffle between workers (join, groupBy)

Running locally

  1. Docker Compose file:
services:
  glue:
    image: amazon/aws-glue-libs:glue_libs_4.0.0_image_01
    container_name: glue-jupyter
    ports:
      - "8889:8888"
      - "4040:4040"
    volumes:
      - .:/home/glue_user/workspace/jupyter_workspace
      - ~/.aws:/home/glue_user/.aws:ro
    working_dir: /home/glue_user/workspace
    environment:
      AWS_PROFILE: default
      DISABLE_SSL: "true"
    command: >
      /home/glue_user/jupyter/jupyter_start.sh
  2. Run the image:
docker compose run --rm glue
  3. Connect to the Jupyter server from VS Code:
  • Select kernel
  • Existing Jupyter server
  • https://127.0.0.1:8889/lab

Import packages

from awsglue.context import GlueContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast
Create session

spark = SparkSession.builder.appName("example").getOrCreate()
Create a dataframe

data = [
    ("John", "Sales", 3000),
    ("Bryan", "Sales", 4200),
    ("Selena", "Sales", 4600),
    ("Alice", "Sales", 3500),
    ("Mark", "Sales", 3900),
    ("Sophia", "Sales", 4100),
    ("Daniel", "Sales", 3800),
    ("Emma", "Sales", 4400),
    ("Lucas", "Sales", 3600),
    ("Olivia", "Sales", 4700),
]
df = spark.createDataFrame(data, ["name", "department", "salary"])
Visualize data

df.show()

+------+----------+------+
|  name|department|salary|
+------+----------+------+
|  John|     Sales|  3000|
| Bryan|     Sales|  4200|
|Selena|     Sales|  4600|
| Alice|     Sales|  3500|
|  Mark|     Sales|  3900|
|Sophia|     Sales|  4100|
|Daniel|     Sales|  3800|
|  Emma|     Sales|  4400|
| Lucas|     Sales|  3600|
|Olivia|     Sales|  4700|
+------+----------+------+

Filtering dataset (narrow)

df_filtered = df.filter(df["salary"] > 4000)
df_filtered.show()

+------+----------+------+
|  name|department|salary|
+------+----------+------+
| Bryan|     Sales|  4200|
|Selena|     Sales|  4600|
|Sophia|     Sales|  4100|
|  Emma|     Sales|  4400|
|Olivia|     Sales|  4700|
+------+----------+------+

Group By (Wide)

df_grouped = df.groupby("department").count()
df_grouped.show()

+----------+-----+
|department|count|
+----------+-----+
|     Sales|   10|
+----------+-----+

Pushdown Predicate

Reduces the total amount of data read by pushing filter conditions down to the data source.

  • Depends on the file format (Parquet and ORC)
  1. The query is submitted to Spark
  2. Planning phase using the Catalyst optimizer
  3. Filters (predicates) are identified
  4. Pushdown: Spark tries to move the filters down to the read step

With plain Spark a WHERE clause is enough; with AWS Glue the push_down_predicate argument is needed (see the sketch below).
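
A minimal sketch of a Glue pushdown predicate, assuming a table partitioned by year and month already registered in the Glue Data Catalog (the database and table names are hypothetical):

from awsglue.context import GlueContext
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("pushdown-example").getOrCreate()
glue_context = GlueContext(spark.sparkContext)

# Only the partitions matching the predicate are read from storage
dyf = glue_context.create_dynamic_frame.from_catalog(
    database="example_db",        # hypothetical database
    table_name="example_table",   # hypothetical table
    push_down_predicate="year == '2024' and month == '01'",
)

dyf.toDF().show()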

Catalyst

Catalyst is a mechanism that optimizes Spark jobs, using functional programming principles to identify logical-plan improvements (better performance and shorter execution time).

Catalyst has 19 rules (Spark 3+) to improve query execution.

Examples:

  • Predicate pushdown
  • Rearranging filters
  • Converting decimal operations to long integers
  • Rewriting regexes as native Java string operations (startsWith and contains)
  • If/else simplification

spark = SparkSession.builder.appName("catalyst-join").getOrCreate()

employees_data = [
    (1, "John", "Sales", 3000),
    (2, "Bryan", "Sales", 4200),
    (3, "Selena", "Sales", 4600),
    (4, "Alice", "Sales", 3500),
    (5, "Mark", "Sales", 3900),
]

df_employees = spark.createDataFrame(
    employees_data, ["employee_id", "name", "department", "base_salary"]
)

sales_data = [
    (1, 120_000),
    (2, 180_000),
    (3, 220_000),
    (4, 95_000),
    (5, 160_000),
]

df_sales = spark.createDataFrame(sales_data, ["employee_id", "annual_sales"])

spark.sparkContext.setJobGroup(
    "JOIN", "Join employees with annual sales performance"
)

df_join_result = df_employees.join(df_sales, on="employee_id", how="inner")

df_join_result.show()

df_join_result.explain(mode="formatted")

+-----------+------+----------+-----------+------------+
|employee_id|  name|department|base_salary|annual_sales|
+-----------+------+----------+-----------+------------+
|          1|  John|     Sales|       3000|      120000|
|          2| Bryan|     Sales|       4200|      180000|
|          3|Selena|     Sales|       4600|      220000|
|          4| Alice|     Sales|       3500|       95000|
|          5|  Mark|     Sales|       3900|      160000|
+-----------+------+----------+-----------+------------+



== Physical Plan ==
AdaptiveSparkPlan (11)
+- Project (10)
   +- SortMergeJoin Inner (9)
      :- Sort (4)
      :  +- Exchange (3)
      :     +- Filter (2)
      :        +- Scan ExistingRDD (1)
      +- Sort (8)
         +- Exchange (7)
            +- Filter (6)
               +- Scan ExistingRDD (5)

(1) Scan ExistingRDD
Output [4]: [employee_id#50L, name#51, department#52, base_salary#53L]
Arguments: [employee_id#50L, name#51, department#52, base_salary#53L], MapPartitionsRDD[18] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0, ExistingRDD, UnknownPartitioning(0)

(2) Filter
Input [4]: [employee_id#50L, name#51, department#52, base_salary#53L]
Condition : isnotnull(employee_id#50L)

(3) Exchange
Input [4]: [employee_id#50L, name#51, department#52, base_salary#53L]
Arguments: hashpartitioning(employee_id#50L, 1000), ENSURE_REQUIREMENTS, [id=#259]

(4) Sort
Input [4]: [employee_id#50L, name#51, department#52, base_salary#53L]
Arguments: [employee_id#50L ASC NULLS FIRST], false, 0

(5) Scan ExistingRDD
Output [2]: [employee_id#58L, annual_sales#59L]
Arguments: [employee_id#58L, annual_sales#59L], MapPartitionsRDD[23] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0, ExistingRDD, UnknownPartitioning(0)

(6) Filter
Input [2]: [employee_id#58L, annual_sales#59L]
Condition : isnotnull(employee_id#58L)

(7) Exchange
Input [2]: [employee_id#58L, annual_sales#59L]
Arguments: hashpartitioning(employee_id#58L, 1000), ENSURE_REQUIREMENTS, [id=#260]

(8) Sort
Input [2]: [employee_id#58L, annual_sales#59L]
Arguments: [employee_id#58L ASC NULLS FIRST], false, 0

(9) SortMergeJoin
Left keys [1]: [employee_id#50L]
Right keys [1]: [employee_id#58L]
Join condition: None

(10) Project
Output [5]: [employee_id#50L, name#51, department#52, base_salary#53L, annual_sales#59L]
Input [6]: [employee_id#50L, name#51, department#52, base_salary#53L, employee_id#58L, annual_sales#59L]

(11) AdaptiveSparkPlan
Output [5]: [employee_id#50L, name#51, department#52, base_salary#53L, annual_sales#59L]
Arguments: isFinalPlan=false

Joins

flowchart LR
A[Start] --> B{Has user hint?};
B --> |Yes| C[Apply the join the user requested]
B --> |No| D{Is the dataset small enough to broadcast?}
D --> |Yes| E[Broadcast Join]
D --> |No| F{Does it fit in worker memory?}
F --> |Yes| G[Shuffle Hash Join]
F --> |No| H[Sort Merge Join - Default]

Broadcast Join

A broadcast join avoids the shuffle, which is an expensive network exchange between nodes. This improves execution time, performance, and cost.

When a dataset is small, duplicating it on every node is the more effective way to join. By default, Spark only broadcasts datasets up to 10 MB.

spark = SparkSession.builder.appName("broadcast-join-example").getOrCreate()

countries_data = [
    (1, "Brazil"),
    (2, "USA"),
    (3, "Germany"),
]

df_countries = spark.createDataFrame(
    countries_data, ["country_id", "country_name"]
)

customers_data = [
    (101, "Alice", 1),
    (102, "Bob", 2),
    (103, "Carol", 1),
    (104, "Dave", 3),
]

df_customers = spark.createDataFrame(
    customers_data, ["customer_id", "customer_name", "country_id"]
)

spark.sparkContext.setJobGroup(
    "BROADCAST_JOIN", "Join customers with country lookup using broadcast"
)

df_result = df_customers.join(
    broadcast(df_countries), on="country_id", how="inner"
)

df_result.show()

+----------+-----------+-------------+------------+
|country_id|customer_id|customer_name|country_name|
+----------+-----------+-------------+------------+
|         1|        101|        Alice|      Brazil|
|         2|        102|          Bob|         USA|
|         1|        103|        Carol|      Brazil|
|         3|        104|         Dave|     Germany|
+----------+-----------+-------------+------------+

Increasing the limit for broadcast join

spark = (
    SparkSession.builder.appName("boardcast-join-threshold")
    .config(
        "spark.sql.autoBroadcastJoinThreshold",
        str(3 * 1024 * 1024 * 1024),  # 3GB
    )
    .getOrCreate()
)

glue_context = GlueContext(spark)
After the broadcast join, the shuffle hash join is the next most performant option because it does not sort values like the sort merge join, which is the Spark default.

spark = (
    SparkSession.builder.appName("hash-shuffle-join")
    .config("spark.sql.join.preferSortMergeJoin", "false")
    .config("spark.sql.autoBroadcastJoinThreshold", -1)
    .getOrCreate()
)

glue_context = GlueContext(spark)
When we have a large volume of data and neither of the previous joins works, the sort merge join is the most flexible option. The join is performed through sorted keys in each partition.

spark = (
    SparkSession.builder.appName("sort-merge-join")
    .config("spark.sql.join.preferSortMergeJoin", "true")
    .config("spark.sql.autoBroadcastJoinThreshold", -1)
    .getOrCreate()
)

glue_context = GlueContext(spark)

Repartition

A partition manipulation method; it can be triggered manually or by Adaptive Query Execution

  • Performs a full shuffle
  • Can increase or decrease the number of partitions
  • Produces partitions of roughly equal size

Coalesce

A partition manipulation method; it can be triggered manually or by Adaptive Query Execution

  • Can only decrease the number of partitions
  • Avoids a full shuffle, so the resulting partitions can have different sizes (see the sketch below)
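
A minimal sketch contrasting the two, assuming the df built earlier on this page; the partition counts are arbitrary:

# repartition(8): full shuffle, data is redistributed into 8 roughly equal partitions
df_repart = df.repartition(8)
print(df_repart.rdd.getNumPartitions())  # 8

# coalesce(2): merges existing partitions without a full shuffle,
# so the 2 resulting partitions can have different sizes
df_coalesced = df_repart.coalesce(2)
print(df_coalesced.rdd.getNumPartitions())  # 2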

Shuffle

Spills

Happen when the data being processed does not fit in memory and has to be written to disk. Spills slow the job down and can sometimes still end in an OutOfMemoryError.
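
One common mitigation (a sketch with an illustrative value, not from the original notes): raise spark.sql.shuffle.partitions so each shuffle task processes less data and is less likely to spill.

spark = (
    SparkSession.builder.appName("reduce-spills")
    # More, smaller shuffle partitions -> less data per task, fewer spills
    # 400 is an illustrative value; tune it to the data volume
    .config("spark.sql.shuffle.partitions", "400")
    .getOrCreate()
)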

Persist

Stores the already computed partitions, avoiding redoing shuffles or other operations. It should be applied when the same dataset will be used multiple times (see the sketch below).
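
A minimal sketch, assuming the df built earlier on this page; StorageLevel comes from PySpark itself:

from pyspark import StorageLevel

# Keep the computed partitions in memory, spilling to disk if needed
df_cached = df.persist(StorageLevel.MEMORY_AND_DISK)

# Both actions below reuse the persisted partitions instead of recomputing them
df_cached.count()
df_cached.groupBy("department").avg("salary").show()

# Release the storage once the dataset is no longer needed
df_cached.unpersist()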

Configs

spark = SparkSession.builder.appName("optimized-spark-session")


config_params = {}

for key, value in config_params.items():
    spark = spark.config(key, value)

spark = spark.enableHiveSupport().getOrCreate()

all_configs = spark.sparkContext.getConf().getAll()

for config in all_configs:
    print(f"{config[0]} = {config[1]}")

spark.master = local

spark.eventLog.enabled = true

spark.network.crypto.keyLength = 256

spark.network.crypto.enabled = true

spark.repl.local.jars = file:///home/glue_user/livy/rsc-jars/arpack_combined_all-0.1.jar,file:///home/glue_user/livy/rsc-jars/core-1.1.2.jar,file:///home/glue_user/livy/rsc-jars/jniloader-1.1.jar,file:///home/glue_user/livy/rsc-jars/livy-api-0.7.1-incubating.jar,file:///home/glue_user/livy/rsc-jars/livy-rsc-0.7.1-incubating.jar,file:///home/glue_user/livy/rsc-jars/livy-thriftserver-session-0.7.1-incubating.jar,file:///home/glue_user/livy/rsc-jars/native_ref-java-1.1.jar,file:///home/glue_user/livy/rsc-jars/native_system-java-1.1.jar,file:///home/glue_user/livy/rsc-jars/netlib-native_ref-linux-armhf-1.1-natives.jar,file:///home/glue_user/livy/rsc-jars/netlib-native_ref-linux-i686-1.1-natives.jar,file:///home/glue_user/livy/rsc-jars/netlib-native_ref-linux-x86_64-1.1-natives.jar,file:///home/glue_user/livy/rsc-jars/netlib-native_ref-osx-x86_64-1.1-natives.jar,file:///home/glue_user/livy/rsc-jars/netlib-native_ref-win-i686-1.1-natives.jar,file:///home/glue_user/livy/rsc-jars/netlib-native_ref-win-x86_64-1.1-natives.jar,file:///home/glue_user/livy/rsc-jars/netlib-native_system-linux-armhf-1.1-natives.jar,file:///home/glue_user/livy/rsc-jars/netlib-native_system-linux-i686-1.1-natives.jar,file:///home/glue_user/livy/rsc-jars/netlib-native_system-linux-x86_64-1.1-natives.jar,file:///home/glue_user/livy/rsc-jars/netlib-native_system-osx-x86_64-1.1-natives.jar,file:///home/glue_user/livy/rsc-jars/netlib-native_system-win-i686-1.1-natives.jar,file:///home/glue_user/livy/rsc-jars/netlib-native_system-win-x86_64-1.1-natives.jar,file:///home/glue_user/livy/rsc-jars/netty-all-4.1.17.Final.jar,file:///home/glue_user/livy/repl_2.12-jars/commons-codec-1.9.jar,file:///home/glue_user/livy/repl_2.12-jars/livy-core_2.12-0.7.1-incubating.jar,file:///home/glue_user/livy/repl_2.12-jars/livy-repl_2.12-0.7.1-incubating-LIVY-795-patched.jar

spark.app.name = livy-session-1

spark.jars = file:///home/glue_user/livy/rsc-jars/arpack_combined_all-0.1.jar,file:///home/glue_user/livy/rsc-jars/core-1.1.2.jar,file:///home/glue_user/livy/rsc-jars/jniloader-1.1.jar,file:///home/glue_user/livy/rsc-jars/livy-api-0.7.1-incubating.jar,file:///home/glue_user/livy/rsc-jars/livy-rsc-0.7.1-incubating.jar,file:///home/glue_user/livy/rsc-jars/livy-thriftserver-session-0.7.1-incubating.jar,file:///home/glue_user/livy/rsc-jars/native_ref-java-1.1.jar,file:///home/glue_user/livy/rsc-jars/native_system-java-1.1.jar,file:///home/glue_user/livy/rsc-jars/netlib-native_ref-linux-armhf-1.1-natives.jar,file:///home/glue_user/livy/rsc-jars/netlib-native_ref-linux-i686-1.1-natives.jar,file:///home/glue_user/livy/rsc-jars/netlib-native_ref-linux-x86_64-1.1-natives.jar,file:///home/glue_user/livy/rsc-jars/netlib-native_ref-osx-x86_64-1.1-natives.jar,file:///home/glue_user/livy/rsc-jars/netlib-native_ref-win-i686-1.1-natives.jar,file:///home/glue_user/livy/rsc-jars/netlib-native_ref-win-x86_64-1.1-natives.jar,file:///home/glue_user/livy/rsc-jars/netlib-native_system-linux-armhf-1.1-natives.jar,file:///home/glue_user/livy/rsc-jars/netlib-native_system-linux-i686-1.1-natives.jar,file:///home/glue_user/livy/rsc-jars/netlib-native_system-linux-x86_64-1.1-natives.jar,file:///home/glue_user/livy/rsc-jars/netlib-native_system-osx-x86_64-1.1-natives.jar,file:///home/glue_user/livy/rsc-jars/netlib-native_system-win-i686-1.1-natives.jar,file:///home/glue_user/livy/rsc-jars/netlib-native_system-win-x86_64-1.1-natives.jar,file:///home/glue_user/livy/rsc-jars/netty-all-4.1.17.Final.jar,file:///home/glue_user/livy/repl_2.12-jars/commons-codec-1.9.jar,file:///home/glue_user/livy/repl_2.12-jars/livy-core_2.12-0.7.1-incubating.jar,file:///home/glue_user/livy/repl_2.12-jars/livy-repl_2.12-0.7.1-incubating-LIVY-795-patched.jar

spark.network.crypto.keyFactoryAlgorithm = PBKDF2WithHmacSHA256

spark.hadoop.dynamodb.customAWSCredentialsProvider = com.amazonaws.auth.DefaultAWSCredentialsProviderChain

spark.yarn.submit.waitAppCompletion = false

spark.driver.host = 1db0a48ffb16

spark.driver.port = 38725

spark.app.startTime = 1766755739882

spark.app.initial.jar.urls = spark://1db0a48ffb16:38725/jars/livy-repl_2.12-0.7.1-incubating-LIVY-795-patched.jar,spark://1db0a48ffb16:38725/jars/commons-codec-1.9.jar,spark://1db0a48ffb16:38725/jars/netlib-native_ref-win-i686-1.1-natives.jar,spark://1db0a48ffb16:38725/jars/netlib-native_system-win-i686-1.1-natives.jar,spark://1db0a48ffb16:38725/jars/core-1.1.2.jar,spark://1db0a48ffb16:38725/jars/jniloader-1.1.jar,spark://1db0a48ffb16:38725/jars/netlib-native_system-linux-x86_64-1.1-natives.jar,spark://1db0a48ffb16:38725/jars/native_system-java-1.1.jar,spark://1db0a48ffb16:38725/jars/livy-thriftserver-session-0.7.1-incubating.jar,spark://1db0a48ffb16:38725/jars/netty-all-4.1.17.Final.jar,spark://1db0a48ffb16:38725/jars/livy-api-0.7.1-incubating.jar,spark://1db0a48ffb16:38725/jars/netlib-native_ref-linux-i686-1.1-natives.jar,spark://1db0a48ffb16:38725/jars/netlib-native_system-win-x86_64-1.1-natives.jar,spark://1db0a48ffb16:38725/jars/netlib-native_ref-linux-x86_64-1.1-natives.jar,spark://1db0a48ffb16:38725/jars/netlib-native_system-linux-armhf-1.1-natives.jar,spark://1db0a48ffb16:38725/jars/netlib-native_ref-linux-armhf-1.1-natives.jar,spark://1db0a48ffb16:38725/jars/netlib-native_system-osx-x86_64-1.1-natives.jar,spark://1db0a48ffb16:38725/jars/netlib-native_ref-osx-x86_64-1.1-natives.jar,spark://1db0a48ffb16:38725/jars/livy-core_2.12-0.7.1-incubating.jar,spark://1db0a48ffb16:38725/jars/arpack_combined_all-0.1.jar,spark://1db0a48ffb16:38725/jars/livy-rsc-0.7.1-incubating.jar,spark://1db0a48ffb16:38725/jars/native_ref-java-1.1.jar,spark://1db0a48ffb16:38725/jars/netlib-native_system-linux-i686-1.1-natives.jar,spark://1db0a48ffb16:38725/jars/netlib-native_ref-win-x86_64-1.1-natives.jar

spark.io.encryption.enabled = false

spark.yarn.maxAppAttempts = 1

spark.yarn.dist.archives = 

spark.sql.dataPrefetch.enabled = false

spark.files = file:///home/glue_user/spark/conf/hive-site.xml

spark.yarn.heterogeneousExecutors.enabled = false

spark.submit.deployMode = client

spark.authenticate.secret = fca10096-9993-458c-b331-d05073d788b5

spark.repl.class.uri = spark://1db0a48ffb16:38725/classes

spark.app.id = local-1766755741527

spark.livy.spark_major_version = 3

spark.app.submitTime = 1766755730934

spark.executor.extraClassPath = /home/glue_user/spark/jars/*:/home/glue_user/aws-glue-libs/jars/*

spark.executor.id = driver

spark.hadoop.mapreduce.fileoutputcommitter.marksuccessfuljobs = false

spark.unsafe.sorter.spill.read.ahead.enabled = false

spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version = 2

spark.executor.cores = 2

spark.sql.warehouse.dir = file:/home/glue_user/workspace/spark-warehouse

spark.repl.class.outputDir = /tmp/spark668078796105942909

spark.driver.extraJavaOptions = -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -Djavax.net.ssl.trustStore=/home/glue_user/.certs/aws_tls_certs/InternalAndExternalAndAWSTrustStore.jks -Djavax.net.ssl.trustStoreType=JKS -Djavax.net.ssl.trustStorePassword=amazon

spark.history.fs.logDirectory = file:////tmp/spark-events

spark.sql.catalogImplementation = hive

spark.executor.extraJavaOptions = -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -Djavax.net.ssl.trustStore=/home/glue_user/.certs/aws_tls_certs/InternalAndExternalAndAWSTrustStore.jks -Djavax.net.ssl.trustStoreType=JKS -Djavax.net.ssl.trustStorePassword=amazon

spark.app.initial.file.urls = file:///home/glue_user/spark/conf/hive-site.xml

spark.network.crypto.saslFallback = false

spark.submit.pyFiles = 

spark.yarn.isPython = true

spark.driver.extraClassPath = /home/glue_user/spark/jars/*:/home/glue_user/aws-glue-libs/jars/*

spark.authenticate = true

spark.driver.memory = 1000M