![Fixing... err... I mean "implementing" my Amazon DynamoDB approach with Gen-AI [Part 2]](https://assets.community.aws/a/2ihPAYuW33YDVDaQMmU0DuXAvWb/coff.webp)
Fixing... err... I mean "implementing" my Amazon DynamoDB approach with Gen-AI [Part 2]
Second part of the series where I build a serverless app to track my Green Card priority date and implement storing the data in DynamoDB.
pk = f"{filing_type}#{category}"
and the secondary key as sk = f"{country}"
. The data that I'm trying to store has a date for when the Visa Bulletin was published (bulletin_date
), and each bulletin has two tables I'm interested in for the priority date (filing_type
for Final Date
and Application Date
). Each of those tables has a row per visa category (category
), and column for the country groupings. The date for each combination of country and category is what I'm trying to store. This will allow me to query the historic data specific to me, with category='3rd'
, filing_type='Final Date'
, and country='All Chargeability Areas Except Those Listed'
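To make this concrete, a single scraped entry looks roughly like this before it is written to DynamoDB (the field names match the code later in this post; the values are a sample row taken from the example output further down):

```python
# A sample of one scraped Visa Bulletin entry (values from the example output below)
sample_item = {
    'filing_type': 'Final Date',
    'category': '3rd',
    'country': 'All Chargeability Areas Except Those Listed',
    'bulletin_date': '2024-07-01',  # when this bulletin was published
    'date': '2021-12-01',           # the priority date cut-off in that bulletin
}
```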
Next, I'm going to update the `store_data` function with the structure below (and also implement handling table request throttling), with a quick detour to dig into `PROVISIONED` vs `PAY_PER_REQUEST` for my table access. Now we can update our Terraform with the new table definition:
resource "aws_dynamodb_table" "visa_bulletin_data" {
name = "VisaBulletinData"
billing_mode = "PAY_PER_REQUEST"
hash_key = "pk"
range_key = "sk"
attribute {
name = "pk"
type = "S"
}
attribute {
name = "sk"
type = "S"
}
}
The `bulletin_date` will be used for the primary key (`pk`), and a composite secondary key (`sk`) using the `filing_type`, `country`, and `category` to group the data. Our `pk` and `sk` will be defined in our code as:
pk = f"BULLETIN_DATE#{bulletin_date.strftime('%Y-%m')}"
sk = f"FILING_TYPE#{filing_type}#COUNTRY#{country}#CATEGORY#{category}"
In the `scrape_visa_bulletin` function, I'm looping over each of the bulletins to extract the data, and adding that URL to the `ProcessedURLs` table so I don't process it again, but if writing to the database fails, it won't reprocess the pages that weren't saved.
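The fix for that ordering is to only record a URL as processed once its data has been stored. This is just a sketch of the idea rather than my actual implementation; `scrape_bulletin_page` and `mark_url_processed` are illustrative helper names, and it assumes `store_data` raises on failure instead of swallowing the exception:

```python
# Sketch: only mark a bulletin URL as processed after its data is stored successfully.
# scrape_bulletin_page() and mark_url_processed() are illustrative placeholders.
def process_bulletin(url):
    data = scrape_bulletin_page(url)  # extract the rows for this bulletin
    store_data(data)                  # write them to DynamoDB first (assumed to raise on failure)
    mark_url_processed(url)           # only now record the URL in the ProcessedURLs table
```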
We now have the following for our `store_data` function:
def store_data(data):
    try:
        print("Storing data")
        for item in data:
            filing_type = item['filing_type']
            country = item['country']
            category = item['category']
            bulletin_date = datetime.strptime(item['bulletin_date'], "%Y-%m-%d")
            date = datetime.strptime(item['date'], "%Y-%m-%d")

            pk = f"BULLETIN_DATE#{bulletin_date.strftime('%Y-%m')}"
            sk = f"FILING_TYPE#{filing_type}#COUNTRY#{country}#CATEGORY#{category}"

            table.put_item(
                Item={
                    'pk': pk,
                    'sk': sk,
                    'filing_type': filing_type,
                    'country': country,
                    'category': category,
                    'bulletin_date': bulletin_date.strftime("%Y-%m-%d"),
                    'date': date.strftime("%Y-%m-%d")
                }
            )
        print("Done storing data")
    except Exception as e:
        print(f"Unable to store the data, error: {e}")
After running `terraform apply`, grabbing some fresh coffee, finishing it, and cleaning my keyboard, I can test the updated code. TIL: it takes time to change a DynamoDB table's access mode.
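While that change is in flight, the table sits in the `UPDATING` state. A quick way to check on it (a minimal boto3 snippet, not part of the scraper itself):

```python
import boto3

# Check the table status while the billing mode change is applied:
# it shows 'UPDATING' during the change and 'ACTIVE' once it completes.
dynamodb = boto3.client('dynamodb')
status = dynamodb.describe_table(TableName='VisaBulletinData')['Table']['TableStatus']
print(status)
```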
Here's the tail end of the output:
aws_dynamodb_table.visa_bulletin_data: Modifications complete after 17m41s [id=VisaBulletinData]
aws_lambda_function.visa_bulletin_scraper: Modifying... [id=visa-bulletin-scraper]
aws_lambda_function.visa_bulletin_scraper: Still modifying... [id=visa-bulletin-scraper, 10s elapsed]
aws_lambda_function.visa_bulletin_scraper: Still modifying... [id=visa-bulletin-scraper, 20s elapsed]
aws_lambda_function.visa_bulletin_scraper: Modifications complete after 25s [id=visa-bulletin-scraper]
Apply complete! Resources: 1 added, 3 changed, 1 destroyed.
(package) ➜ uscis-priority-date-tracker git:(main) ✗
Now I need to update the `read_data` method to retrieve the data. Updating `read_data` to use the new structure and print the output provides the following code (I'm using defaults for now), after first needing to correct it for querying using the wrong `pk` and `sk`:
def read_data(filing_type='Final Date', category='3rd', country='All Chargeability Areas Except Those Listed'):
    pk_prefix = "BULLETIN_DATE#"
    sk_prefix = f"FILING_TYPE#{filing_type}#COUNTRY#{country}#CATEGORY#{category}"

    response = table.query(
        KeyConditionExpression=Key('pk').begins_with(pk_prefix) & Key('sk').begins_with(sk_prefix),
        ScanIndexForward=False
    )

    items = response['Items']

    # Sort the items by bulletin_date in descending order
    sorted_items = sorted(items, key=lambda x: x['bulletin_date'], reverse=True)

    # Print the header
    print(f"Retrieved the [{filing_type}] data for [{category}] for [{country}]:")

    # Print each item
    for item in sorted_items:
        bulletin_date = item['bulletin_date']
        date = item['date']
        print(f"Bulletin {bulletin_date}: {date}")

    return sorted_items
Running this locally results in the following error:
Traceback (most recent call last):
File "/Users/cobusb/projects/uscis-priority-date-tracker/src/local_test.py", line 21, in <module>
result = lambda_handler(mock_event, mock_context)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/cobusb/projects/uscis-priority-date-tracker/src/handler.py", line 190, in lambda_handler
eb3_data = read_data()
^^^^^^^^^^^
File "/Users/cobusb/projects/uscis-priority-date-tracker/src/handler.py", line 123, in read_data
response = table.query(
^^^^^^^^^^^^
File "/Users/cobusb/projects/terraform-samples/us-visa-dates-checker/src/package/lib/python3.12/site-packages/boto3/resources/factory.py", line 581, in do_action
response = action(self, *args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/cobusb/projects/terraform-samples/us-visa-dates-checker/src/package/lib/python3.12/site-packages/boto3/resources/action.py", line 88, in __call__
response = getattr(parent.meta.client, operation_name)(*args, **params)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/cobusb/projects/terraform-samples/us-visa-dates-checker/src/package/lib/python3.12/site-packages/botocore/client.py", line 565, in _api_call
return self._make_api_call(operation_name, kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/cobusb/projects/terraform-samples/us-visa-dates-checker/src/package/lib/python3.12/site-packages/botocore/client.py", line 1021, in _make_api_call
raise error_class(parsed_response, operation_name)
botocore.exceptions.ClientError: An error occurred (ValidationException) when calling the Query operation: Query key condition not supported
Looking at the stack trace, it is using the `venv` from that folder (the old `terraform-samples` project), but as long as my code runs, I'm going to ignore it for now. Let's fix the database first. I updated the `store_data` and `read_data` methods to use the new schema:
def store_data(data):
    try:
        for item in data:
            filing_type = item['filing_type']
            country = item['country']
            category = item['category']
            bulletin_date = datetime.strptime(item['bulletin_date'], "%Y-%m-%d")
            date = datetime.strptime(item['date'], "%Y-%m-%d")

            pk = f"FILING_TYPE#{filing_type}#CATEGORY#{category}#COUNTRY#{country}"
            sk = f"BULLETIN_DATE#{bulletin_date.strftime('%Y-%m-%d')}"

            table.put_item(
                Item={
                    'pk': pk,
                    'sk': sk,
                    'filing_type': filing_type,
                    'country': country,
                    'category': category,
                    'bulletin_date': bulletin_date.strftime("%Y-%m-%d"),
                    'date': date.strftime("%Y-%m-%d")
                }
            )
    except Exception as e:
        print(f"Unable to store the data, error: {e}")

def read_data(filing_type='Final Date', category='3rd', country='All Chargeability Areas Except Those Listed'):
    pk = f"FILING_TYPE#{filing_type}#CATEGORY#{category}#COUNTRY#{country}"

    response = table.query(
        KeyConditionExpression=Key('pk').eq(pk),
        ScanIndexForward=False  # Reverse the order to get the latest bulletin_date first
    )

    items = response['Items']

    # Sort the items by bulletin_date in descending order
    sorted_items = sorted(items, key=lambda x: x['sk'], reverse=True)

    # Print the header
    print(f"Retrieved the [{filing_type}] data for [{category}] for [{country}]:")

    # Print each item
    for item in sorted_items:
        bulletin_date = item['bulletin_date']
        date = item['date']
        print(f"Bulletin {bulletin_date}: {date}")

    return sorted_items
I went looking for a `TRUNCATE TABLE` statement, and it doesn't look like it exists. I'm now very curious what will happen to the data, so I'm going to update the table definition in Terraform and YOLO it. After `terraform apply`, the code works! Here's the first few lines of output:
Retrieved the [Final Date] data for [3rd] for [All Chargeability Areas Except Those Listed]:
Bulletin 2024-07-01: 2021-12-01
Bulletin 2024-06-01: 2022-11-22
Bulletin 2024-05-01: 2022-11-22
Bulletin 2024-04-01: 2022-11-22
Bulletin 2024-03-01: 2022-09-08
Bulletin 2024-02-01: 2022-09-01
Bulletin 2024-01-01: 2022-08-01
Bulletin 2023-12-01: 2021-12-01
After the final tweak to the `pk` and `sk` for the data, which didn't change the table schema, I think I'm starting to think about the data structures in the right way. And repeating the recap from the first piece, the overall process of trial and error by patching different solutions together is different. Instead of the searching and multiple tabs, I now rarely leave my IDE. I tagged this version as `article-2`, and pushed the tag. Source code for this article.

Next up will be building on the `read_data` function. At least, that is the plan, right after I double check the message I had after pushing this code:
remote: Resolving deltas: 100% (4/4), completed with 4 local objects.
remote:
remote: GitHub found 1 vulnerability on build-on-aws/uscis-priority-date-tracker's default branch (1 moderate). To find out more, visit:
remote: https://github.com/build-on-aws/uscis-priority-date-tracker/security/dependabot/1
For reference, the rest of this post captures the Gen-AI responses I worked through along the way. The first suggested table definition:
resource "aws_dynamodb_table" "visa_bulletin_data" {
name = "VisaBulletinData"
billing_mode = "PROVISIONED"
read_capacity = 5
write_capacity = 5
hash_key = "pk"
range_key = "sk"
attribute {
name = "pk"
type = "S"
}
attribute {
name = "sk"
type = "S"
}
}
Here's the same table definition again, with comments explaining each setting:
resource "aws_dynamodb_table" "visa_bulletin_data" {
name = "VisaBulletinData"
billing_mode = "PROVISIONED" # or "PAY_PER_REQUEST"
read_capacity = 5 # Set appropriate read/write capacity units for provisioned mode
write_capacity = 5
hash_key = "pk" # Partition key
range_key = "sk" # Sort key
attribute {
name = "pk"
type = "S" # String type
}
attribute {
name = "sk"
type = "S" # String type
}
# Additional settings, global secondary indexes, etc.
}
This defines the table with a partition key (`pk`) and a sort key (`sk`). Both keys are defined as strings (`S`). The primary key will use the following structure:
pk = "BULLETIN_DATE#<BulletinDate>"
sk = "FILING_TYPE#<FilingType>#COUNTRY#<Country>#CATEGORY#<Category>"
For example, with a `BulletinDate` of "2023-05", a `FilingType` of "priority_date", a `Country` of "India", and a `Category` of "EB3", the primary key would be: [2]
pk = "BULLETIN_DATE#2023-05"
sk = "FILING_TYPE#priority_date#COUNTRY#India#CATEGORY#EB3"
You can then store the `PriorityDate` value as an attribute in the DynamoDB item. Secondary indexes can be added on the `pk` and `sk` attributes to allow efficient querying based on different combinations of the primary key attributes. Remember to replace the placeholders (`<BulletinDate>`, `<FilingType>`, `<Country>`, and `<Category>`) with your actual values when constructing the primary key.

My scraped data is currently built up as a list of dictionaries:
employment_based_data.append({
    'filing_type': filing_type,
    'country': countries[cell_id - 1],
    'category': category_value,
    'bulletin_date': bulletin_date.strftime("%Y-%m-%d"),
    'date': cell_date.strftime("%Y-%m-%d")
})
Here's how that data can be stored using the primary key structure above:
import boto3
from datetime import datetime

# Initialize DynamoDB resource
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('VisaBulletinData')

def store_data(data):
    for item in data:
        filing_type = item['filing_type']
        country = item['country']
        category = item['category']
        bulletin_date = datetime.strptime(item['bulletin_date'], "%Y-%m-%d")
        date = datetime.strptime(item['date'], "%Y-%m-%d")

        pk = f"BULLETIN_DATE#{bulletin_date.strftime('%Y-%m')}"
        sk = f"FILING_TYPE#{filing_type}#COUNTRY#{country}#CATEGORY#{category}"

        table.put_item(
            Item={
                'pk': pk,
                'sk': sk,
                'filing_type': filing_type,
                'country': country,
                'category': category,
                'bulletin_date': bulletin_date.strftime("%Y-%m-%d"),
                'date': date.strftime("%Y-%m-%d")
            }
        )

# Assuming you have the employment_based_data list populated
store_data(employment_based_data)
Here's how the `store_data` function works:
- We iterate over each item in the `employment_based_data` list.
- For each item, we extract the `filing_type`, `country`, `category`, `bulletin_date`, and `date` values.
- We construct the partition key (`pk`) using the `BULLETIN_DATE` prefix and the `bulletin_date` in the format `YYYY-MM`.
- We construct the sort key (`sk`) using the `FILING_TYPE`, `filing_type`, `COUNTRY`, `country`, and `CATEGORY`, `category` values.
- We use the `put_item` method of the DynamoDB table to store the item with the constructed primary key (`pk` and `sk`) and the other attributes (`filing_type`, `country`, `category`, `bulletin_date`, and `date`). [2]
We use the `datetime` module to convert the `bulletin_date` and `date` strings to `datetime` objects, and then back to strings in the desired format (`YYYY-MM` for `bulletin_date` and `YYYY-MM-DD` for `date`).

To handle the table's write capacity being exceeded, here's an updated version of the `store_data` function to achieve this:
import boto3
from datetime import datetime
import time
import random
import math

# Initialize DynamoDB resource
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('VisaBulletinData')

def store_data(data):
    for item in data:
        filing_type = item['filing_type']
        country = item['country']
        category = item['category']
        bulletin_date = datetime.strptime(item['bulletin_date'], "%Y-%m-%d")
        date = datetime.strptime(item['date'], "%Y-%m-%d")

        pk = f"BULLETIN_DATE#{bulletin_date.strftime('%Y-%m')}"
        sk = f"FILING_TYPE#{filing_type}#COUNTRY#{country}#CATEGORY#{category}"

        max_retries = 5  # Maximum number of retries
        retry_delay = 1  # Initial retry delay in seconds
        max_delay = 60   # Maximum retry delay in seconds

        for attempt in range(max_retries):
            try:
                table.put_item(
                    Item={
                        'pk': pk,
                        'sk': sk,
                        'filing_type': filing_type,
                        'country': country,
                        'category': category,
                        'bulletin_date': bulletin_date.strftime("%Y-%m-%d"),
                        'date': date.strftime("%Y-%m-%d")
                    }
                )
                break  # Exit the retry loop if the write succeeds
            except table.meta.client.exceptions.ProvisionedThroughputExceededException as e:
                if attempt == max_retries - 1:
                    raise e  # Raise the exception if the maximum number of retries is reached
                else:
                    # Apply exponential backoff with jitter
                    delay = retry_delay * (2 ** attempt) + random.uniform(0, 1)
                    delay = min(delay, max_delay)
                    print(f"Write capacity exceeded, retrying in {delay} seconds...")
                    time.sleep(delay)
            except Exception as e:
                print(f"Error writing item to DynamoDB: {e}")
                break

# Assuming you have the employment_based_data list populated
store_data(employment_based_data)
In the updated `store_data` function:
- We define the maximum number of retries (`max_retries`) and the initial retry delay (`retry_delay`) in seconds.
- We also define the maximum retry delay (`max_delay`) in seconds to prevent excessive delays.
- We wrap the `put_item` operation in a `for` loop that runs up to `max_retries` times.
- If the `ProvisionedThroughputExceededException` is raised, indicating that the write capacity has been exceeded, we apply an exponential backoff with jitter before retrying the write operation.
- The exponential backoff delay is calculated as `retry_delay * (2 ** attempt) + random.uniform(0, 1)`, where `attempt` is the current retry attempt. This introduces a random jitter to prevent multiple clients from retrying at the same time.
- The calculated delay is capped at `max_delay` to prevent excessive delays.
- If the maximum number of retries is reached, the `ProvisionedThroughputExceededException` is raised, allowing you to handle it as needed.
- For any other exceptions, we print an error message and break out of the retry loop.
If you're using on-demand (pay-per-request) capacity mode, you typically don't need to handle the `ProvisionedThroughputExceededException` specifically, as DynamoDB will automatically handle the scaling for you. To compare the two modes:

Provisioned capacity mode:
- In this mode, you specify the number of read and write capacity units you expect your application to require.
- You are billed for the provisioned capacity, regardless of whether you consume it or not.
- Provisioned capacity is suitable for workloads with predictable or consistent traffic patterns, where you can forecast your capacity requirements. [2]
- You can enable auto scaling to automatically adjust the provisioned capacity based on your utilization, helping to optimize costs and performance.
- If your application exceeds the provisioned capacity, you may experience throttling, which can be mitigated by increasing the provisioned capacity or enabling auto scaling.
On-demand (pay-per-request) capacity mode:
- In this mode, you don't need to provision any capacity upfront.
- You pay for the data reads and writes your application performs, based on the number of read and write request units consumed.
- On-demand capacity is ideal for workloads with unpredictable or variable traffic patterns. [3]
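Relating those two modes back to my table: switching between them doesn't have to go through Terraform. Here's a minimal boto3 sketch of checking and changing an existing table's billing mode (I manage mine with Terraform, so this is purely illustrative):

```python
import boto3

dynamodb = boto3.client('dynamodb')

# Check the current billing mode; BillingModeSummary can be absent on older
# tables that have always used provisioned capacity.
table_info = dynamodb.describe_table(TableName='VisaBulletinData')['Table']
print(table_info.get('BillingModeSummary', {}).get('BillingMode', 'PROVISIONED'))

# Switch the table to on-demand. Switching back to PROVISIONED would also
# require a ProvisionedThroughput argument with read/write capacity units.
dynamodb.update_table(
    TableName='VisaBulletinData',
    BillingMode='PAY_PER_REQUEST'
)
```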
Here's the `read_data` function to retrieve and sort the data based on the `filing_type`, `category`, and `country` parameters, using the `bulletin_date` for sorting:
from boto3.dynamodb.conditions import Key

def read_data(filing_type='Final Date', category='3rd', country='All Chargeability Areas Except Those Listed'):
    pk = f"FILING_TYPE#{filing_type}#CATEGORY#{category}"
    sk_prefix = f"COUNTRY#{country}"

    response = table.query(
        KeyConditionExpression=Key('pk').eq(pk) & Key('sk').begins_with(sk_prefix),
        ScanIndexForward=False  # Reverse the order to get the latest bulletin_date first
    )

    items = response['Items']

    # Sort the items by bulletin_date in descending order
    sorted_items = sorted(items, key=lambda x: x['bulletin_date'], reverse=True)

    return sorted_items
Here's how the `read_data` function works:
- We construct the partition key (`pk`) using the `FILING_TYPE`, `filing_type`, and `CATEGORY`, `category` values.
- We construct the sort key prefix (`sk_prefix`) using the `COUNTRY` and `country` values.
- We use the `query` method of the DynamoDB table to retrieve items where the partition key (`pk`) matches the constructed value, and the sort key (`sk`) begins with the constructed prefix (`sk_prefix`).
- We set the `ScanIndexForward` parameter to `False` to reverse the order of the results, so that the latest `bulletin_date` is returned first.
- We store the retrieved items in the `items` variable.
- We sort the `items` list in descending order based on the `bulletin_date` attribute using the `sorted` function and a lambda function as the `key`.
- We return the sorted list of items.
To use the `read_data` function, you can call it with the desired `filing_type`, `category`, and `country` parameters:
# Retrieve and sort data for 'Final Date', '3rd' category, and 'All Chargeability Areas Except Those Listed' country
sorted_data = read_data()
# Retrieve and sort data for 'Filing Date', 'EB2' category, and 'India' country
sorted_data = read_data(filing_type='Filing Date', category='EB2', country='India')
The `sorted_data` variable will contain a list of items sorted in descending order by `bulletin_date`, filtered by the specified `filing_type`, `category`, and `country`. The `query` operation in DynamoDB is efficient for retrieving data based on the primary key (partition key and sort key). If you need to filter the data based on other attributes, you may need to consider using a Global Secondary Index (GSI) or performing a scan operation, which can be less efficient for large datasets.

The code provided by this response isn't quite correct; it is using the previous table schema for the `pk` and `sk`, so I just copied the lines from `store_data` to replace them.
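On the scan option mentioned above: filtering on non-key attributes is possible with a filter expression, but the filter is applied after the items are read, so the whole table is still consumed. A minimal sketch using the same `table` object:

```python
from boto3.dynamodb.conditions import Attr

# Scan the whole table and keep only one country/category combination.
# The filter runs after the read, so this still reads (and pays for) every item.
response = table.scan(
    FilterExpression=Attr('country').eq('India') & Attr('category').eq('3rd')
)
items = response['Items']
```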
Here's the updated `read_data` function to print the desired output format:
from boto3.dynamodb.conditions import Key

def read_data(filing_type='Final Date', category='3rd', country='All Chargeability Areas Except Those Listed'):
    pk = f"FILING_TYPE#{filing_type}#CATEGORY#{category}"
    sk_prefix = f"COUNTRY#{country}"

    response = table.query(
        KeyConditionExpression=Key('pk').eq(pk) & Key('sk').begins_with(sk_prefix),
        ScanIndexForward=False  # Reverse the order to get the latest bulletin_date first
    )

    items = response['Items']

    # Sort the items by bulletin_date in descending order
    sorted_items = sorted(items, key=lambda x: x['bulletin_date'], reverse=True)

    # Print the header
    print(f"Retrieved the [{filing_type}] data for [{category}] for [{country}]:")

    # Print each item
    for item in sorted_items:
        bulletin_date = item['bulletin_date']
        date = item['date']
        print(f"Bulletin {bulletin_date}: {date}")

    return sorted_items
In this updated `read_data` function, we've added the following changes:
- After sorting the items, we print the header line using the `print` function and an f-string. The header line will be in the format: `Retrieved the [Final Date] data for [EB3] for [All Chargeability Areas Except Those Listed]:`
- We then iterate over the `sorted_items` list using a `for` loop.
- For each item, we extract the `bulletin_date` and `date` values.
- We print a line for each item using the `print` function and an f-string. The line will be in the format: `Bulletin 2024-07-01: 2021-11-01`
To use the updated `read_data` function, you can call it with the desired `filing_type`, `category`, and `country` parameters:
# Retrieve and print data for 'Final Date', '3rd' category, and 'All Chargeability Areas Except Those Listed' country
read_data()
# Retrieve and print data for 'Filing Date', 'EB2' category, and 'India' country
read_data(filing_type='Filing Date', category='EB2', country='India')
This will print the header line followed by one line per item with the corresponding `bulletin_date` and `date` values. Note that this assumes the items use the attribute names shown earlier (`bulletin_date` and `date`). If the data structure or attribute names are different, you'll need to adjust the code accordingly.

As a reminder, `store_data` constructs the primary key as:
pk = f"BULLETIN_DATE#{bulletin_date.strftime('%Y-%m')}"
sk = f"FILING_TYPE#{filing_type}#COUNTRY#{country}#CATEGORY#{category}"
Here's the updated `read_data` function with the correct primary key construction:
from boto3.dynamodb.conditions import Key

def read_data(filing_type='Final Date', category='3rd', country='All Chargeability Areas Except Those Listed'):
    pk_prefix = "BULLETIN_DATE#"
    sk_prefix = f"FILING_TYPE#{filing_type}#COUNTRY#{country}#CATEGORY#{category}"

    response = table.query(
        KeyConditionExpression=Key('pk').begins_with(pk_prefix) & Key('sk').begins_with(sk_prefix),
        ScanIndexForward=False  # Reverse the order to get the latest bulletin_date first
    )

    items = response['Items']

    # Sort the items by bulletin_date in descending order
    sorted_items = sorted(items, key=lambda x: x['bulletin_date'], reverse=True)

    # Print the header
    print(f"Retrieved the [{filing_type}] data for [{category}] for [{country}]:")

    # Print each item
    for item in sorted_items:
        bulletin_date = item['bulletin_date']
        date = item['date']
        print(f"Bulletin {bulletin_date}: {date}")

    return sorted_items
In this version:
- We define the `pk_prefix` as `"BULLETIN_DATE#"` to match the partition key structure we defined earlier.
- We construct the `sk_prefix` using the `filing_type`, `country`, and `category` values, following the sort key structure we defined earlier.
- In the `KeyConditionExpression` of the `query` method, we use the `begins_with` condition for both the partition key (`pk`) and the sort key (`sk`) to retrieve items where the `pk` begins with `"BULLETIN_DATE#"` and the `sk` begins with the constructed `sk_prefix`.
The `read_data` function should now correctly retrieve and print the data based on the primary key structure we defined earlier. Testing it, however, produced the following error:
botocore.exceptions.ClientError: An error occurred (ValidationException) when calling the Query operation: Query key condition not supported
The `ValidationException` is occurring because the `KeyConditionExpression` we're using is not supported for the `query` operation in DynamoDB. [1] The `query` operation in DynamoDB requires the partition key to be an equality condition, and the sort key can either be an equality condition or a range condition (using comparison operators or the `begins_with` function). We'll need to modify the `read_data` function to use the partition key as an equality condition and the sort key as a range condition. Here's the updated code:
from boto3.dynamodb.conditions import Key

def read_data(filing_type='Final Date', category='3rd', country='All Chargeability Areas Except Those Listed'):
    pk = f"BULLETIN_DATE#{bulletin_date.strftime('%Y-%m')}"
    sk_prefix = f"FILING_TYPE#{filing_type}#COUNTRY#{country}#CATEGORY#{category}"

    response = table.query(
        KeyConditionExpression=Key('pk').eq(pk) & Key('sk').begins_with(sk_prefix),
        ScanIndexForward=False  # Reverse the order to get the latest bulletin_date first
    )

    items = response['Items']

    # Sort the items by bulletin_date in descending order
    sorted_items = sorted(items, key=lambda x: x['bulletin_date'], reverse=True)

    # Print the header
    print(f"Retrieved the [{filing_type}] data for [{category}] for [{country}]:")

    # Print each item
    for item in sorted_items:
        bulletin_date = item['bulletin_date']
        date = item['date']
        print(f"Bulletin {bulletin_date}: {date}")

    return sorted_items
- We construct the partition key (`pk`) using the `BULLETIN_DATE` prefix and the `bulletin_date` in the format `YYYY-MM`.
- In the `KeyConditionExpression` of the `query` method, we use the `eq` condition for the partition key (`pk`) to match the constructed `pk` value exactly.
- For the sort key (`sk`), we use the `begins_with` condition with the constructed `sk_prefix`.

By using the `eq` condition for the partition key and the `begins_with` condition for the sort key, we should be able to query the data correctly without encountering the `ValidationException`. Note that you'll need to pass the `bulletin_date` parameter to the `read_data` function, or you can use a default value if you want to retrieve the latest data by default.

To query the data by `filing_type`, `category`, and `country`, the primary key can be restructured as:
pk = "FILING_TYPE#<filing_type>#CATEGORY#<category>#COUNTRY#<country>"
sk = "BULLETIN_DATE#<bulletin_date>"
The partition key (`pk`) will be a composite key consisting of the `filing_type`, `category`, and `country` values, separated by a delimiter (`#`). The sort key (`sk`) will be the `bulletin_date`. Here's how to update the `store_data` and `read_data` functions to use this new primary key structure:
def store_data(data):
    for item in data:
        filing_type = item['filing_type']
        country = item['country']
        category = item['category']
        bulletin_date = datetime.strptime(item['bulletin_date'], "%Y-%m-%d")
        date = datetime.strptime(item['date'], "%Y-%m-%d")

        pk = f"FILING_TYPE#{filing_type}#CATEGORY#{category}#COUNTRY#{country}"
        sk = f"BULLETIN_DATE#{bulletin_date.strftime('%Y-%m-%d')}"

        table.put_item(
            Item={
                'pk': pk,
                'sk': sk,
                'filing_type': filing_type,
                'country': country,
                'category': category,
                'bulletin_date': bulletin_date.strftime("%Y-%m-%d"),
                'date': date.strftime("%Y-%m-%d")
            }
        )

def read_data(filing_type, category, country):
    pk = f"FILING_TYPE#{filing_type}#CATEGORY#{category}#COUNTRY#{country}"

    response = table.query(
        KeyConditionExpression=Key('pk').eq(pk),
        ScanIndexForward=False  # Reverse the order to get the latest bulletin_date first
    )

    items = response['Items']

    # Sort the items by bulletin_date in descending order
    sorted_items = sorted(items, key=lambda x: x['sk'], reverse=True)

    # Print the header
    print(f"Retrieved the [{filing_type}] data for [{category}] for [{country}]:")

    # Print each item
    for item in sorted_items:
        # Extract the bulletin_date from the sort key (BULLETIN_DATE#YYYY-MM-DD)
        bulletin_date = item['sk'].split('#')[1]
        date = item['date']
        print(f"Bulletin {bulletin_date}: {date}")

    return sorted_items
In the `store_data` function:
- We construct the partition key (`pk`) using the `FILING_TYPE`, `filing_type`, `CATEGORY`, `category`, and `COUNTRY`, `country` values.
- We construct the sort key (`sk`) using the `BULLETIN_DATE` prefix and the `bulletin_date` value in the format `YYYY-MM-DD`.

In the `read_data` function:
- We construct the partition key (`pk`) using the `filing_type`, `category`, and `country` values passed as arguments.
- We use the `query` method with the `KeyConditionExpression` set to `Key('pk').eq(pk)` to retrieve items where the partition key matches the constructed `pk` value exactly.
- We sort the retrieved items by the sort key (`sk`) in descending order.
- When printing the data, we extract the `bulletin_date` from the sort key (`sk`) by splitting it on the `#` delimiter and taking the second part.
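One side effect of this key design worth noting (my own observation, not from the response above): because the sort key starts with the bulletin date, a query can also be restricted to a date range with a sort key condition, for example:

```python
from boto3.dynamodb.conditions import Key

# Only fetch bulletins published during 2024 for my filing type, category, and country.
pk = "FILING_TYPE#Final Date#CATEGORY#3rd#COUNTRY#All Chargeability Areas Except Those Listed"
response = table.query(
    KeyConditionExpression=Key('pk').eq(pk) & Key('sk').between(
        "BULLETIN_DATE#2024-01-01", "BULLETIN_DATE#2024-12-31"
    ),
    ScanIndexForward=False  # newest bulletin first
)
```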
You can now retrieve the data for a specific `filing_type`, `category`, and `country` combination by passing those values to the `read_data` function:
# Retrieve data for 'Final Date', 'EB3', and 'All Chargeability Areas Except Those Listed'
read_data('Final Date', 'EB3', 'All Chargeability Areas Except Those Listed')
# Retrieve data for 'Filing Date', 'EB2', and 'India'
read_data('Filing Date', 'EB2', 'India')
This will print the data for the specified `filing_type`, `category`, and `country`, sorted by the bulletin date. To clear out the existing items in the table, you can use the `scan` operation to retrieve all items in the table, and then use the `batch-write-item` operation to delete them in batches: [1]
aws dynamodb scan --table-name VisaBulletinData --output json > items.json
This will scan the `VisaBulletinData` table and store the retrieved items in a JSON file named `items.json`. You can then delete the items using the `batch-write-item` operation:
import json
import boto3

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('VisaBulletinData')

# Load the items from the JSON file
with open('items.json', 'r') as f:
    items = json.load(f)['Items']

# Batch size for the write operation
batch_size = 25

# Delete items in batches
for i in range(0, len(items), batch_size):
    batch = items[i:i+batch_size]
    with table.batch_writer() as batch_writer:
        for item in batch:
            # Delete each item using its primary key
            batch_writer.delete_item(Key={'pk': item['pk'], 'sk': item['sk']})
    print(f"Deleted batch {i//batch_size+1}")

print("All items deleted from the table.")
Here's what this code does:
- It loads the items from the `items.json` file created by the `scan` operation.
- It defines a `batch_size` for the `batch-write-item` operation. In this example, it's set to 25, but you can adjust it based on your requirements.
- It iterates over the items in batches of `batch_size`.
- For each batch, it uses the `batch_writer` context manager to perform the `batch-write-item` operation.
- Inside the `batch_writer` context, it iterates over the items in the batch and calls the `delete_item` method with the primary key (`pk` and `sk`) of each item.
- After each batch is processed, it prints a message indicating the batch number.
- Finally, it prints a message indicating that all items have been deleted from the table.
Make sure to replace `'VisaBulletinData'` with the name of your DynamoDB table. Alternatively, you can use the `DeleteTable` operation, which deletes the entire table and all its data: [1]
aws dynamodb delete-table --table-name VisaBulletinData
Replace `VisaBulletinData` with the name of your DynamoDB table. After running this command, the table will be in the `DELETING` state for a short period, and then it will be completely removed from DynamoDB. You will need to wait for the deletion to complete before creating a new table with the same name after the `DeleteTable` operation. [2] Keep in mind that the `DeleteTable` operation is an irreversible operation, and all data in the table will be permanently deleted. If you need to keep a backup of the data, you should export it before running the `DeleteTable` operation.

Interesting that `ttl` and `tags` suddenly pop up in this response; I'm going to strip those out as I don't need or want them.
resource "aws_dynamodb_table" "visa_bulletin_data" {
name = "VisaBulletinData"
billing_mode = "PAY_PER_REQUEST"
hash_key = "pk"
range_key = "sk"
attribute {
name = "pk"
type = "S"
}
attribute {
name = "sk"
type = "S"
}
ttl {
attribute_name = "TimeToExist"
enabled = false
}
tags = {
Name = "VisaBulletinData"
Environment = "production"
}
}
Here's what this Terraform configuration does:
- It defines an `aws_dynamodb_table` resource with the name `visa_bulletin_data`.
- The `name` attribute sets the name of the DynamoDB table to `VisaBulletinData`.
- The `billing_mode` is set to `PAY_PER_REQUEST`, which enables on-demand pricing for the table. [2]
- The `hash_key` and `range_key` attributes define the partition key (`pk`) and sort key (`sk`) for the table, respectively.
- The `attribute` blocks define the attributes for the partition key (`pk`) and sort key (`sk`), both of which are strings (`S`).
- The `ttl` block is used to configure the Time to Live (TTL) feature for the table. In this case, it's disabled by setting `enabled` to `false`.
- The `tags` block defines two tags for the DynamoDB table: `Name` and `Environment`.
This Terraform code will create a DynamoDB table named `VisaBulletinData` with the specified primary key structure (`pk` and `sk`) and using on-demand pricing. Make sure to run `terraform init` and `terraform apply` to create the DynamoDB table.

Any opinions in this post are those of the individual author and may not reflect the opinions of AWS.