我正在使用多个线程在 dynamodb 中创建/更新一组。这是我正在使用的代码
# sends a request to add or update
def update(key, value_to_be_added_to_set):
# creates a key and add value to the mySet column if it doesn't exist
# else it will just add value to mySet
response = table.update_item(
Key={
'key_name': key
},
UpdateExpression='ADD mySet :val',
ExpressionAttributeValues={
':val': {value_to_be_added_to_set}
},
ReturnConsumedCapacity='INDEXES'
)
return response
我在AWS文档中找不到任何关于此操作是否保证线程安全的内容。也就是说,如果我将 [value=1] 和 [value=2] 添加到集合中,结果应始终为 value={1,2}。
所以我写了这个脚本来测试它。
import threading
from random import randrange
import boto3
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('my-test')
key = f'test{randrange(1, 1000000000)}'
max_concurrency = 50
multiplier = 10
# sends a request to add or update
def update(key, value_to_be_added_to_set):
# this call will create a key and add value to the mySet column if it doesn't exist
# else it will add value to mySet
response = table.update_item(
Key={
'key_name': key
},
UpdateExpression='ADD mySet :val',
ExpressionAttributeValues={
':val': {value_to_be_added_to_set}
},
ReturnConsumedCapacity='INDEXES'
)
return response
# this method will be called by every thread
# every thread receives a unique number start from 1 to 50
def create_or_update_values(num):
start = num * multiplier
# if the thread receives 0, it will add the add values to the set from 1 to 10
# similarly thread 2 will add values from 11 to 20
# ..
# thread 49 will add values from 491 to 500
for i in range(start + 1, start + multiplier + 1):
resp = update(key, i)
print(f"Thread {i} has finished")
threads = []
# spin up threads
for i in range(0, max_concurrency):
t = threading.Thread(target=create_or_update_values, args=(i,))
threads.append(t)
t.start()
for t in threads:
t.join()
print("All threads have finished.")
# get mySet, convert it to list and sort it
l = list(table.get_item(Key={
'key_name': key
}, )['Item']['mySet'])
l.sort()
# verify if list contains values from 1 to 500
for i in range(1, max_concurrency * multiplier + 1):
assert int(l[i]) == i
此测试每次运行都会通过。
假设我同时更新 50 个相同的 key ,我可以安全地假设这里的线程安全吗?
最佳答案
DynamoDB 架构
DynamoDB 将项目存储在 partitions 中,它们位于称为存储节点的服务器上。
DynamoDB 遵循领导者/跟随者架构,其中所有写入(和强一致性读取)均由该分区组的领导节点提供服务。
序列化写入
所有写入均由领导节点序列化,这意味着所有更新都将按照节点接收到的顺序发生。然后,这些更改会以最终一致的方式复制到跟随者节点。
可串行化隔离可确保多个并发操作的结果相同,就像在前一个操作完成之前没有操作开始一样。 src
有关 DynamoDB 架构的更多信息,请参阅此 YouTube Video
关于amazon-web-services - 更新 dynamodb 中的集合线程安全吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/75238453/