我必须将我的项目与 salesforce 集成。更具体地说,我需要能够将一些 salesforce.com 数据批量导入我们的系统,例如客户。我已经做了足够多的研发工作,可以得出结论,SF Bulk API 是可行的方法。我们的项目是一个 Java 系统,我们已经在使用 Camel,它恰好有一个看起来工作得很好的 Salesforce 组件。
这个问题与 Salesforce 和 Camel 一样重要。
SF 批量 api 的本质是它们是异步的。这意味着,我提交了一份工作,然后我必须轮询状态,然后状态在某个时候变为 COMPLETED,然后我可以提取结果。到目前为止一切顺利,但我需要了解的是,从 Camel 的角度来看,我如何在提交工作后启动此轮询?正如您在我的示例中看到的那样,我正在使用延迟器,但这是次优的——有些批处理可能需要几个小时才能准备好。
此外,如您所见,我保留了 jobInfo 变量。那是因为好像没有通过getQueryResultIds传递过来。 camel 中有什么方法可以在它被推下管道时保留交换的某些部分吗?
这是我为批量导入帐户编写的代码(我取出了一些专有的部分):
final BatchInfo[] bi = new BatchInfo[1];
from("timer://runOnce?repeatCount=1&delay=10")
.process(new Processor() {
public void process(Exchange exchange) throws Exception
{
JobInfo jobInfo = new JobInfo();
jobInfo.setContentType(ContentType.CSV);
jobInfo.setOperation(OperationEnum.QUERY);
jobInfo.setObject("Account");
jobInfo.setConcurrencyMode(ConcurrencyModeEnum.PARALLEL);
exchange.getOut().setBody(jobInfo);
}
})
.to("salesforce:createJob")
.to("salesforce:createBatchQuery?sObjectQuery=select Id,Name,Type,BillingCity,BillingState," +
"BillingPostalCode,BillingCountry,Phone from Account")
.delay(10000)
.to("salesforce:getBatch")
.process(new Processor() {
public void process(Exchange exchange) throws Exception
{
BatchInfo batchInfo = exchange.getIn().getBody(BatchInfo.class);
bi[0] = batchInfo;
exchange.getOut().setBody(batchInfo);
}
})
.to("salesforce:getQueryResultIds")
.process(new Processor() {
public void process(Exchange exchange) throws Exception
{
if (exchange.getException() != null)
{
exchange.getException().printStackTrace();
}
System.out.println(exchange.getIn().getBody());
Collection resultIds = exchange.getIn().getBody(Collection.class);
String resultId = (String) resultIds.iterator().next();
exchange.getOut().setHeader(SalesforceEndpointConfig.RESULT_ID, resultId);
exchange.getOut().setHeader(SalesforceEndpointConfig.JOB_ID, bi[0].getJobId());
exchange.getOut().setHeader(SalesforceEndpointConfig.BATCH_ID, bi[0].getId());
exchange.getOut().setBody(exchange.getIn().getBody());
}
})
.to("salesforce:getQueryResult")
.process(new Processor() {
public void process(Exchange exchange) throws Exception
{
InputStream is = exchange.getIn().getBody(InputStream.class);
CachedOutputStream cos = new CachedOutputStream(exchange);
BufferedReader r = new BufferedReader(new InputStreamReader(is));
PrintWriter pw = new PrintWriter(cos);
boolean header = false;
String line = null;
com.google.gson.stream.JsonWriter writer = new com.google.gson.stream.JsonWriter(pw);
Gson gson = new Gson();
writer.beginArray();
while ((line = r.readLine()) != null)
{
if (!header)
{
header = true;
continue;
}
String[] split = line.split(",");
JsonObject account = new JsonObject();
JsonObject headers = new JsonObject();
JsonObject data = new JsonObject();
account.add("headers", headers);
headers.addProperty("category", "entity");
headers.addProperty("type", "Account");
account.add("data", data);
data.addProperty("accountId", split[0].substring(1, split[0].length() - 1));
data.addProperty("name", split[1].substring(1, split[1].length() - 1));
data.addProperty("accountType", split[2].substring(1, split[2].length() - 1));
gson.toJson(account, writer);
}
writer.endArray();
writer.flush();
exchange.getOut().setBody(cos.getInputStream());
}
})
.to("stream:out");
如您所见,我在这里所做的是从SF批量导入数据并将其转换为JSON并保存到文件中。为了这个问题的目的,我简化了事情。
我知道,对于现实世界的场景,我必须用 SEDA 或 VM 生产者替换路由的“来自计时器”部分。
非常感谢任何帮助。
问候, 奥列格
最佳答案
最后,我决定不为此使用 Camel Salesforce 连接器,这是一种耻辱。缺少库的文档和不太清晰的源代码注释让我担心我可能花更多的时间让库工作而不是实际实现我的项目。
关于java - Salesforce 批量 API 和 Apache Camel Salesforce 组件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/20710422/