有没有办法通过以下事件触发数据湖分析作业:
“当数据/事件到达事件中心时”它会触发作业。
它不一定是事件中心,但我想触发作业并以某种方式传递数据(例如,作为 json)
最佳答案
It's not necessarily event hub, but i want to trigger the job and somehow pass data (as json for example)
根据您的描述,我建议您考虑使用 Azure WebJobs(可以使用 EventHub 触发器或队列触发器),并结合 Azure Data Lake Analytics .NET SDK 来实现您的需求。
在开始使用 Azure Data Lake Analytics .NET SDK 之前,您需要先为您的应用程序注册一个 Azure AD 应用程序,以便使用客户端 ID 和密钥请求访问 ADLA 的 token。
在 Azure AD 中注册应用程序并为其创建服务主体(service principal)。关于如何注册应用程序并获取访问 token 的更多详细步骤,请参阅相关文档。
注意:不要忘记在 AD 组中添加访问数据湖的权限;更多详细信息,您可以参考这篇文章。
完成此操作后,您可以使用以下代码创建一个 Web 作业,该作业将由队列(或事件中心)触发,以在数据湖分析中创建一个新作业来运行您的脚本。
代码如下:
注意:您需要从 Nuget 安装以下软件包:
Microsoft.Azure.Graph.RBAC (preview)
Microsoft.Azure.Management.DataLake.Analytics
Microsoft.Azure.Management.DataLake.Store
Microsoft.IdentityModel.Clients.ActiveDirectory
Microsoft.Rest.ClientRuntime.Azure.Authentication
Functions.cs:
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Rest;
using System.Threading;
using Microsoft.Rest.Azure.Authentication;
using Microsoft.IdentityModel.Clients.ActiveDirectory;
using Microsoft.Azure.Management.DataLake.Analytics;
using Microsoft.Azure.Management.DataLake.Store;
using Microsoft.Azure.Graph.RBAC;
using Microsoft.Azure.Management.DataLake.Analytics.Models;
using System.Security.Cryptography.X509Certificates;
namespace WebJob1
{
public class Functions
{
/// <summary>
/// Runs whenever a new message is written to the Azure Storage queue named "queue"
/// and submits a new U-SQL job to the configured Data Lake Analytics account.
/// </summary>
/// <param name="message">
/// Text of the queue message that triggered this invocation. The sample script below
/// does not consume it; build the script text from it if the job needs the payload.
/// </param>
/// <param name="log">Writer that receives a one-line confirmation after the job is submitted.</param>
public static void ProcessQueueMessage([QueueTrigger("queue")] string message, TextWriter log)
{
    // Placeholder settings. Replace them with real values, and load the secret from
    // configuration or a secret store rather than keeping it in source code.
    string adlaAccountName = "adlaAccountName";
    string domain = "tenantid";
    string clientId = "clientId";
    string secretKey = "clientsecretKey";

    // ----------------------------------------
    // Perform authentication to get credentials
    // ----------------------------------------
    // Only the Data Lake audience is required to submit a job, so a single token is
    // requested. Other login flows (interactive popup, certificate) are available via
    // the GetCredsInteractivePopup / GetCredsServicePrincipalCertificate helpers.
    var adlTokenAudience = new Uri(@"https://datalake.azure.net/");
    var adlCreds = GetCredsServicePrincipalSecretKey(domain, adlTokenAudience, clientId, secretKey);

    // ----------------------------------------
    // Describe the job
    // ----------------------------------------
    var script = @" your script ";
    var jobId = Guid.NewGuid();
    var properties = new USqlJobProperties(script);
    var parameters = new JobInformation("test1", JobType.USql, properties, priority: 1, degreeOfParallelism: 1, jobId: jobId);

    // ----------------------------------------
    // Create and submit the job
    // ----------------------------------------
    // The REST client is disposable; release it as soon as the submission returns.
    using (var adlaJobClient = new DataLakeAnalyticsJobManagementClient(adlCreds))
    {
        adlaJobClient.Job.Create(adlaAccountName, jobId, parameters);
    }

    if (log != null)
    {
        log.WriteLine("Submitted Data Lake Analytics job {0} to account {1}.", jobId, adlaAccountName);
    }
}
// Client ID of Azure PowerShell, reused by the interactive login samples.
// Production code should register and use its own client ID instead.
// Declared readonly because nothing is expected to reassign it after startup.
private static readonly string azure_powershell_clientid = "1950a258-227b-4e31-a9cf-717495945fc2";
/// <summary>
/// Interactive login through a user prompt, without a token cache
/// (no session state is reused or saved).
/// Blocks the calling thread until the login call completes.
/// </summary>
/// <param name="domain">Tenant/domain passed to the login call.</param>
/// <param name="tokenAudience">Audience (resource URI) the token is requested for.</param>
/// <param name="promptBehavior">Prompt behavior forwarded to the client settings.</param>
/// <returns>The credentials produced by the login call.</returns>
private static ServiceClientCredentials GetCredsInteractivePopup(string domain, Uri tokenAudience, PromptBehavior promptBehavior = PromptBehavior.Auto)
{
    // Install a fresh synchronization context before blocking on the async login below.
    SynchronizationContext.SetSynchronizationContext(new SynchronizationContext());

    // The client id is borrowed from Azure PowerShell;
    // production code should supply its own client id.
    var loginSettings = new ActiveDirectoryClientSettings();
    loginSettings.ClientId = azure_powershell_clientid;
    loginSettings.ClientRedirectUri = new Uri("urn:ietf:wg:oauth:2.0:oob");
    loginSettings.PromptBehavior = promptBehavior;

    // NOTE(review): ActiveDirectoryServiceSettings.Azure may be a shared instance, in which
    // case setting TokenAudience here affects other users of it - confirm against the SDK.
    var aadSettings = ActiveDirectoryServiceSettings.Azure;
    aadSettings.TokenAudience = tokenAudience;

    var loginTask = UserTokenProvider.LoginWithPromptAsync(domain, loginSettings, aadSettings);
    return loginTask.GetAwaiter().GetResult();
}
/// <summary>
/// Interactive login through a user prompt, using a token cache
/// so that session state can be reused and saved.
/// Blocks the calling thread until the login call completes.
/// </summary>
/// <param name="domain">Tenant/domain passed to the login call.</param>
/// <param name="tokenAudience">Audience (resource URI) the token is requested for.</param>
/// <param name="tokenCache">Token cache handed to the login call.</param>
/// <param name="promptBehavior">Prompt behavior forwarded to the client settings.</param>
/// <returns>The credentials produced by the login call.</returns>
private static ServiceClientCredentials GetCredsInteractivePopup(string domain, Uri tokenAudience, TokenCache tokenCache, PromptBehavior promptBehavior = PromptBehavior.Auto)
{
    // Install a fresh synchronization context before blocking on the async login below.
    SynchronizationContext.SetSynchronizationContext(new SynchronizationContext());

    var loginSettings = new ActiveDirectoryClientSettings();
    loginSettings.ClientId = azure_powershell_clientid;
    loginSettings.ClientRedirectUri = new Uri("urn:ietf:wg:oauth:2.0:oob");
    loginSettings.PromptBehavior = promptBehavior;

    // NOTE(review): ActiveDirectoryServiceSettings.Azure may be a shared instance, in which
    // case setting TokenAudience here affects other users of it - confirm against the SDK.
    var aadSettings = ActiveDirectoryServiceSettings.Azure;
    aadSettings.TokenAudience = tokenAudience;

    var loginTask = UserTokenProvider.LoginWithPromptAsync(domain, loginSettings, aadSettings, tokenCache);
    return loginTask.GetAwaiter().GetResult();
}
/// <summary>
/// Interactive login via device code.
/// Placeholder only: this method always throws because the flow is
/// not yet supported by Azure's .NET SDK authentication library.
/// </summary>
/// <exception cref="NotImplementedException">Always thrown.</exception>
private static ServiceClientCredentials GetCredsDeviceCode()
{
    const string unsupportedReason = "Azure SDK's .NET authentication library doesn't support device code login yet.";
    throw new NotImplementedException(unsupportedReason);
}
/// <summary>
/// Non-interactive login for a service principal / application that authenticates with a secret key.
/// Blocks the calling thread until the login call completes.
/// Setup: https://learn.microsoft.com/en-us/azure/azure-resource-manager/resource-group-authenticate-service-principal#create-service-principal-with-password
/// </summary>
/// <param name="domain">Tenant/domain passed to the login call.</param>
/// <param name="tokenAudience">Audience (resource URI) the token is requested for.</param>
/// <param name="clientId">Client ID of the service principal / application.</param>
/// <param name="secretKey">Secret key of the service principal / application.</param>
/// <returns>The credentials produced by the login call.</returns>
private static ServiceClientCredentials GetCredsServicePrincipalSecretKey(string domain, Uri tokenAudience, string clientId, string secretKey)
{
    // Install a fresh synchronization context before blocking on the async login below.
    SynchronizationContext.SetSynchronizationContext(new SynchronizationContext());

    // NOTE(review): ActiveDirectoryServiceSettings.Azure may be a shared instance, in which
    // case setting TokenAudience here affects other users of it - confirm against the SDK.
    var aadSettings = ActiveDirectoryServiceSettings.Azure;
    aadSettings.TokenAudience = tokenAudience;

    var loginTask = ApplicationTokenProvider.LoginSilentAsync(domain, clientId, secretKey, aadSettings);
    return loginTask.GetAwaiter().GetResult();
}
/// <summary>
/// Non-interactive login for a service principal / application that authenticates with a certificate.
/// Blocks the calling thread until the login call completes.
/// Setup: https://learn.microsoft.com/en-us/azure/azure-resource-manager/resource-group-authenticate-service-principal#create-service-principal-with-self-signed-certificate
/// </summary>
/// <param name="domain">Tenant/domain passed to the login call.</param>
/// <param name="tokenAudience">Audience (resource URI) the token is requested for.</param>
/// <param name="clientId">Client ID of the service principal / application.</param>
/// <param name="certificate">Certificate wrapped into the client assertion used for login.</param>
/// <returns>The credentials produced by the login call.</returns>
private static ServiceClientCredentials GetCredsServicePrincipalCertificate(string domain, Uri tokenAudience, string clientId, X509Certificate2 certificate)
{
    // Install a fresh synchronization context before blocking on the async login below.
    SynchronizationContext.SetSynchronizationContext(new SynchronizationContext());

    var assertion = new ClientAssertionCertificate(clientId, certificate);

    // NOTE(review): ActiveDirectoryServiceSettings.Azure may be a shared instance, in which
    // case setting TokenAudience here affects other users of it - confirm against the SDK.
    var aadSettings = ActiveDirectoryServiceSettings.Azure;
    aadSettings.TokenAudience = tokenAudience;

    var loginTask = ApplicationTokenProvider.LoginSilentWithCertificateAsync(domain, assertion, aadSettings);
    return loginTask.GetAwaiter().GetResult();
}
}
}
结果:
关于c# - 有没有办法按事件触发数据湖分析作业?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46527699/