python - 如何使用 pyspark 创建循环来迭代连接

标签 python loops join pyspark

我正在开发 Databricks,我有一个包含 BOM( Material list )列表的数据框:数据框的结构如下面的示例所示,其中标识符是“父”产品的代码(成品),组件是“子”产品(可以是半成品或原 Material )的代码。如果该组件是半成品,您也可以在标识符列表中找到它及其组件(但我在数据框中没有指示告诉我产品的类型,是成品、半成品还是原始产品)。

  identifier   component
  xxxx         yyyy
  xxxx         zzzz
  xxxx         aaaa
  aaaa         bbbb
  aaaa         cccc
  bbbb         dddd
  bbbb         eeee
  cccc         ffff
  cccc         mmmm
  ffff         aaaa
  ffff         gggg
  ffff         hhhh
  hhhh         iiii
  hhhh         jjjj

在上面的示例中,有两个最终产品(xxxx 和 ffff)。 xxxx 具有 yyyy、zzzz 和 aaaa 组件。 aaaa 应该是半成品,因为它也列在标识符栏中,并且由 cccc 和 dddd 组成。 cccc 和 dddd 也是半成品,因为它们列在标识符栏中,并且由 dddd、eeee、ffff 和 mmmm 组成(应该是原 Material ,因为它们不在标识符栏中)。第二个最终产品ffff由aaaa(半成品,也用于xxxx)、gggg(原 Material )和hhhh(半成品,由原 Material iiii和jjjj组成)组成。 我必须按照企业提供的仅包含最终产品的产品列表来过滤此数据框:假设我必须仅选择 xxxx(ffff 不在列表中)。问题是,如果我过滤 xxxx,我会丢失与半成品相关的信息(如果我过滤数据帧,仅选择标识符 xxxx,我会获得 3 行,但我要做的是找到一种方法来保留 aaaa, bbbb 和 cccc,以及它们每个的组件的详细信息)。所以最终过滤后的数据框应该是

    identifier   component
  xxxx         yyyy
  xxxx         zzzz
  xxxx         aaaa
  aaaa         bbbb
  aaaa         cccc
  bbbb         dddd
  bbbb         eeee
  cccc         ffff
  cccc         mmmm

我试图找出如何用循环解决这个问题(真正的数据帧显然更大,大约13000行),但我找不到一个好的起点(我不是Python专家)。有没有人提供一些好的建议、文档或片段作为起点?

最佳答案

if I filter the dataframe selecting only identifier xxxx I obtain 3 rows, but what I have to do is to find a way to keep also aaaa, bbbb and cccc

你不是说,你必须找到一种方法来保留 yyyyzzzzaaaa,因为这些是xxxx 依赖的产品?澄清这个问题并解释一下如何获得最终的过滤数据帧(我得到了不同的结果),我很乐意提供帮助。

我仍然很乐意提供帮助,但我仍然不明白您是如何得出最终的过滤数据框的。

因此,您选择 xxxx 并收到三个关联的组件 yyyyzzzzaaaa。然后呢?您获得这三个组件的关联组件,然后递归地执行此操作,直到获得基础 Material ?但这并不能反射(reflect)最终过滤数据框中的内容。

编辑如果我正确理解了这个问题,这就是我想到的(我不经常使用 pandas,所以如果有 pandas 功能可以使这个更可爱):

def main():

    import pandas as pd
    from queue import Queue

    df = pd.read_csv("data.csv", names=["identifier", "component"])

    result_df = pd.DataFrame()

    selected_identifier = "xxxx"
    identifier_queue = Queue()
    identifier_queue.put(selected_identifier)
    previously_seen_identifiers = set()

    while not identifier_queue.empty():
        current_identifier = identifier_queue.get()
        if current_identifier in previously_seen_identifiers:
            continue
        previously_seen_identifiers.add(current_identifier)
        current_df = df.loc[df["identifier"] == current_identifier]
        result_df = result_df.append(current_df)
        components = current_df[["component"]]["component"].tolist()
        for component in components:
            identifier_queue.put(component)

    print(result_df)

    return 0


if __name__ == "__main__":
    import sys
    sys.exit(main())

其工作原理如下:

  • 创建一个标识符队列(也可以是一个堆栈),该队列仍然 需要进行处理。最初要处理的唯一标识符 我们选择的最终产品(在本例中为xxxx)。
  • 当队列不为空时,获取下一个标识符并将其删除 从队列中,从原始数据帧创建一个数据帧 仅包含当前标识符所在的组件 取决于,并将此子数据帧附加到 result_df 数据框。在开始下一次迭代之前,我们采用关联的 当前标识符所依赖的组件并将它们添加到 我们的未处理标识符队列。
  • 在循环之前,我们还创建了一个 set() 来跟踪所有 到目前为止我们已经看到的标识符。在循环中,如果当前标识符 已经看到了,我们忽略它并继续下一个 标识符。之所以这样做是因为有些产品似乎 具有循环依赖关系。

请告诉我这是否是您的想法。

关于python - 如何使用 pyspark 创建循环来迭代连接,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58666808/

相关文章:

php - 将值检索到数组中

mysql - 从连接表中仅选择具有 MAX(date) 的行中的所有列

php - 从 MySQL 中的两个连接表中获取多个计数

Python - 用模拟替换类变量单例

python - 如何在 Pandas 的非时间索引上按值区间进行滑动窗口?

c++ - 输入和中断 C++ for 循环 : minimum of 10 iterations AND no empty string

php - 其中加入 1 : Many relationship?

Python3 'Cannot import name ' cached_property'

python - 如何将额外的数据和图像一起提供给 CNN?

java - 创建多个嵌套循环以生成两个遍历数组长度的数字