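I am new to Cassandra and Spark. I am trying to set up tests for my Spark job, which does the following:
- Loads data from table A into DataFrames
- Filters, groups, and aggregates those DataFrames
- Writes the results to table B
I would like to run the tests against an embedded Cassandra server rather than connecting to a local instance of a Cassandra database. Has anyone done this? If so, could someone point me to a good example? Thanks in advance for your help!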
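One way to pin down expected results for such a test is to mirror the job's filter, group, and aggregate steps in plain Java over a few in-memory rows, then compare the output against what the Spark job writes to table B. The sketch below uses `java.util.stream` as a stand-in for the DataFrame operations; it is not Spark code. The `Row` class, its `key` and `value` fields, the `minValue` threshold, and the sum aggregate are all hypothetical, since the question does not say what the job computes.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class JobLogicSketch {

    // Hypothetical row shape for table A: a grouping key and a numeric value.
    static final class Row {
        final String key;
        final long value;

        Row(String key, long value) {
            this.key = key;
            this.value = value;
        }
    }

    // Filter, group, aggregate: keep rows with value >= minValue, then sum the values per key.
    // A TreeMap keeps the keys sorted so the result is easy to compare in a test.
    static Map<String, Long> sumPerKey(List<Row> rows, long minValue) {
        return rows.stream()
                .filter(r -> r.value >= minValue)
                .collect(Collectors.groupingBy(r -> r.key, TreeMap::new,
                        Collectors.summingLong(r -> r.value)));
    }

    public static void main(String[] args) {
        List<Row> tableA = Arrays.asList(
                new Row("a", 5), new Row("a", 1), new Row("b", 7), new Row("b", 3));
        // Rows below the threshold of 3 are dropped before summing.
        System.out.println(sumPerKey(tableA, 3)); // prints {a=5, b=10}
    }
}
```

Keeping a small reference computation like this next to the test data makes it easier to tell a wrong aggregate from a Cassandra read or write problem.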
Best answer
This code does it: it starts an embedded Cassandra daemon in the same JVM and then creates a local Spark session.
package cassspark.clt;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.cassandra.service.CassandraDaemon;
import org.apache.log4j.PropertyConfigurator;
import org.apache.spark.sql.SparkSession;

public class EmbeddedCassandraDemo {

    // Single background thread that hosts the embedded Cassandra daemon.
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private CassandraDaemon cassandraDaemon;

    public static void main(String[] args) {
        new EmbeddedCassandraDemo().run();
    }

    private void run() {
        setProperties();
        activateDaemon();
    }

    private void activateDaemon() {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                // Start Cassandra inside this JVM using the properties set in setProperties().
                cassandraDaemon = new CassandraDaemon();
                cassandraDaemon.activate();
                // Once the daemon has been activated, create a local Spark session in the same JVM.
                SparkSession spark = SparkSession.builder().master("local").appName("ASH").getOrCreate();
            }
        });
    }

    private void setProperties() {
        // cassandra.yaml and the data directory are resolved relative to the working directory.
        final String yaml = System.getProperty("user.dir") + File.separator + "conf" + File.separator + "cassandra.yaml";
        final String storage = System.getProperty("user.dir") + File.separator + "storage" + File.separator + "data";
        System.setProperty("cassandra.config", "file:" + yaml);
        System.setProperty("cassandra.storagedir", storage);
        System.setProperty("cassandra-foreground", "true");

        // Configure log4j from ./conf/log4j.properties; exit if the file cannot be read.
        String log4JPropertyFile = "./conf/log4j.properties";
        Properties p = new Properties();
        try (FileInputStream in = new FileInputStream(log4JPropertyFile)) {
            p.load(in);
            PropertyConfigurator.configure(p);
        } catch (IOException e) {
            System.err.println(log4JPropertyFile + " not found");
            System.exit(1);
        }
    }
}
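Because the daemon is activated on a background thread, a test that connects as soon as the launcher returns may race Cassandra's startup. Below is a minimal sketch of one way to guard against that, assuming the test can simply poll the native transport port until it accepts TCP connections. `PortWaiter` and `waitForPort` are hypothetical names, not part of Cassandra or Spark, and 9042 is only Cassandra's default native transport port; the value in your `cassandra.yaml` may differ.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortWaiter {

    // Polls host:port until a TCP connection succeeds or timeoutMillis elapses.
    // Returns true if the port accepted a connection before the deadline.
    static boolean waitForPort(String host, int port, long timeoutMillis) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (System.currentTimeMillis() < deadline) {
            try (Socket socket = new Socket()) {
                // Fail fast on each attempt so the loop can re-check the deadline.
                socket.connect(new InetSocketAddress(host, port), 500);
                return true;
            } catch (IOException e) {
                // Nothing is listening yet; back off briefly and retry.
                Thread.sleep(200);
            }
        }
        return false;
    }

    public static void main(String[] args) throws InterruptedException {
        // Assumption: the embedded daemon listens on the default native transport port.
        boolean up = waitForPort("127.0.0.1", 9042, 30_000);
        System.out.println(up ? "Cassandra port is open" : "Timed out waiting for Cassandra");
    }
}
```

In a test setup method, calling `waitForPort` after starting the daemon and before running the Spark job gives the job a listening server to connect to. An open port does not by itself guarantee that keyspaces and tables exist, so schema setup would still come next.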
Regarding "java - Java example of testing a Cassandra-Spark job with an embedded Cassandra server", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/38705049/