apache-spark - 在pyspark lambda映射函数中使用keras模型

标签 apache-spark tensorflow pyspark keras

我想使用该模型来预测 PySpark 中的映射 lambda 函数的分数。

def inference(user_embed, item_embed):
    feats = user_embed + item_embed
    dnn_model =  load_model("best_model.h5")
    infer = dnn_model.predict(np.array([feats]), verbose=0, steps=1)
    return infer
iu_score = iu.map(lambda x: Row(userid=x.userid, entryid=x.entryid, score = inference(x.user_embed, x.item_embed)))

运行速度极慢,代码运行后很快就卡在了最后阶段。

[Stage 119:==================================================>(4048 + 2) / 4050]

在 HTOP 监视器中,80 个核心中只有 2 个处于满负荷工作状态,其他核心似乎不工作。 那么我应该怎么做才能使模型并行预测呢? iu 是 3 亿,所以效率对我来说很重要。 谢谢。

enter image description here

我把verbose=1,就出现了预测日志,但是好像只是一一预测,而不是并行预测。

最佳答案

在回复过程中我做了一些研究,发现这个问题很有趣。 首先,如果效率真的很重要,请花一点时间在没有 Keres 的情况下重新编码整个事情。您仍然可以使用 tensorflow (模型)的高级 API,并且只需付出一点努力即可提取参数并将其分配给新模型。尽管从包装器框架中的大量实现来看还不清楚(TensorFlow是一个不够丰富的框架吗?),但在升级时很可能会遇到向后兼容性的问题。确实不建议用于生产。

话虽如此,您能否检查到底是什么问题,例如 - 您是否使用 GPU?也许他们重载了?您能否将整个事情包装起来不超过某些容量并使用优先级系统?如果没有优先级,您可以使用简单的队列。您还可以检查是否确实终止了tensorflow的 session ,或者同一台机器运行了许多干扰其他模型的模型。造成这种现象的原因有很多,如果能提供更多详细信息就太好了。

关于并行计算 - 您没有实现任何真正为该模型打开线程或进程的东西,所以我怀疑 pyspark 无法自行处理整个事情。也许实现(老实说我没有阅读整个 pyspark 文档)假设分派(dispatch)的函数运行得足够快并且没有按应有的方式分发。 PySpark 只是映射缩减原则的复杂实现。分派(dispatch)的函数在单个步骤中扮演映射函数的角色,这对于您的情况可能会出现问题。尽管它是作为 lambda 表达式传递的,但您应该更仔细地检查哪些实例速度较慢,以及它们在哪些计算机上运行。

我强烈建议您执行以下操作:
前往Tensorflow deplot official docs并阅读如何真正部署模型。有一个用于与已部署模型进行通信的协议(protocol),称为RPC,还有一个 Restful API。然后,使用 pyspark,您可以包装调用并与所提供的模型连接。你可以创建一个你想要的模型池,在 pyspark 中管理它,通过网络分配计算,从这里开始,天空和 cpus/gpus/tpus 是限制(我仍然对天空持怀疑态度)。

很高兴能从您那里得到有关结果的最新信息:)您让我很好奇。

我希望您能顺利解决这个问题,这是一个很好的问题。

关于apache-spark - 在pyspark lambda映射函数中使用keras模型,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52366318/

相关文章:

python - 如何在 tf.data.Dataset 中输入不同大小的列表列表

tensorflow - 具有 data_formatchannels_first 的 Conv1D 在 Keras 上产生错误

pyspark - 如果不存在,则将列添加到 pyspark 数据框

java - 在dse4.6中运行spark程序

scala - 如何列出 Spark Scala shell 中 HDFS 位置中的所有 csv 文件?

python - Pyspark RDD 的最大文件大小

TensorFlow Bazel 构建失败

apache-spark - 如何在可能为空的列上使用 PySpark CountVectorizer

python - 如何在 pyspark 中重命名数据框的列?

apache-spark - Pyspark pandas_udf 文档代码的错误 :'java.lang.UnsupportedOperationException'