google-cloud-platform - 使用客户端从 java 代码手动触发 Cloud Composer dag 运行

标签 google-cloud-platform airflow google-cloud-composer airflow-2.x

我有一个云 Composer dag,其调度程序属性设置为 none 并且需要触发。

我已将我的 dag 代码上传到云 compose gcs 文件夹,并尝试使用本地 gcloud 凭据从本地触发,如 python 文档中的建议( https://github.com/GoogleCloudPlatform/python-docs-samples/blob/HEAD/composer/rest/composer2/composer2_airflow_rest_api.py )。一切正常。我可以使用应用程序默认凭据从本地触发 dag。

现在我想使用 java 代码从本地触发相同的 dag。但该文档对于创建客户端和触发 dag 运行没有帮助。我知道 google 提供了一个包 google-cloud-orchestration-airflow( https://cloud.google.com/java/docs/reference/google-cloud-orchestration-airflow/latest/overview ) 但它没有一个类,我可以通过该类从我们的服务 java 代码触发 dag 运行。 我想为我的 dag 创建一个 java 客户端并手动触发 dag 运行。

最佳答案

我可以复制任何Python代码所做的事情,并通过添加授权 header 并将其设置为从默认凭据生成的访问 token 来从本地触发dag运行,而不是使用google-cloud-orchestration-airflow库在我本地。

附上工作代码:

import com.google.auth.oauth2.GoogleCredentials;
import com.google.gson.JsonObject;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

public class Main {
    static Logger logger = LoggerFactory.getLogger(Main.class);
    public static void main(String[] args) throws IOException {
        //Build the empty request payload
        JsonObject jsonPayload = new JsonObject();
        jsonPayload.add("conf", new JsonObject());
        //set the headers
        GoogleCredentials credentials = GoogleCredentials.getApplicationDefault();
        credentials.refresh();
        String accessToken = credentials.getAccessToken().getTokenValue();
        HttpPost httpPost = new HttpPost(  "https://<env-url>.composer.googleusercontent.com/<dag-name>/dagRuns");
        httpPost.setHeader("Authorization", "Bearer " + accessToken);
        httpPost.setHeader("Content-Type", ContentType.APPLICATION_JSON.getMimeType());
        // Set the request body to trigger the DAG run.
        httpPost.setEntity(new StringEntity(jsonPayload.toString()));

        CloseableHttpClient httpClient = HttpClients.createDefault();
        try {
            CloseableHttpResponse httpResponse = httpClient.execute(httpPost);
            HttpEntity entity = httpResponse.getEntity();
            String responseContent = EntityUtils.toString(entity);
            logger.info("Received response: " + responseContent);
        } catch (Exception e) {
            logger.info("Failed to send request: " + e.getMessage());
        } finally {
            httpClient.close();
        }

    }
}

通过上述方式,我可以从本地触发 dag 运行。我无法找到通过 google-cloud-orchestration-airflow 包进行操作的方法。但是附加一个授权 header 就完成了身份验证工作。

关于google-cloud-platform - 使用客户端从 java 代码手动触发 Cloud Composer dag 运行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/76084591/

相关文章:

python - 想要创建当前任务下游的 Airflow 任务

google-cloud-platform - 使用上传云存储操作替换文件夹

amazon-web-services - 是否有 AWS/Azure/Gcloud API 的包装器?

kubernetes - 将Compute Engine磁​​盘作为持久卷添加到Container Engine

python - Google Cloud Composer 安装依赖项的时间太长

python - Apache Airflow : Pass variable in jinja include

kubernetes - 如何使用Cloud Composer安排外部集群中的Kubernetes工作负载

docker - 如何加速 Google Cloud Platform 上的 bazel 构建

python - Airflow 界面中的干净 TreeView

python - 以编程方式在 dockerized apache Airflow python 操作符内创建 SSH 隧道