java - 使用 Spark 和 RDD 映射 cassandra 数据库的表

标签 java mapreduce apache-spark rdd

我必须映射一个表,其中写入应用程序的使用历史记录。该表有这些元组:

<AppId,date,cpuUsage,memoryUsage>
<AppId,date,cpuUsage,memoryUsage>
<AppId,date,cpuUsage,memoryUsage>
<AppId,date,cpuUsage,memoryUsage>
<AppId,date,cpuUsage,memoryUsage>

AppId总是不同的,因为在许多应用程序中都引用了 date以这种格式表示dd/mm/yyyy hh/mm cpuUsagememoryUsage表示为%例如:

<3ghffh3t482age20304,230720142245,0.2,3,5>

我以这种方式从 cassandra 检索数据(小片段):

public static void main(String[] args) {
        Cluster cluster;
        Session session;
        cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
        session = cluster.connect();
        session.execute("CREATE KEYSPACE IF NOT EXISTS foo WITH replication "
                + "= {'class':'SimpleStrategy', 'replication_factor':3};");
        String createTableAppUsage = "CREATE TABLE IF NOT EXISTS foo.appusage"
                + "(appid text,date text, cpuusage double, memoryusage double, "
                + "PRIMARY KEY(appid,date) " + "WITH CLUSTERING ORDER BY (time ASC);";
        session.execute(createTableAppUsage);
        // Use select to get the appusage's table rows
        ResultSet resultForAppUsage = session.execute("SELECT appid,cpuusage FROM foo.appusage");
       for (Row row: resultForAppUsage)
             System.out.println("appid :" + row.getString("appid") +" "+ "cpuusage"+row.getString("cpuusage"));
        // Clean up the connection by closing it
        cluster.close();
    }

所以,我现在的问题是按 key value 映射数据并创建一个集成此代码的元组(片段不起作用):

        <AppId,cpuusage>

        JavaPairRDD<String, Integer> saveTupleKeyValue =someStructureFromTakeData.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String x) {
                return new Tuple2(x, y);
            }

我如何使用RDD和reduce映射appId和cpuusage eg. cpuusage >50

有什么帮助吗?

提前致谢。

最佳答案

假设您已创建有效的 SparkContext sparkContext,已将 Spark-cassandra 连接器依赖项添加到您的项目中,并配置您的 Spark 应用程序以与您的 cassandra 集群通信(请参阅 docs 了解这一点) ),然后我们可以像这样将数据加载到 RDD 中:

val data = sparkContext.cassandraTable("foo", "appusage").select("appid", "cpuusage")

在 Java 中,想法是相同的,但它需要更多的管道,如here所述。

关于java - 使用 Spark 和 RDD 映射 cassandra 数据库的表,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30732582/

相关文章:

scala - 在 apache spark 生产场景中处理 Skew 数据

java - 特定组的 TestNG @AfterMethod 在 Eclipse 中的每个方法之后运行

java - 简单的 Java GIS 编辑器

scala - 未找到 HIVE 表

java - mongodb使用java驱动程序比较两个不同的字段

python - mrjob:在 EMR 上设置日志记录

scala - 每 5 小时间隔查找最小值

java - HttpsURLConnection:使用 IDE JDK 可以进行身份​​验证,但不能使用 JRE!这很奇怪

java - 排除 pom.xml 中为 jacoco 配置的内容并排除 sonar 中的文件

java - 如何检查文件内容是否为空