apache-storm - Storm-kafka spout 消耗缓慢

标签 apache-storm apache-kafka

我只是在尝试这里提到的 kafka-storm spout https://github.com/nathanmarz/storm-contrib/tree/master/storm-kafka我使用的配置如下所示。

    BrokerHosts brokerHosts = KafkaConfig.StaticHosts.fromHostString(
            ImmutableList.of("localhost"), 1);
    SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, // list of Kafka
            "test", // topic to read from
            "/kafkastorm", // the root path in Zookeeper for the spout to
            "discovery"); // an id for this consumer for storing the
                            // consumer offsets in Zookeeper
    spoutConfig.scheme = new StringScheme();
    spoutConfig.stateUpdateIntervalMs = 1000;


    KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

    TridentTopology topology = new TridentTopology();
    InetSocketAddress inetSocketAddress = new InetSocketAddress(
            "localhost", 6379);
    TridentState wordsCount = topology
            .newStream(SPOUT_FIRST, kafkaSpout)
            .parallelismHint(1)
            .each(new Fields("str"), new TestSplit(), new Fields("words"))
            .groupBy(new Fields("words"))
            .persistentAggregate(
                    RedisState.transactional(inetSocketAddress),
                    new Count(), new Fields("counts")).parallelismHint(100);

    Config conf = new Config();
    conf.setMaxTaskParallelism(200);
    // conf.setDebug( true );
    // conf.setMaxSpoutPending(20);

    // This topology can only be run as local because it is a toy example
    LocalDRPC drpc = new LocalDRPC();
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("symbolCounter", conf, topology.build());

但是上面的 spout 从 Kafka 主题中获取消息的速度大约是 7000/秒,但我预计每秒加载大约 50000 条消息。我尝试了各种增加 spoutConfig 中的提取缓冲区大小的选项,但没有看到任何结果。

有没有人遇到过无法以生产者生产消息的速度通过storm获取kafka主题的类似问题?

最佳答案

我将配置中的“topology.spout.max.batch.size”值更新为大约 64*1024 值,然后 Storm 处理变得很快。

关于apache-storm - Storm-kafka spout 消耗缓慢,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/19705979/

相关文章:

java - 在 trident 中实现事务拓扑的问题

python - 如何在保留矩阵维度的同时序列化 numpy 数组?

docker - 在自定义网络中访问Docker容器的IP地址

apache-kafka - 无法使用 .\bin\windows\kafka-server-start.bat .\config\server.properties cmd 启动 kafka

java - 从 Kafka Streams 反序列化对象时出错

java - Storm 拓扑的 Mongo 连接池

maven - Apachestorm-jdbc maven 集成

scala - 访问 DStream 集合

kubernetes - 如何在 Kubernetes 上为 Kafka-connect 创建连接器?

hadoop - 如何在我的机器上运行 Storm 拓扑...使我的机器陷入 Storm