我在编程方面相对较新(您可以在我的代码中看到),但我目前正在学习更多有关数据处理的 kafka 和 java。对于主题中的数据,我需要与一些表进行连接以检查数据是否存在并获取其他数据,所以我对数据库做了一些请求(要检索的字段太多,我需要单独的查询以使其可读)。对于每条记录从主题中检索我与数据库建立了一些连接,然后(处理数据后)更新表(我对表进行批处理,只有这样很快)。
我的问题是时间。我用二十万个寄存器做测试……需要半个小时和六千个,太慢了。我的代码是这样的
public class TestKafka {
public static Connection conexion = null;
public static void main(){
conexion = C3P0DataSource.getInstance().getConnection();
runConsumer();
}
.
..
public static void runConsumer(){
try // ( Connection conexion C3P0DataSource.getInstance().getConnection();)
{
conexion.setAutoCommit(false);
while (true) { // with kafka connector - I try to simulate data streaming
final ConsumerRecords<String, String> consumerRecords = consumer.poll(Long.MAX_VALUE);
List<Map<String, String>> recordData = new ArrayList<Map<String, String>>();
ObjectMapper mapper = new ObjectMapper();
for (ConsumerRecord<String, String> record : consumerRecords) {
Map<String, String> map = new HashMap<String, String>();
DataStructure_Topic config = mapper.readValue(record.value(), DataStructure_Topic.class);
map.put("row_id_1", config.getCodent());
map.put("row_id_2", config.getCentalta());
map.put("row_id_3", config.getCuenta());
datosAComprobar.add(map);
recordData = firstConsult(recordData, conexion);
if (recordData.size() > 0) {
recordData = SecondConsult(recordData, conexion);
// few petitions to the database
if (recordData.size() > 0) {
// ..data processing.. and update
}
}
datosAComprobar.clear();
}
consumer.commitSync();
Thread.sleep(100);
}
} catch(){...}
}
请求数据库(每个查询的结构相同):
public static List<Map<String, String>> FirstConsult(List<Map<String, String>> MyList, Connection conn) {
PreparedStatement stmt = null;
ResultSet rs = null;
List<Map<String, String>> list = new ArrayList<Map<String, String>>();
String query = "";
int contador = 1;
for (Map<String, String> val : MyList) {
query += "select " + val.get("row1") + " as r1, " + val.get("row2") + " as row2,"+val.get("cuenta")+"from table_a inner join table_b...."
if (contador < MyList.size()) {
query += "\r\nunion\r\n";
}
contador += 1;
}
try {
stmt = conn.prepareStatement(query);
rs = stmt.executeQuery();
ResultSetMetaData rsmd = rs.getMetaData();
int columnsNumber = rsmd.getColumnCount();
if (rs.next()) {
do {
Map<String, String> map = new HashMap<String, String>();
for (int i = 1; i <= columnsNumber; i++) {
String columnValue = rs.getString(i);
String columnName = rsmd.getColumnName(i);
map.put(columnName, columnValue);
}
if (!map.isEmpty()) {
list.add(map);
}
} while (rs.next());
}
} catch(e){...} finally {
try {
if(rs != null) rs.close();
if (stmt != null) stmt.close();
} catch (SQLException e) {...}
}
return list;
}
如何改进我的代码或至少改进与数据库的连接以获得更好的时间...?当我加载更多记录时,它会变慢。我需要关闭连接吗?我关闭所有语句和结果集 ...
最佳答案
如您所见,这不是一种有效的做事方式。一种常见的模式是将数据库导入 Kafka 并在那里完成工作,而不是查找数据库。
您可以将数据库表提取到 Kafka topic using CDC 中然后使用 Kafka Streams 或 ksqlDB 等流处理技术将原始 Kafka 主题与从数据库填充的新 Kafka 主题中的必要数据连接起来。 This talk here展示它的实际效果。
关于java - 使用kafka+java流式处理数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58991161/