python - 等效于 Python 和 PySpark 中的 R data.table 滚动连接

标签 python r pyspark data.table pyspark-sql

有谁知道如何在 PySpark 中执行 R data.table 滚动连接?

从 Ben here 借用滚动连接的例子和很好的解释;

sales<-data.table(saleID=c("S1","S2","S3","S4","S5"), 
              saleDate=as.Date(c("2014-2-20","2014-5-1","2014-6-15","2014-7- 1","2014-12-31")))

commercials<-data.table(commercialID=c("C1","C2","C3","C4"), 
                    commercialDate=as.Date(c("2014-1-1","2014-4-1","2014-7-1","2014-9-15")))

setkey(sales,"saleDate")
setkey(commercials,"commercialDate")

sales[commercials, roll=TRUE]

结果是;

saleDate saleID commercialID
1: 2014-01-01     NA    C1
2: 2014-04-01     S1    C2
3: 2014-07-01     S4    C3
4: 2014-09-15     S4    C4

非常感谢您的帮助。

最佳答案

滚动连接不是 join + fillna

首先,滚动连接joinfillna 相同!只有当所连接的表的键(就 data.table 而言,这将是左表和右连接)在主表中具有等效项时才会出现这种情况。 data.table 滚动连接不需要这个。

据我所知并搜索了很长时间,没有直接的等价物。它甚至有一个问题https://github.com/pandas-dev/pandas/issues/7546 .然而:

Pandas 解决方案:

虽然在 pandas 中有一个解决方案。假设你右边的 data.table 是表 A,你左边的 data.table 是表 B。

  1. 分别按键对表 A 和表 B 进行排序。
  2. 向A添加一列tag,全为0,向B添加一列tag,全为1。
  3. 从B中删除除key和tag以外的所有列(可以省略,但这样更清晰),调用表B'。将 B 保留为原件 - 我们稍后会用到它。
  4. 将 A 和 B' 连接到 C,并忽略 B' 中的行有很多 NA 的事实。
  5. 按键对 C 排序。
  6. 使用 C = C.assign(groupNr = np.cumsum(C.tag)) 创建一个新的 cumsum 列
  7. tag 上使用过滤(query)去除所有 B'-rows。
  8. 向原始 B 添加一个运行计数器列 groupNr(从 0 到 N-1 或从 1 到 N 的整数,具体取决于您是要向前滚动连接还是向后滚动连接)。
  9. groupNr 上加入 B 和 C。

编程代码

#0. 'date' is the key for the rolling join. It does not have to be a date.
A = pd.DataFrame.from_dict(
    {'date': pd.to_datetime(["2014-3-1", "2014-5-1", "2014-6-1", "2014-7-1", "2014-12-1"]),
     'value': ["a1", "a2", "a3", "a4", "a5"]})
B = pd.DataFrame.from_dict(
    {'date': pd.to_datetime(["2014-1-15", "2014-3-15", "2014-6-15", "2014-8-15", "2014-11-15", "2014-12-15"]),
     'value': ["b1", "b2", "b3", "b4", "b5", "b6"]})

#1. Sort the table A and and B each by key.
A = A.sort_values('date')
B = B.sort_values('date')

#2. Add a column tag to A which are all 0 and a column tag to B that are all 1.
A['tag'] = 0
B['tag'] = 1

#3. Delete all columns except the key and tagfrom B (can be omitted, but it is clearer this way) and call the table B'. Keep B as an original - we are going to need it later.
B_ = B[['date','tag']] # You need two [], because you get a series otherwise.

#4. Concatenate A with B' to C and ignore the fact that the rows from B' has many NAs.
C = pd.concat([A, B_])

#5. Sort C by key.
C = C.sort_values('date')

#6. Make a new cumsum column with C = C.assign(groupNr = np.cumsum(C.tag))
C = C.assign(groupNr = np.cumsum(C.tag))

#7. Using filtering (query) on tag get rid of all B'-rows.
C = C[C.tag == 0]

#8. Add a running counter column groupNr to the original B (integers from 0 to N-1 or from 1 to N, depending on whether you want forward or backward rolling join).
B['groupNr'] = range(len(B)+1)[1:] # B's values are carried forward to A's values
B['groupNr'] = range(len(B))       # B's values are carried backward to A's values

#9. Join B with C on groupNr to D.
D = C.set_index('groupNr').join(B.set_index('groupNr'), lsuffix='_A', rsuffix='_B')

关于python - 等效于 Python 和 PySpark 中的 R data.table 滚动连接,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38896648/

相关文章:

python - 在共享内存中使用 numpy 数组进行多处理

R Shiny 从 numericInput 数据表列中提取值

apache-spark - Pyspark 在 groupby 中创建字典

绝对初学者的Python编程: chapter 3 ERROR

python - python 中两个程序的集成

r - 使用Extrafont程序包检索Windows字体后,Fonts表仍然为空

Pyspark - 排名列保持联系

azure - Pyspark - 获取列表 azure synapse 工作区中容器文件夹的所有内容并存储该数据

python - Python 2.x 中的 True=False 赋值

r - R:遍历向量时使用元素的位置信息。