python - 执行数千个独立事务的可靠方法?

标签 python google-app-engine google-cloud-datastore python-2.7

我的应用程序遇到瓶颈,很难找到解决方案。一点背景:

  • 我的应用程序 ping 一个 API 以收集有关数十万个项目的信息并将它们存储到数据存储
  • 我们需要对这些项的混合维度执行简单的聚合,我们在存储项
  • 期间尝试和计算这些维度。

    当前实现:
  • 我们根据需要手动开始下载这些项目,这会在专门用于下载这些项目的后端创建任务。每个任务将根据分页和获取每个项目所需的 API 调用数量启动更多任务。
  • 每个任务将下载、解析和批量存储项目,同时通过使用字典将我们想要的聚合保存在内存中。
  • 在每个任务执行结束时,我们将聚合字典写入拉取队列。
  • 一旦我们检测到 API 调用即将结束,我们就会启动一个聚合任务到第二个后端配置
  • 这个“聚合任务”从拉取队列(一次 20 个)中拉取,并在尝试存储每个聚合之前合并在每个任务中找到的字典(进一步在内存聚合中进行)。此任务还将启动其他任务以对拉取队列中的剩余任务执行聚合(数百个)
  • 我们使用 sharded counter在存储到数据存储时帮助缓解任何争用的方法
  • 每个聚合任务可以尝试存储 500-1500 个聚合,这些聚合应该相互独立

  • 还有其他检查等,以确保正确处理所有拉取队列任务并下载所有项目。

    问题:

    我们希望尽快下载和存储所有项目和聚合。我为所描述的每个后端配置启用了 20 个实例(我将它们称为“聚合器”后端和“下载器”后端)。下载器后端似乎相当快地通过 API 调用。我大量使用 NDB 库和异步 URL Fetches/Datastore 调用来获得它。我还启用了 threadsafe:true 以便没有实例在开始下一个任务之前等待 RPC 调用完成(所有任务都可以独立运行并且是幂等的)。

    聚合器后端是重要的时间槽发挥作用的地方。通过事务异步存储 500-1500 个这些聚合需要 40 秒或更长时间(我什至不认为所有事务都被正确提交)。我将此后端保留为 threadsafe:false,因为我使用 300 秒的拉取队列到期期限,但是如果我允许在单个实例上执行多个任务,它们可能会级联并插入完成一些任务超过 300 秒标记,从而允许另一个任务第二次提取相同的任务,并且可能会重复计算。

    日志显示 BadRequestError: Nested transactions are not supported.以前的错误(在堆栈跟踪中)为 TransactionFailedError: too much contention on these datastore entities. please try again. .我经常看到的另一个错误是 BadRequestError(The referenced transaction has expired or is no longer valid.)
    根据我的理解,有时这些错误意味着事务仍然可以在没有进一步交互的情况下提交。我怎么知道这是否已正确提交?我是否以合乎逻辑/高效的方式执行此操作,或者是否有更多的并发空间而不会弄乱一切?

    相关代码:
    class GeneralShardConfig(ndb.Model):
        """Tracks the number of shards for each named counter."""
        name = ndb.StringProperty(required=True)
        num_shards = ndb.IntegerProperty(default=4)
    
    class GeneralAggregateShard(ndb.Model):
        """Shards for each named counter"""
        name = ndb.StringProperty(name='n', required=True)
        count = ndb.FloatProperty(name='c', default=0.00) #acts as a total now
    
    @ndb.tasklet
    def increment_batch(data_set):
        def run_txn(name, value):
            @ndb.tasklet
            def txn():
                to_put = []
                dbkey = ndb.Key(GeneralShardConfig, name)
                config = yield dbkey.get_async(use_memcache=False)
                if not config:
                    config = GeneralShardConfig(key=dbkey,name=name)
                    to_put.append(config)
                index = random.randint(0, config.num_shards-1)
                shard_name =  name + str(index)
                dbkey = ndb.Key(GeneralAggregateShard, shard_name)
                counter = yield dbkey.get_async()
                if not counter:
                    counter = GeneralAggregateShard(key=dbkey, name=name)
                counter.count += value
                to_put.append(counter)
                yield ndb.put_multi_async(to_put)
            return ndb.transaction_async(txn, use_memcache=False, xg=True)
        res = yield[run_txn(key, value) for key, value in data_set.iteritems() if value != 0.00]
        raise ndb.Return(res)
    

    鉴于实现,我看到的唯一“争用”空间是 2 个或更多聚合任务是否需要更新相同的聚合名称,这不应该太频繁发生,并且对于分片计数器,我希望这种重叠很少,如果有的话,发生。我假设BadRequestError(The referenced transaction has expired or is no longer valid.)当事件循环检查所有 tasklet 的状态并命中对已完成事务的引用时,就会出现错误。这里的问题是它出错了,这是否意味着所有交易都被过早地切断了,或者我可以假设所有交易都通过了?我进一步假设这条线 res = yield[run_txn(key, value) for key, value in data_set.iteritems() if value != 0.00]需要将每个 tasklet 分解为 try/except 以检测这些错误。

    在我为此生气之前,我很感激任何有关如何优化此过程并以可靠方式进行的指导/帮助。

    编辑 1:
    我修改了聚合器任务行为如下:
  • 如果从队列中租用了 1 个以上的任务,则将这些任务聚合在内存中,然后将结果存储在拉队列中的另一个任务中,并立即启动另一个“聚合器任务”
  • 否则,如果租用了 1 个任务,则尝试保存结果

  • 这有助于减少我一直看到的争用错误,但它仍然不是很可靠。最近,我打了 BadRequestError: Nested transactions are not supported.堆栈跟踪指示 RuntimeError: Deadlock waiting for <Future fbf0db50 created by transaction_async(model.py:3345) for tasklet transaction(context.py:806) suspended generator transaction(context.py:876); pending>
    我相信这种修改应该通过允许聚合过程中所有可能的重叠组合并在单个实例中一次全部尝试来优化过程,而不是多个实例都执行可能发生冲突的事务。我仍然无法以可靠的方式保存结果。

    最佳答案

    通过减少数据存储 I/O(将工作留给自动批处理程序并禁用索引),您可以更加确定数据存储写入完成(更少争用)并且它应该更快。

    配置(重命名的计数器)位于事务之外,并且可以在循环遍历事务的同时并发运行。

    将方法和总属性添加到 Counter 以(希望)使将来更容易修改。

    为十进制支持创建了一个新的 ndb 属性(假设这就是您指定 0.00 而不是 0.0 的原因)。

    编辑:

    消除了对交易的需求并更改了分片系统以获得可靠性。

    import webapp2
    
    import copy
    import decimal
    import logging
    import random
    import string
    
    from google.appengine.api import datastore_errors
    from google.appengine.datastore import entity_pb
    from google.appengine.ext import deferred
    from google.appengine.ext import ndb
    
    
    TEST_BATCH_SIZE = 250
    TEST_NAME_LEN = 12
    
    
    class DecimalProperty(ndb.Property):
        """A Property whose value is a decimal.Decimal object."""
    
        def _datastore_type(self, value):
          return str(value)
    
        def _validate(self, value):
          if not isinstance(value, decimal.Decimal):
            raise datastore_errors.BadValueError('Expected decimal.Decimal, got %r'
                                                 % (value,))
          return value
    
        def _db_set_value(self, v, p, value):
            value = str(value)
            v.set_stringvalue(value)
            if not self._indexed:
                p.set_meaning(entity_pb.Property.TEXT)
    
        def _db_get_value(self, v, _):
            if not v.has_stringvalue():
                return None
            value = v.stringvalue()
            return decimal.Decimal(value)
    
    class BatchInProgress(ndb.Model):
        """Use a scheduler to delete batches in progress after a certain time"""
    
        started = ndb.DateTimeProperty(auto_now=True)
    
        def clean_up(self):
            qry = Shard.query().filter(Shard.batch_key == self.key)
            keys = qry.fetch(keys_only=True)
            while keys:
                ndb.delete_multi(keys)
                keys = qry.fetch(keys_only=True)
    
    def cleanup_failed_batch(batch_key):
        batch = batch_key.get()
    
        if batch:
            batch.clean_up()
            batch.delete()
    
    class Shard(ndb.Model):
        """Shards for each named counter"""
    
        counter_key = ndb.KeyProperty(name='c')
        batch_key = ndb.KeyProperty(name='b')
        count = DecimalProperty(name='v', default=decimal.Decimal('0.00'),
                                indexed=False)
    
    class Counter(ndb.Model):
        """Tracks the number of shards for each named counter"""
    
        @property
        def shards(self):
            qry = Shard.query().filter(Shard.counter_key == self.key)
            results = qry.fetch(use_cache=False, use_memcache=False)
            return filter(None, results)
    
        @property
        def total(self):
            count = decimal.Decimal('0.00') # Use initial value if no shards
    
            for shard in self.shards:
                count += shard.count
    
            return count
    
        @ndb.tasklet
        def incr_async(self, value, batch_key):
            index = batch_key.id()
            name = self.key.id() + str(index)
    
            shard = Shard(id=name, count=value,
                          counter_key=self.key, batch_key=batch_key)
    
            yield shard.put_async(use_cache=False, use_memcache=False)
    
        def incr(self, *args, **kwargs):
            return self.incr_async(*args, **kwargs).get_result()
    
    @ndb.tasklet
    def increment_batch(data_set):
        batch_key = yield BatchInProgress().put_async()
        deferred.defer(cleanup_failed_batch, batch_key, _countdown=3600)
    
        # NOTE: mapping is modified in place, hence copying
        mapping = copy.copy(data_set)
    
        # (1/3) filter and fire off counter gets
        #       so the futures can autobatch
        counters = {}
        ctr_futs = {}
        ctr_put_futs = []
        zero_values = set()
        for name, value in mapping.iteritems():
            if value != decimal.Decimal('0.00'):
                ctr_fut = Counter.get_by_id_async(name) # Use cache(s)
                ctr_futs[name] = ctr_fut
            else:
                # Skip zero values because...
                zero_values.add(name)
                continue
    
        for name in zero_values:
            del mapping[name] # Remove all zero values from the mapping
        del zero_values
    
        while mapping: # Repeat until all transactions succeed
    
            # (2/3) wait on counter gets and fire off increment transactions
            #       this way autobatchers should fill time
            incr_futs = {}
            for name, value in mapping.iteritems():
                counter = counters.get(name)
                if not counter:
                    counter = counters[name] = yield ctr_futs.pop(name)
                if not counter:
                    logging.info('Creating new counter %s', name)
                    counter = counters[name] = Counter(id=name)
                    ctr_put_futs.append(counter.put_async())
                else:
                    logging.debug('Reusing counter %s', name)
                incr_fut = counter.incr_async(value, batch_key)
                incr_futs[(name, value)] = incr_fut
    
            # (3/3) wait on increments and handle errors
            #       by using a tuple key for variable access
            for (name, value), incr_fut in incr_futs.iteritems():
                counter = counters[name]
                try:
                    yield incr_fut
                except:
                    pass
                else:
                    del mapping[name]
    
            if mapping:
                logging.warning('%i increments failed this batch.' % len(mapping))
    
        yield batch_key.delete_async(), ctr_put_futs
    
        raise ndb.Return(counters.values())
    
    class ShardTestHandler(webapp2.RequestHandler):
    
        @ndb.synctasklet
        def get(self):
            if self.request.GET.get('delete'):
                ndb.delete_multi_async(Shard.query().fetch(keys_only=True))
                ndb.delete_multi_async(Counter.query().fetch(keys_only=True))
                ndb.delete_multi_async(BatchInProgress.query().fetch(keys_only=True))
            else:
                data_set_test = {}
                for _ in xrange(TEST_BATCH_SIZE):
                    name = ''
                    for _ in xrange(TEST_NAME_LEN):
                        name += random.choice(string.letters)
                    value = decimal.Decimal('{0:.2f}'.format(random.random() * 100))
                    data_set_test[name] = value
                yield increment_batch(data_set_test)
            self.response.out.write("Done!")
    
    app = webapp2.WSGIApplication([('/shard_test/', ShardTestHandler)], debug=True)
    app = ndb.toplevel(app.__call__)
    

    关于python - 执行数千个独立事务的可靠方法?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/11251167/

    相关文章:

    python - 分割字符串时丢失编码

    python - 如何通过 Python 连接到 VPN/代理服务器?

    python - 获取列表之间第一个和最后一个公共(public)元素索引的最快方法

    python - 在 Tkinter 中如何将被调用函数作为参数传递?

    python - 如何在Python Google App Engine Sdk中配置flex crossdomain.xml

    google-app-engine - GeoPtProperty有什么样的优势?

    python - 在 python 中绘制二元 3D 矩阵

    google-app-engine - `gcloud preview app deploy app.yaml` 部署除 Docker 镜像之外的所有源文件。这有必要吗

    java - GAE 数据存储 - 如何同时处理多个命名空间

    java - 如果嵌入了子对象,则只能引用子对象的属性