Nested XML Processing - AWS Glue with DynamicFrames, Relationalize, and Databricks Spark-XML
This post covers efficient methods for reading complex XML structures whose elements can change data type (for example, an element that is a struct in one record and an array in another). We explore three approaches:
1. AWS Glue & DynamicFrame: reads the XML and adapts to the data types it finds (arrays, structs), but the code grows more complex as the nesting deepens.
2. AWS Glue, Databricks Spark-XML & Relationalize: reads the XML with Databricks Spark-XML, then flattens the nested data with Relationalize for precise control.
3. AWS Glue & Defined Schema: declares the schema upfront, which makes reading multi-nested data straightforward but requires knowing the schema in advance.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# Build a flat (book_id, PersonName, Language) view of product_df, handling
# fields that may be either an array or a single struct.
# product_df is expected to exist from an earlier cell; col / when / explode
# are assumed to be imported there (pyspark.sql.functions).
# NOTE(review): every explode() below multiplies rows, so a product with
# N contributors, M languages and K identifiers yields N*M*K rows.

# Prepare the Person Name
# Get the Data Type and check if Array or struct Data type
name_type = product_df.schema["DescriptiveDetail"].dataType["Contributor"].dataType
if name_type.typeName() == "array":
    print('array')
    product_df = product_df.withColumn("name", explode("DescriptiveDetail.Contributor"))
    product_df = product_df.withColumn("PersonName", col("name.PersonName"))
elif name_type.typeName() == "struct":
    print('struct')
    # NOTE(review): this reads the ".struct" member (presumably a Glue choice
    # type rendered as struct<array, struct>), while the Language branch below
    # reads its struct directly -- confirm which shape the data really has.
    # when(x.isNotNull(), x).otherwise(None) is equivalent to x itself.
    product_df = product_df.withColumn(
        "PersonName", col("DescriptiveDetail.Contributor.struct.PersonName")
    )

# Prepare the Language: keep LanguageCode only where LanguageRole == 1.
language_type = product_df.schema["DescriptiveDetail"].dataType["Language"].dataType
if language_type.typeName() == "array":
    print('lang array')
    product_df = product_df.withColumn("Language", explode("DescriptiveDetail.Language"))
    product_df = product_df.withColumn(
        "Language",
        when(col("Language.LanguageRole") == 1, col("Language.LanguageCode")).otherwise(None),
    )
elif language_type.typeName() == "struct":
    print(' lang struct')
    product_df = product_df.withColumn(
        "Language",
        when(
            col("DescriptiveDetail.Language.LanguageRole") == 1,
            col("DescriptiveDetail.Language.LanguageCode"),
        ).otherwise(None),
    )

# Prepare the book_id
product_type = product_df.schema["ProductIdentifier"].dataType
if product_type.typeName() == "array":
    print('product array')
    product_df = product_df.withColumn("book", explode("ProductIdentifier"))
    # The struct branch shows the identifier field is IDValue; the array
    # elements presumably share that element type (there is no book_id field).
    product_df = product_df.withColumn("book_id", col("book.IDValue"))
elif product_type.typeName() == "struct":
    print(' product struct')
    product_df = product_df.withColumn("book_id", col("ProductIdentifier.IDValue"))

# DataFrame.show() prints and returns None, so keep the DataFrame and show it
# as a separate step (book_df used to be bound to None).
book_df = product_df.select(col("book_id"), col("PersonName"), col("Language"))
book_df.show()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# Build a flat (book_id, PersonName, Language) view of product_df, handling
# fields that may be either an array or a single struct.
# product_df is expected to exist from an earlier cell; col / when / explode
# are assumed to be imported there (pyspark.sql.functions).
# NOTE(review): every explode() below multiplies rows, so a product with
# N contributors, M languages and K identifiers yields N*M*K rows.

# Prepare the Person Name
# Get the Data Type and check if Array or struct Data type
name_type = product_df.schema["DescriptiveDetail"].dataType["Contributor"].dataType
if name_type.typeName() == "array":
    print('array')
    product_df = product_df.withColumn("name", explode("DescriptiveDetail.Contributor"))
    product_df = product_df.withColumn("PersonName", col("name.PersonName"))
