java - 如何关闭 Apache Kafka 连接器任务?

标签 java apache-kafka apache-kafka-connect

现在我正在使用 Apache Kafka 并有任务: 我们的目录中有一些 csv 文件,它是一个小批量文件,每个文件约为 25-30 mb。我所需要的只是解析文件并将其放入 kafka。

正如我所见,Kafka 有一些有趣的东西,比如 Connector。

我可以创建 Source-Connector 和 SourceTask,但我不明白一件事: 当我处理文件时,如何停止或删除我的任务?

例如,我有虚拟连接器:

public class DummySourceConnector extends SourceConnector {
private static final Logger logger = LogManager.getLogger();

@Override
public String version() {
    logger.info("version");

    return "1";
}

@Override
public ConfigDef config() {
    logger.info("config");

    return null;
}

@Override
public Class<? extends Task> taskClass() {
    return DummySourceTask.class;
}

@Override
public void start(Map<String, String> props) {
    logger.info("start {}", props);
}

@Override
public void stop() {
    logger.info("stop");
}

@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
    logger.info("taskConfigs {}", maxTasks);

    return ImmutableList.of(ImmutableMap.of("key", "value"));
}

和任务:

public class DummySourceTask extends SourceTask {
private static final Logger logger = LogManager.getLogger();

private long offset = 0;

@Override
public String version() {
    logger.info("version");

    return "1";
}

@Override
public void start(Map<String, String> props) {
    logger.info("start {}", props);
}


@Override
public List<SourceRecord> poll() throws InterruptedException {
    Thread.sleep(3000);

    final String value = "Offset " + offset++ + " Timestamp " + Instant.now().toString();

    logger.info("poll value {}", value);

    return ImmutableList.of(new SourceRecord(
            ImmutableMap.of("partition", 0),
            ImmutableMap.of("offset", offset),
            "topic-dummy",
            SchemaBuilder.STRING_SCHEMA,
            value
    ));
}

public void stop() {
    logger.info("stop");
}

但是当任务全部完成后我如何才能关闭它呢? 或者也许你可以帮助我为这项任务提供另一个想法。

感谢您的帮助!

最佳答案

首先,我鼓励您查看现有的连接器 here 。我觉得 spooldir 连接器会对你有帮助。您甚至可以只下载并安装它,而无需编写任何代码。

其次,如果我理解正确的话,你想停止一个任务。我相信this discussion就是你想要的。

关于java - 如何关闭 Apache Kafka 连接器任务?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39849434/

相关文章:

java - null = ""对于字符串

java - 哪些模板语言可同时用于 Java 和 Javascript?

apache-spark - 结构化流 : watermark vs. 恰好一次语义

spring-boot - 事务发件箱模式与微服务中的 ChainedKafkaTransactionManager

apache-kafka - 为什么在 Kafka Connect 中使用 JDBC Sink Connector 无法删除记录

apache-kafka - 连接器配置不包含连接器类型

java - 关于正则表达式的查询

java - Java GridLayout 和添加按钮的问题

apache-kafka - 弗林克 : Window does not process data at end of stream

apache-kafka - 连接 Kafka 3.0 中的问题 - org.apache.kafka.common.KafkaException : Failed to load SSL keystore