java - Flink : Left joining a stream with a static list

Tags: java kotlin apache-flink flink-streaming flink-sql

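I want to join a stream of attempts against a static list of blocked emails and group the result by IP, so that I can later compute a set of related statistics. The result should be delivered as a 30-minute sliding window that advances every 10 seconds. Here is one of several ways I have tried to achieve this: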

override fun performQuery(): Table {
    val query = "SELECT ip, " +
        "COUNT(CASE WHEN success IS false THEN 1 END) AS fails, " +
        "COUNT(CASE WHEN success IS true THEN 1 END) AS successes, " +
        "COUNT(DISTINCT id) accounts, " +
        "COUNT(CASE WHEN id = 0 THEN 1 END) AS non_existing_accounts, " +
        "COUNT(CASE WHEN blockedEmail IS NOT NULL THEN 1 END) AS blocked_accounts " +
        "FROM Attempts " +
        "LEFT JOIN LATERAL TABLE(blockedEmailsList()) AS T(blockedEmail) ON TRUE " +
        "WHERE Attempts.email <> '' AND Attempts.createdAt < CURRENT_TIMESTAMP " +
        "GROUP BY HOP(Attempts.createdAt, INTERVAL '10' SECOND, INTERVAL '30' MINUTE), ip"

    return runQuery(query)
        .select("ip, accounts, fails, successes, non_existing_accounts, blocked_accounts")
}

This uses the user-defined table function below, which is already registered in my tableEnv as blockedEmailsList:
public class BlockedEmailsList extends TableFunction<Row> {
    private Collection<String> emails;

    public BlockedEmailsList(Collection<String> emails) {
        this.emails = emails;
    }

    public Row read(String email) {
        return Row.of(email);
    }

    public void eval() {
        this.emails.forEach(email -> collect(read(email)));
    }
}

However, it returns the following error:
Caused by: org.apache.flink.table.api.TableException: Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.


If I follow its suggestion and cast created_at to TIMESTAMP, I get this:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Window can only be defined over a time attribute column.

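I found other questions on Stack Overflow related to these exceptions, but they involve streams and temporal tables, and none of them addresses the case of joining a stream with a static list.

Any ideas?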

EDIT: It looks like there is an unresolved issue in the Flink project for my use case: https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API

So I am also open to workaround suggestions.

Best answer

Caused by: org.apache.flink.table.api.TableException: Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.
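The reason is that a lateral table function join is a regular join in Flink, and a regular left join first emits rows padded with NULLs and later retracts them, for example: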
left:(K0, A), right(K1, T1)  => send    (K0, A, NULL, NULL)
left:         , right(K0, T2) => retract (K0, A, NULL, NULL )  
                                   send   (K0, A, K0, T2)
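As a result, the time attribute of the input stream is lost after the join.
In your case you do not need a TableFunction; you can use a scalar function instead, like: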
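To make the retraction behaviour concrete, here is a toy model in plain Java. It is not Flink code: the class `LeftJoinChangelog` and its methods are hypothetical names invented for this illustration, and it only mimics the messages a non-windowed left outer join would emit for the example above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model (not Flink) of the changelog emitted by a streaming left outer join. */
class LeftJoinChangelog {
    private final Map<String, List<String>> left = new HashMap<>();
    private final Map<String, List<String>> right = new HashMap<>();
    private final List<String> out = new ArrayList<>();

    /** A left row arrives: join it with known right rows, or pad it with NULLs. */
    void onLeft(String key, String value) {
        left.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        List<String> matches = right.get(key);
        if (matches == null || matches.isEmpty()) {
            out.add("send (" + key + ", " + value + ", NULL, NULL)");
        } else {
            for (String r : matches) {
                out.add("send (" + key + ", " + value + ", " + key + ", " + r + ")");
            }
        }
    }

    /** A right row arrives: the first match for a key retracts the NULL-padded rows. */
    void onRight(String key, String value) {
        List<String> existing = right.computeIfAbsent(key, k -> new ArrayList<>());
        boolean firstMatch = existing.isEmpty();
        existing.add(value);
        for (String l : left.getOrDefault(key, Collections.<String>emptyList())) {
            if (firstMatch) {
                out.add("retract (" + key + ", " + l + ", NULL, NULL)");
            }
            out.add("send (" + key + ", " + l + ", " + key + ", " + value + ")");
        }
    }

    List<String> output() {
        return out;
    }

    public static void main(String[] args) {
        LeftJoinChangelog join = new LeftJoinChangelog();
        join.onLeft("K0", "A");    // no right match yet, so a NULL-padded row is sent
        join.onRight("K1", "T1");  // different key, nothing is emitted
        join.onRight("K0", "T2");  // match arrives late: retract, then send the joined row
        join.output().forEach(System.out::println);
    }
}
```

Because a row that was already emitted can be withdrawn at any later point, the join output is an updating result with no fixed position in time, which is why a rowtime attribute cannot be carried through it.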
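 import java.util.List;
 import org.apache.flink.table.functions.ScalarFunction;

 public static class BlockedEmailFunction extends ScalarFunction {
     // the static list of blocked emails (left elided, as in the original answer)
     private static List<String> blockedEmails = ...;

     // returns true when the given email is on the blocked list
     public Boolean eval(String email) {
        return blockedEmails.contains(email);
     }
 }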


// register function
env.createTemporarySystemFunction("blockedEmailFunction", BlockedEmailFunction.class);

// call the registered function in SQL, then apply the window operation as needed
env.sqlQuery("SELECT blockedEmailFunction(email) as status, ip, createdAt FROM Attempts");
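
With the scalar function in place, the windowed aggregation can run directly on Attempts, since no join touches createdAt any more. The following is a sketch of what the complete query might look like, assuming the column names from the question and the function name `blockedEmailFunction` registered above. The class `WindowedAttemptsQuery` is a hypothetical helper that only builds the SQL string, so it compiles without Flink on the classpath; the query itself has not been run against a cluster here.

```java
/** Builds the windowed query from the question, with the lateral join replaced by a scalar function call. */
class WindowedAttemptsQuery {

    static String build() {
        return "SELECT ip, "
            + "COUNT(CASE WHEN success IS FALSE THEN 1 END) AS fails, "
            + "COUNT(CASE WHEN success IS TRUE THEN 1 END) AS successes, "
            + "COUNT(DISTINCT id) AS accounts, "
            + "COUNT(CASE WHEN id = 0 THEN 1 END) AS non_existing_accounts, "
            // the scalar function replaces the LEFT JOIN LATERAL TABLE(...) clause
            + "COUNT(CASE WHEN blockedEmailFunction(email) THEN 1 END) AS blocked_accounts "
            + "FROM Attempts "
            + "WHERE email <> '' AND createdAt < CURRENT_TIMESTAMP "
            // createdAt is still a time attribute here, so HOP is allowed
            + "GROUP BY HOP(createdAt, INTERVAL '10' SECOND, INTERVAL '30' MINUTE), ip";
    }

    public static void main(String[] args) {
        // In a Flink job the string would be passed to the table environment,
        // e.g. env.sqlQuery(WindowedAttemptsQuery.build())
        System.out.println(build());
    }
}
```

The only change from the query in the question is that blocked_accounts is computed per row through the function call instead of through the join.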
 

Regarding "java - Flink : Left joining a stream with a static list", the original question is on Stack Overflow: https://stackoverflow.com/questions/61111592/
