google-cloud-dataflow - How do I integration test a Dataflow pipeline that writes to Bigtable?

Tags: google-cloud-dataflow apache-beam google-cloud-bigtable

According to the Beam website,

Often it is faster and simpler to perform local unit testing on your pipeline code than to debug a pipeline’s remote execution.

For this reason, I want to use test-driven development for my Beam/Dataflow application that writes to Bigtable.

However, following the Beam testing documentation I hit an impasse: PAssert is not useful because the output PCollection contains org.apache.hadoop.hbase.client.Put objects, which do not override the equals method.
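For example (a minimal sketch using the standard HBase client API; the row and column names are made up), two logically identical Puts still compare unequal because equality falls back to object identity:

Put a = new Put(Bytes.toBytes("row"));
a.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("value"));
Put b = new Put(Bytes.toBytes("row"));
b.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("value"));
// Prints false: Put inherits Object.equals, so PAssert's element comparisons fail.
System.out.println(a.equals(b));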

Nor can I get the contents of the PCollection to validate them directly, since:

It is not possible to get the contents of a PCollection directly - an Apache Beam or Dataflow pipeline is more like a query plan of what processing should be done, with PCollection being a logical intermediate node in the plan, rather than containing the data.

So how can I test this pipeline, other than by running it manually? I am using Maven and JUnit (in Java, since that seems to be all the Dataflow Bigtable Connector supports).

Best Answer

The Bigtable Emulator Maven plugin can be used to write an integration test for this:

  • Configure the Maven Failsafe plugin and change the ending of your test case names from *Test to *IT so they run as integration tests.
  • Install the Bigtable Emulator via the gcloud SDK on the command line:

    gcloud components install bigtable   
    

    Note that this required step reduces the portability of the code (e.g. will it run on your build system? On other developers' machines?), so I plan to containerize it with Docker before deploying it to the build system.

  • Add the emulator plugin to the pom as described in the README.

  • Use the HBase Client API, and see the example Bigtable Emulator integration test, to set up your session and table.

  • Write your test normally per the Beam documentation, except that instead of using PAssert you actually call CloudBigtableIO.writeToTable and then use the HBase client to read the data back from the table and verify it.

Here is an example integration test:

package adair.example;

import static org.apache.hadoop.hbase.util.Bytes.toBytes;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.hamcrest.collection.IsIterableContainingInAnyOrder;
import org.junit.Assert;
import org.junit.Test;

import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableTableConfiguration;
import com.google.cloud.bigtable.hbase.BigtableConfiguration;

/**
 *  A simple integration test example for use with the Bigtable Emulator maven plugin.
 */
public class DataflowWriteExampleIT {

  private static final String PROJECT_ID = "fake";
  private static final String INSTANCE_ID = "fakeinstance";
  private static final String TABLE_ID = "example_table";
  private static final String COLUMN_FAMILY = "cf";
  private static final String COLUMN_QUALIFIER = "cq";

  private static final CloudBigtableTableConfiguration TABLE_CONFIG =
    new CloudBigtableTableConfiguration.Builder()
      .withProjectId(PROJECT_ID)
      .withInstanceId(INSTANCE_ID)
      .withTableId(TABLE_ID)
      .build();

  public static final List<String> VALUES_TO_PUT = Arrays
    .asList("hello", "world", "introducing", "Bigtable", "plus", "Dataflow", "IT");

  @Test
  public void testPipelineWrite() throws IOException {
    try (Connection connection = BigtableConfiguration.connect(PROJECT_ID, INSTANCE_ID)) {
      Admin admin = connection.getAdmin();
      createTable(admin);

      List<Mutation> puts = createTestPuts();

      //Use Dataflow to write the data--this is where you'd call the pipeline you want to test.
      Pipeline p = Pipeline.create();
      p.apply(Create.of(puts)).apply(CloudBigtableIO.writeToTable(TABLE_CONFIG));
      p.run().waitUntilFinish();

      //Read the data from the table using the regular hbase api for validation
      ResultScanner scanner = getTableScanner(connection);
      List<String> resultValues = new ArrayList<>();
      for (Result row : scanner) {
        String cellValue = getRowValue(row);
        System.out.println("Found value in table: " + cellValue);
        resultValues.add(cellValue);
      }

      Assert.assertThat(resultValues,
        IsIterableContainingInAnyOrder.containsInAnyOrder(VALUES_TO_PUT.toArray()));
    }
  }

  private void createTable(Admin admin) throws IOException {
    HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(TABLE_ID));
    tableDesc.addFamily(new HColumnDescriptor(COLUMN_FAMILY));

    admin.createTable(tableDesc);
  }

  private ResultScanner getTableScanner(Connection connection) throws IOException {
    Scan scan = new Scan();
    Table table = connection.getTable(TableName.valueOf(TABLE_ID));
    return table.getScanner(scan);
  }

  private String getRowValue(Result row) {
    return Bytes.toString(row.getValue(toBytes(COLUMN_FAMILY), toBytes(COLUMN_QUALIFIER)));
  }

  private List<Mutation> createTestPuts() {
    return VALUES_TO_PUT
          .stream()
          .map(this::stringToPut)
          .collect(Collectors.toList());
  }

  private Mutation stringToPut(String cellValue){
    String key = UUID.randomUUID().toString();
    Put put = new Put(toBytes(key));
    put.addColumn(toBytes(COLUMN_FAMILY), toBytes(COLUMN_QUALIFIER), toBytes(cellValue));
    return put;
  }

}
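Once the Failsafe plugin's integration-test and verify goals are configured in the pom, the *IT test above runs as part of the verify phase, so (assuming the emulator plugin is also wired in per its README) the integration test suite can be executed against the emulator with:

    mvn verify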

Regarding "google-cloud-dataflow - How do I integration test a Dataflow pipeline that writes to Bigtable?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/51162693/