elif name_type.typeName() == "struct":
    print('struct')
    # NOTE(review): this reads the ".struct" member (presumably a Glue choice
    # type rendered as struct<array, struct>), while the Language branch below
    # reads its struct directly -- confirm which shape the data really has.
    # when(x.isNotNull(), x).otherwise(None) is equivalent to x itself.
    product_df = product_df.withColumn(
        "PersonName", col("DescriptiveDetail.Contributor.struct.PersonName")
    )

# Prepare the Language: keep LanguageCode only where LanguageRole == 1.
language_type = product_df.schema["DescriptiveDetail"].dataType["Language"].dataType
if language_type.typeName() == "array":
    print('lang array')
    product_df = product_df.withColumn("Language", explode("DescriptiveDetail.Language"))
    product_df = product_df.withColumn(
        "Language",
        when(col("Language.LanguageRole") == 1, col("Language.LanguageCode")).otherwise(None),
    )
elif language_type.typeName() == "struct":
    print(' lang struct')
    product_df = product_df.withColumn(
        "Language",
        when(
            col("DescriptiveDetail.Language.LanguageRole") == 1,
            col("DescriptiveDetail.Language.LanguageCode"),
        ).otherwise(None),
    )

# Prepare the book_id
product_type = product_df.schema["ProductIdentifier"].dataType
if product_type.typeName() == "array":
    print('product array')
    product_df = product_df.withColumn("book", explode("ProductIdentifier"))
    # The struct branch shows the identifier field is IDValue; the array
    # elements presumably share that element type (there is no book_id field).
    product_df = product_df.withColumn("book_id", col("book.IDValue"))
elif product_type.typeName() == "struct":
    print(' product struct')
    product_df = product_df.withColumn("book_id", col("ProductIdentifier.IDValue"))

# DataFrame.show() prints and returns None, so keep the DataFrame and show it
# as a separate step (book_df used to be bound to None).
book_df = product_df.select(col("book_id"), col("PersonName"), col("Language"))
book_df.show()


1
2
3
4
5
6
7
8
# Glue interactive-session setup: idle timeout of 30 (minutes, per the Glue
# magics docs -- confirm), Glue 4.0, and the spark-xml jar (from S3 and as a
# Maven coordinate) that spark.read.format('xml') relies on.
%idle_timeout 30
%glue_version 4.0
%%configure
{
"--extra-jars": "s3://glue-xml-file/jarfile/spark-xml_2.12-0.13.0.jar",
"--conf": "spark.jars.packages=com.databricks:spark-xml_2.12:0.13.0"
}


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import SparkSession
from awsglue.dynamicframe import DynamicFrame
from datetime import datetime
from pyspark.sql import functions as F
from pyspark.sql.functions import col, slice, size, when, array_contains, to_date, lit, broadcast, split, udf,explode
from pyspark.sql.types import BooleanType
# Job parameters are hard-coded here (getResolvedOptions is imported but not
# used) so the snippet can run in an interactive notebook.
args = {
    'JOB_NAME': 'book-test',
    'database_name': 'iceberg_dataplatform',
    'input_path': 's3://glue-xml-file/bookdetail/2',
    'output_path': 's3://glue-xml-file/warehouse/output',
}

# Spark/Glue contexts and the job handle.
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Names derived from the parameters; several are not used further in this cell.
input_path, output_path, database_name = (
    args['input_path'],
    args['output_path'],
    args['database_name'],
)
catalog_name = "glue_catalog"
warehouse_path = "s3://glue-xml-file/warehouse/"
xml_input_path = input_path + "/"
glue_temp_storage = "s3://glue-xml-file/warehouse/GLUE_TMP"
s3path = {"paths": [input_path]}
print(f"Loading data from {s3path}")

