azure - 使用表存储azure中的Etag通过多线程更新属性

标签 azure azure-functions azure-blob-storage azure-table-storage

概述: 当我在容器/productID(folder)/blobName 下的 blob 存储中上传 blob 时,事件订阅会将此事件保存在存储队列中。之后, azure 函数轮询此事件并执行以下操作:

1- read from the corresponding table the current count property (how many blobs are stored under productID(folder))

2- increase the count + 1

3- write it back in the corresponding table

4- return

问题是一个竞争条件,我尝试将其放入 Lock() 中,如代码所示。当我同时上传 1000 个文件时,这有效。但是,如果我同时加载 10000 个文件并读取计数属性,它会返回超过 10000 个文件,这是错误的,它必须仅返回 10000。我还阻止了仅创建一个实例的扩展。 问题是否仍然是由竞争条件引起的(我不这么认为,但可能是),或者可能是 Azure 运行时在函数上运行了多个事件?我不确定发生了什么。任何想法都会很好

class _tableStorage:
    def __init__(self, account, key):
        self.table_service = TableService(account, key)

    def create_table(self, table_name):
        self.table_service.create_table(table_name)

    def insert_entity_table(self, table_name, entity):
        self.table_service.insert_or_replace_entity(table_name, entity)

    def exist_table(self, table_name):
        return self.table_service.exists(table_name)

    def get_entity_table(self, table_name, entity):
        return self.table_service.get_entity(
            table_name, entity.PartitionKey, entity.RowKey)

    def get_all_entities_table(self, table_name):
        try:
            list = self.table_service.query_entities(table_name)
        except:
            logging.info('unknown error by listing entities')
        return list


def get_blob_meta(url):
    parsed_url = urlparse.urlparse(url)
    return {
        "storage": parsed_url.netloc.split('.')[0],
        "contianer": parsed_url.path.split('/')[1],
        "folder": parsed_url.path.split('/')[2]
    }


threadLock = threading.Lock()


def main(msg: func.QueueMessage) -> None:
    url = json.loads(msg.get_body().decode(
        'utf-8').replace("'", "\""))['data']['url']
    logging.info(url)
    blob_meta = get_blob_meta(url)
    logging.info(blob_meta)
    table_service = _tableStorage(
        blob_meta['storage'],
        "xxxxxxxxxx")

    threadLock.acquire()
    entity = Entity()
    # should have same partition to be stored in one node.
    entity.PartitionKey = blob_meta['contianer']
    entity.RowKey = blob_meta['folder']
    if(not table_service.exist_table(blob_meta['contianer'])):
        table_service.create_table(blob_meta['contianer'])
        entity.count = 1
    else:
        entity.count = table_service.get_entity_table(
            blob_meta['contianer'], entity).count + 1

    table_service.insert_entity_table(blob_meta['contianer'], entity)
    threadLock.release()


最佳答案

两种解决方案:

第一个多线程:

想法是 header 中的 ETag 标志,以确保原子处理。首先,我读取了 count 属性和 ETag 标志。然后,我增加它。在我使用表中递增的值更新计数属性之前, if_match 会将我的 Etag 与表中存储的 Etag 进行匹配,如果 ETag 匹配,则计数将被更新,否则,它会引发错误,我捕获此错误并再次尝试读取和递增,直到更新成功 To understand more read the docu

 header_etag = "random-etag"
    response_etag = "random-response"
    while True:
        sleep(random.random())  # sleep between 0 and 1 second.
        header = table_service1.get_entity(
            client_table, client_table, client_product)
        header_etag = header['etag']
        new_count = header['Count'] + 1
        entity_product = create_product_entity(
            client_table, client_product, new_count, client_image_table)
        try:
            response_etag = table_service1.merge_entity(client_table, entity_product,
                                                        if_match=header_etag)
            break
        except:
            logging.info("race condition detected")

第二个

通过阻止多线程解决:

local.settings.json 中的本地调试

{
  "IsEncrypted": false,
  "Values": {

    "AzureFunctionsJobHost__extensions__queues__batchSize": 1,
    "AzureFunctionsJobHost__extensions__queues__newBatchThreshold": 0,

  }
}

**生产中** host.json

 "extensions": {
        "queues": {
            "batchSize": 1,
            "newBatchThreshold": 0
        }
    }

To understand pls see the documentation of microsoft

关于azure - 使用表存储azure中的Etag通过多线程更新属性,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/69150602/

相关文章:

python - 类型错误 : while_loop() got an unexpected keyword argument 'maximum_iterations' In Jupyter Azure

azure - 有什么方法可以在单次faceLift中添加超过1000张脸

azure - 多个 Azure EventHub 触发器可触发 Azure Function 应用程序中的单个函数

azure - Azure 中的磁盘大小较小且 IOPS 较高

azure - 删除 Azure SDK 生成的日志

azure - 我可以为 Azure Blob 存储中的单个文件拥有多个共享访问签名吗?

azure - 如何在存储帐户的防火墙中将 Azure API 管理列入白名单?

Azure Function - 限制服务总线触发

azure - 将 DocumentDB 绑定(bind)迁移到 Azure WebJobs SDK 扩展时出错

azure - Azure 存储可以从通用存储更改为 Blob 存储吗