java - 使用嵌入式 Cassandra 服务器测试 Cassandra-Spark 作业的 Java 示例

标签 java apache-spark cassandra

我是 Cassandra 和 Spark 的新手。我正在尝试为我的 Spark 作业设置测试,它执行以下操作:

  1. 将表 A 中的数据加载到 DataFrame 中
  2. 对这些 DataFrame 进行过滤、分组和聚合
  3. 将结果载入表B

我想使用嵌入式 Cassandra 服务器来运行测试,而不是让它连接到 Cassandra 数据库的本地实例。有没有人这样做过?如果是这样,有人可以给我指出一个很好的例子吗?提前感谢您的帮助!

最佳答案

this code does

package cassspark.clt;

import java.io.*;
import javafx.application.Application;
import java.util.concurrent.Executors ;
import java.util.concurrent.ExecutorService;
import org.apache.cassandra.service.CassandraDaemon;
import com.datastax.driver.core.exceptions.ConnectionException;
import java.util.Properties;
import org.apache.log4j.PropertyConfigurator;
import org.apache.spark.sql.SparkSession;

public class EmbeddedCassandraDemo extends Application {

    private ExecutorService executor = Executors.newSingleThreadExecutor();
    private CassandraDaemon cassandraDaemon;

    public EmbeddedCassandraDemo() {
    }

    public static void main(String[] args) {
        try {
            new EmbeddedCassandraDemo().run();
        }
        catch(java.lang.InterruptedException e)
        {
            ;
        }
    }

    @Override public void start(javafx.stage.Stage stage) throws Exception
    {
        stage.show();
    }

    private void run() throws InterruptedException, ConnectionException {
        setProperties();
        activateDeamon();
    }

    private void activateDeamon() {
        executor.execute( new Runnable() {

            @Override
            public void run() {
                cassandraDaemon = new CassandraDaemon();
                cassandraDaemon.activate();
                SparkSession spark = SparkSession .builder().master("local").appName("ASH").getOrCreate();
            }
        });
    }

    private void setProperties() {

        final String yaml = System.getProperty("user.dir") + File.separator +"conf"+File.separator+"cassandra.yaml";
        final String storage = System.getProperty("user.dir") + File.separator +"storage" + File.separator +"data";

        System.setProperty("cassandra.config", "file:"+ yaml );
        System.setProperty("cassandra.storagedir", storage );
        System.setProperty("cassandra-foreground", "true");

        String log4JPropertyFile = "./conf/log4j.properties";
        Properties p = new Properties();
        try {
            p.load(new FileInputStream(log4JPropertyFile));
            PropertyConfigurator.configure(p);
        } catch (IOException e) {
            System.err.println("./conf/log4j.properties not found ");
            System.exit(1);
            ;
        }
    }
}

关于java - 使用嵌入式 Cassandra 服务器测试 Cassandra-Spark 作业的 Java 示例,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38705049/

相关文章:

java - Apache Spark 聚合键的工作原理

java - SQLContext 未创建 : NoClassDefFoundError: org/apache/spark/sql/catalyst/rules/RuleExecutor

java - mouseEntered 不起作用

Java:泛型不适用于我的方法,我还能做什么?

java - 如何设置 OnClickListener (Android)

java - 列删除会增加 Cassandra 中的读取延迟吗?

Cassandra读取一致性一,受其他节点影响?

java - Spring4 MVC单元测试无法编译

scala - Spark Structured Streaming 从查询异常中恢复

java - 我们如何将 com.datastax.driver.core.LocalDate 转换为 java.util.Date?