# Read each <Product> element as one row with spark-xml, ignoring XML attributes.
product_df = (
    spark.read.format('xml')
    .options(rowTag='Product', excludeAttribute=True)
    .load(xml_input_path)
)
product_df.printSchema()
1
2
3
4
5
# Flatten product_df with Relationalize (it needs a DynamicFrame and an S3
# staging path) and print the names of the tables it produced.
dyf = DynamicFrame.fromDF(product_df, glueContext, "dyf")
dfc = dyf.relationalize("root", "s3://glue-xml-file/temp-dir/")
all_keys = [*dfc.keys()]
print(all_keys)

1
2
3
4
5
6
7
8
# Relationalize product_df, list the produced tables, and display the
# top-level "root" table as a Spark DataFrame.
dyf = DynamicFrame.fromDF(product_df, glueContext, "dyf")
dfc = dyf.relationalize("root", "s3://glue-xml-file/temp-dir/")
all_keys = [*dfc.keys()]
print(all_keys)
final_df = dfc.select('root').toDF()
final_df.show()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# Relationalize product_df, then join the exploded Language child table back
# to the root table so each output row pairs an ISBN with one language entry.
dyf = DynamicFrame.fromDF(product_df, glueContext, "dyf")
dfc = dyf.relationalize("root", "s3://glue-xml-file/temp-dir/")
all_keys = list(dfc.keys())
print(all_keys)

# Root table; its `DescriptiveDetail.Language` column is the join key matched
# against the child table's `id` column below.
final_df = dfc.select('root').toDF()
# Child table for DescriptiveDetail.Language.
# NOTE(review): presumably only present when Language is an array in the
# input -- check that the key appears in all_keys above.
fin_df = dfc.select('root_DescriptiveDetail.Language').toDF()

# Register the child table under its own name. Previously it was registered as
# "language" and then "language" was replaced by a query reading from itself,
# a self-referential view replacement that is fragile across Spark versions.
fin_df.createOrReplaceTempView("root_language")
final_df.createOrReplaceTempView("final_df")
# NOTE(review): inner join -- products with no Language rows are dropped.
result = spark.sql("""select * from final_df fd join
root_language ct on fd.`DescriptiveDetail.Language` =ct.id""")
# "language" still ends up holding the joined result, as before.
result.createOrReplaceTempView("language")
spark.sql("""select `ProductIdentifier.IDValue` isbn,`DescriptiveDetail.Language.val.LanguageCode` LanguageCode,
`DescriptiveDetail.Language.val.LanguageRole` LanguageRole from language
""").show()


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import SparkSession
from awsglue.dynamicframe import DynamicFrame
from datetime import datetime
from pyspark.sql import functions as F
from pyspark.sql.functions import col, slice, size, when, array_contains, to_date, lit, broadcast, split, udf,explode
from pyspark.sql.types import BooleanType
from pyspark.sql.types import (
StructType,
StructField,
StringType,
IntegerType,
ArrayType
)
from pyspark.sql.functions import col, udf, from_json
# Job parameters are hard-coded for interactive use (getResolvedOptions is
# imported but not used).
args = {
    'JOB_NAME': 'book-test',
    'database_name': 'iceberg_dataplatform',
    'input_path': 's3://glue-xml-file/bookdetail/2',
    'output_path': 's3://glue-xml-file/warehouse/output',
}

# Spark/Glue contexts and the job handle.
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Names derived from the parameters; several are not used further below.
input_path, output_path, database_name = (
    args['input_path'],
    args['output_path'],
    args['database_name'],
)
catalog_name = "glue_catalog"
warehouse_path = "s3://glue-xml-file/warehouse/"
glue_temp_storage = "s3://glue-xml-file/warehouse/GLUE_TMP"
s3path = {"paths": [input_path]}
print(f"Loading data from {s3path}")

