我在 AWS 上设置了 Cassandra 集群。我想要得到的是随着更多节点的添加(如宣传的那样),I/O 吞吐量(每秒读/写次数)会增加。然而,我得到的却恰恰相反。随着新节点的添加,性能会降低。
您知道阻止其扩展的任何典型问题吗?
以下是一些详细信息:
我正在向列族添加一个文本文件 (15MB)。每一行都是一个记录。有150000条记录。当有1个节点时,写入大约需要90秒。但是当有2个节点时,需要120秒。我可以看到数据分布到 2 个节点。但是,吞吐量没有增加。
源代码如下:
public class WordGenCAS {
static final String KEYSPACE = "text_ks";
static final String COLUMN_FAMILY = "text_table";
static final String COLUMN_NAME = "text_col";
public static void main(String[] args) throws Exception {
if (args.length < 2) {
System.out.println("Usage: WordGenCAS <input file> <host1,host2,...>");
System.exit(-1);
}
String[] contactPts = args[1].split(",");
Cluster cluster = Cluster.builder()
.addContactPoints(contactPts)
.build();
Session session = cluster.connect(KEYSPACE);
InputStream fis = new FileInputStream(args[0]);
InputStreamReader in = new InputStreamReader(fis, "UTF-8");
BufferedReader br = new BufferedReader(in);
String line;
int lineCount = 0;
while ( (line = br.readLine()) != null) {
line = line.replaceAll("'", " ");
line = line.trim();
if (line.isEmpty())
continue;
System.out.println("[" + line + "]");
String cqlStatement2 = String.format("insert into %s (id, %s) values (%d, '%s');",
COLUMN_FAMILY,
COLUMN_NAME,
lineCount,
line);
session.execute(cqlStatement2);
lineCount++;
}
System.out.println("Total lines written: " + lineCount);
}
}
数据库架构如下:
CREATE KEYSPACE text_ks WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 2 };
USE text_ks;
CREATE TABLE text_table (
id int,
text_col text,
primary key (id)
) WITH COMPACT STORAGE;
谢谢!
最佳答案
即使这是一篇旧帖子,我认为也值得为这些(常见)问题发布解决方案。
正如您已经发现的,使用串行过程加载数据速度很慢。您所建议的做法是正确的。
但是,在不施加某种背压的情况下发出大量查询可能会带来麻烦,并且您会由于服务器(以及在某种程度上驱动程序)过度过载而丢失数据。
此解决方案将通过异步调用加载数据,并尝试对客户端施加一些反压以避免数据丢失。
public class WordGenCAS {
static final String KEYSPACE = "text_ks";
static final String COLUMN_FAMILY = "text_table";
static final String COLUMN_NAME = "text_col";
public static void main(String[] args) throws Exception {
if (args.length < 2) {
System.out.println("Usage: WordGenCAS <input file> <host1,host2,...>");
System.exit(-1);
}
String[] contactPts = args[1].split(",");
Cluster cluster = Cluster.builder()
.addContactPoints(contactPts)
.build();
Session session = cluster.connect(KEYSPACE);
InputStream fis = new FileInputStream(args[0]);
InputStreamReader in = new InputStreamReader(fis, "UTF-8");
BufferedReader br = new BufferedReader(in);
String line;
int lineCount = 0;
// This is the futures list of our queries
List<Future<ResultSet>> futures = new ArrayList<>();
// Loop
while ( (line = br.readLine()) != null) {
line = line.replaceAll("'", " ");
line = line.trim();
if (line.isEmpty())
continue;
System.out.println("[" + line + "]");
String cqlStatement2 = String.format("insert into %s (id, %s) values (%d, '%s');",
COLUMN_FAMILY,
COLUMN_NAME,
lineCount,
line);
lineCount++;
// Add the "future" returned by async method the to the list
futures.add(session.executeAsync(cqlStatement2));
// Apply some backpressure if we issued more than X query.
// Change X to another value suitable for your cluster
while (futures.size() > 1000) {
Future<ResultSet> future = futures.remove(0);
try {
future.get();
} catch (Exception e) {
e.printStackTrace();
}
}
}
System.out.println("Total lines written: " + lineCount);
System.out.println("Waiting for writes to complete...");
// Wait until all writes are done.
while (futures.size() > 0) {
Future<ResultSet> future = futures.remove(0);
try {
future.get();
} catch (Exception e) {
e.printStackTrace();
}
}
System.out.println("Done!");
}
}
关于Cassandra 因节点增多而变慢,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23977104/