java - 使用kafka+java流式处理数据

标签 java oracle apache-kafka

我在编程方面相对较新(您可以在我的代码中看到),但我目前正在学习更多有关数据处理的 kafka 和 java。对于主题中的数据,我需要与一些表进行连接以检查数据是否存在并获取其他数据,所以我对数据库做了一些请求(要检索的字段太多,我需要单独的查询以使其可读)。对于每条记录从主题中检索我与数据库建立了一些连接,然后(处理数据后)更新表(我对表进行批处理,只有这样很快)。
我的问题是时间。我用二十万个寄存器做测试……需要半个小时和六千个,太慢了。我的代码是这样的

public class TestKafka {  
    public static Connection conexion = null;  
    public static void main(){  
        conexion = C3P0DataSource.getInstance().getConnection();  
        runConsumer();  
    }  
    .  
    ..  
    public static void runConsumer(){  
    try // ( Connection conexion C3P0DataSource.getInstance().getConnection();)
    {  
        conexion.setAutoCommit(false);  
        while (true) {  // with kafka connector - I try to simulate data streaming
            final ConsumerRecords<String, String> consumerRecords = consumer.poll(Long.MAX_VALUE);  
            List<Map<String, String>> recordData = new ArrayList<Map<String, String>>();  
            ObjectMapper mapper = new ObjectMapper();  
            for (ConsumerRecord<String, String> record : consumerRecords) {  
                Map<String, String> map = new HashMap<String, String>();  
                DataStructure_Topic config = mapper.readValue(record.value(), DataStructure_Topic.class);  
                map.put("row_id_1", config.getCodent());  
                map.put("row_id_2", config.getCentalta());  
                map.put("row_id_3", config.getCuenta());  
                datosAComprobar.add(map);  
                recordData = firstConsult(recordData, conexion);  
                if (recordData.size() > 0) {  
                    recordData = SecondConsult(recordData, conexion);  
                    // few petitions to the database  
                    if (recordData.size() > 0) {  
                        // ..data processing.. and update
                    }  
                }  
                datosAComprobar.clear();  
            }  
            consumer.commitSync();  
            Thread.sleep(100);  
        }  
    } catch(){...}  
}  

请求数据库(每个查询的结构相同):

public static List<Map<String, String>> FirstConsult(List<Map<String, String>> MyList, Connection conn) {  
    PreparedStatement stmt = null;  
    ResultSet rs = null;  
    List<Map<String, String>> list = new ArrayList<Map<String, String>>();  
    String query = "";  
    int contador = 1;  
    for (Map<String, String> val : MyList) {  
    query += "select " + val.get("row1") + " as r1, " + val.get("row2") + " as row2,"+val.get("cuenta")+"from table_a inner join table_b...."  
        if (contador < MyList.size()) {  
            query += "\r\nunion\r\n";  
        }  
        contador += 1;  
    }  
    try {  
        stmt = conn.prepareStatement(query);  
        rs = stmt.executeQuery();  
        ResultSetMetaData rsmd = rs.getMetaData();  
        int columnsNumber = rsmd.getColumnCount();  
        if (rs.next()) {  
            do {  
                Map<String, String> map = new HashMap<String, String>();  
                for (int i = 1; i <= columnsNumber; i++) {  
                    String columnValue = rs.getString(i);  
                    String columnName = rsmd.getColumnName(i);  
                    map.put(columnName, columnValue);  
                }  
                if (!map.isEmpty()) {  
                    list.add(map);  
                }  
            } while (rs.next());  
        }  
    } catch(e){...} finally {  
            try {  
                if(rs != null) rs.close();  
                if (stmt != null) stmt.close();  
            } catch (SQLException e) {...}  
    }  
    return list;  
}  

如何改进我的代码或至少改进与数据库的连接以获得更好的时间...?当我加载更多记录时,它会变慢。我需要关闭连接吗?我关闭所有语句和结果集 ...

最佳答案

如您所见,这不是一种有效的做事方式。一种常见的模式是将数据库导入 Kafka 并在那里完成工作,而不是查找数据库。

您可以将数据库表提取到 Kafka topic using CDC 中然后使用 Kafka Streams 或 ksqlDB 等流处理技术将原始 Kafka 主题与从数据库填充的新 Kafka 主题中的必要数据连接起来。 This talk here展示它的实际效果。

关于java - 使用kafka+java流式处理数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58991161/

相关文章:

java - 获取 XML 元素之间文本的正则表达式

python - 使用 SQLAlchemy 将值列表传递给 Oracle 函数

Oracle 综合指数性能

apache-kafka - 在 HDP Sandbox 2.6.5 中从 localhost 生成到 Kafka 不起作用

amazon-web-services - Kafka Producer 中的 SSL 握手失败

java - Java 是 "pass-by-reference"还是 "pass-by-value"?

java - 具有共享 key (MapsId)和延迟加载问题的 OneToOne

java - Maven 找不到 aws s3 sdk

docker - 从本地 Docker For Mac 中部署的服务访问本地 Kafka(包括 Kubernetes 扩展)

java - 如何在 Java 中为 Oracle 数据库创建只读连接