我有一个云 Composer dag,其调度程序属性设置为 none 并且需要触发。
我已将我的 dag 代码上传到云 compose gcs 文件夹,并尝试使用本地 gcloud 凭据从本地触发,如 python 文档中的建议( https://github.com/GoogleCloudPlatform/python-docs-samples/blob/HEAD/composer/rest/composer2/composer2_airflow_rest_api.py )。一切正常。我可以使用应用程序默认凭据从本地触发 dag。
现在我想使用 java 代码从本地触发相同的 dag。但该文档对于创建客户端和触发 dag 运行没有帮助。我知道 google 提供了一个包 google-cloud-orchestration-airflow( https://cloud.google.com/java/docs/reference/google-cloud-orchestration-airflow/latest/overview ) 但它没有一个类,我可以通过该类从我们的服务 java 代码触发 dag 运行。 我想为我的 dag 创建一个 java 客户端并手动触发 dag 运行。
最佳答案
我可以复制任何Python代码所做的事情,并通过添加授权 header 并将其设置为从默认凭据生成的访问 token 来从本地触发dag运行,而不是使用google-cloud-orchestration-airflow库在我本地。
附上工作代码:
import com.google.auth.oauth2.GoogleCredentials;
import com.google.gson.JsonObject;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
public class Main {
static Logger logger = LoggerFactory.getLogger(Main.class);
public static void main(String[] args) throws IOException {
//Build the empty request payload
JsonObject jsonPayload = new JsonObject();
jsonPayload.add("conf", new JsonObject());
//set the headers
GoogleCredentials credentials = GoogleCredentials.getApplicationDefault();
credentials.refresh();
String accessToken = credentials.getAccessToken().getTokenValue();
HttpPost httpPost = new HttpPost( "https://<env-url>.composer.googleusercontent.com/<dag-name>/dagRuns");
httpPost.setHeader("Authorization", "Bearer " + accessToken);
httpPost.setHeader("Content-Type", ContentType.APPLICATION_JSON.getMimeType());
// Set the request body to trigger the DAG run.
httpPost.setEntity(new StringEntity(jsonPayload.toString()));
CloseableHttpClient httpClient = HttpClients.createDefault();
try {
CloseableHttpResponse httpResponse = httpClient.execute(httpPost);
HttpEntity entity = httpResponse.getEntity();
String responseContent = EntityUtils.toString(entity);
logger.info("Received response: " + responseContent);
} catch (Exception e) {
logger.info("Failed to send request: " + e.getMessage());
} finally {
httpClient.close();
}
}
}
通过上述方式,我可以从本地触发 dag 运行。我无法找到通过 google-cloud-orchestration-airflow 包进行操作的方法。但是附加一个授权 header 就完成了身份验证工作。
关于google-cloud-platform - 使用客户端从 java 代码手动触发 Cloud Composer dag 运行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/76084591/