# Iceberg catalog settings for the session.
# NOTE(review): a session already exists (glueContext.spark_session above), so
# getOrCreate() returns it; static settings such as spark.sql.extensions
# presumably do not take effect here -- confirm, or pass them as job --conf.
spark = (
    SparkSession.builder
    .config("spark.sql.warehouse.dir", warehouse_path)
    .config(f"spark.sql.catalog.{catalog_name}", "org.apache.iceberg.spark.SparkCatalog")
    .config(f"spark.sql.catalog.{catalog_name}.warehouse", warehouse_path)
    .config(f"spark.sql.catalog.{catalog_name}.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
    .config(f"spark.sql.catalog.{catalog_name}.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .getOrCreate()
)

# Read each <Product> element into a DynamicFrame; elements whose shape varies
# between records are resolved further down with ResolveChoice.
product_options = {"rowTag": "Product", "attachFilename": "book_format"}
dynamicFrameProducts = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options=s3path,
    format="xml",
    format_options=product_options,
    transformation_ctx="dynamicFrameProducts",
)
product_df = dynamicFrameProducts.toDF()
product_df.printSchema()
# Target shape of DescriptiveDetail.Language once normalised to a JSON array.
schema = ArrayType(StructType([
    StructField('LanguageCode', StringType()),
    StructField('LanguageRole', StringType()),
]))

# Resolve every choice type by casting it to string, so Language arrives as the
# string form of either a single element or a list of elements.
# This pipeline runs once; it used to be duplicated verbatim, re-running
# ResolveChoice with the same transformation_ctx and redefining the UDF.
datachoice0 = ResolveChoice.apply(
    frame=dynamicFrameProducts,
    choice="cast:string",
    transformation_ctx="datachoice0",
)


# In string form, look to see if the string is in a square bracket [, indicating an array, if not add them
@udf(returnType=StringType())
def struct_to_array(order):
    """Wrap a stringified value in [...] unless it already starts with '['.

    Null or empty input returns "[]" so from_json produces an empty array.
    """
    if order:
        return f"[{order}]" if order[:1] != "[" else order
    # Handle case where "array" is empty
    return "[]"


# Parse the normalised string into array<struct<LanguageCode, LanguageRole>>.
map0 = datachoice0.toDF().withColumn(
    "Language_array",
    from_json(struct_to_array(col("DescriptiveDetail.Language")), schema),
)
fromdataframe = DynamicFrame.fromDF(map0, glueContext, "fromdataframe0")
product_df = fromdataframe.toDF()
product_df.printSchema()
# Expose the parsed frame to SQL under the name "language".
product_df.createOrReplaceTempView("language")

# Keep LanguageCode only where LanguageRole == 1, else null.
language_type = product_df.schema["Language_array"].dataType
if language_type.typeName() == "array":
    print('lang array')
    # NOTE(review): explode() drops rows whose array is null or empty, so
    # products without Language entries disappear; explode_outer keeps them.
    product_df = product_df.withColumn("Language", explode("Language_array"))
    product_df = product_df.withColumn(
        "Language",
        when(col("Language.LanguageRole") == 1, col("Language.LanguageCode")).otherwise(None),
    )
elif language_type.typeName() == "struct":
    # Language_array comes from from_json with an ArrayType schema, so this
    # branch is not expected to run; kept as a fallback.
    print(' lang struct')
    product_df = product_df.withColumn(
        "Language",
        when(
            col("Language_array.LanguageRole") == 1,
            col("Language_array.LanguageCode"),
        ).otherwise(None),
    )

# Prepare the book_id
product_type = product_df.schema["ProductIdentifier"].dataType
if product_type.typeName() == "array":
    print('ID array')
    product_df = product_df.withColumn("book", explode("ProductIdentifier"))
    # The struct branch shows the identifier field is IDValue; the array
    # elements presumably share that element type (there is no book_id field).
    product_df = product_df.withColumn("book_id", col("book.IDValue"))
elif product_type.typeName() == "struct":
    print(' ID struct')
    product_df = product_df.withColumn("book_id", col("ProductIdentifier.IDValue"))

# DataFrame.show() prints and returns None, so keep the DataFrame and show it
# as a separate step (book_df used to be bound to None).
book_df = product_df.select(col("book_id"), col("Language"))
book_df.show()
