我是新来的 sfdc 。我已经由用户创建了报告。我想使用 python 将报告的数据转储到 csv/excel 文件中。
我看到有几个 python 包。但是我的代码给出了一个错误
from simple_salesforce import Salesforce
sf = Salesforce(instance_url='https://cs1.salesforce.com', session_id='')
sf = Salesforce(password='xxxxxx', username='xxxxx', organizationId='xxxxx')
我可以了解设置 API 的基本步骤和一些示例代码吗
最佳答案
下面的代码相当长,可能仅适用于我们的用例,但基本思想如下:
找出日期间隔长度和额外需要的过滤,以免遇到“超过 2'000”的限制。在我的情况下,我可以有每周日期范围过滤器,但需要应用一些额外的过滤器
然后像这样运行它:
report_id = '00O4…'
sf = SalesforceReport(user, pass, token, report_id)
it = sf.iterate_over_dates_and_filters(datetime.date(2020,2,1),
'Invoice__c.InvoiceDate__c', 'Opportunity.CustomField__c',
[('a', 'startswith'), ('b', 'startswith'), …])
for row in it:
# do something with the dict
自 2020 年 2 月 1 日以来,迭代器每周(如果您需要每日迭代器或每月迭代器,则需要更改代码,但更改应该很小)并应用过滤器 CustomField__c.startswith('a'),然后 CustomField__c.startswith('b'), ... 并充当生成器,因此您无需自己弄乱过滤器循环。
如果有一个返回超过 2000 行的查询,迭代器会抛出一个异常,以确保数据不完整。
这里有一个警告:SF 限制为每小时最多 500 个查询。假设您有一年有 52 周和 10 个额外的过滤器,那么您已经遇到了这个限制。
这是类(class)(依赖
simple_salesforce
)import simple_salesforce
import json
import datetime
"""
helper class to iterate over salesforce report data
and manouvering around the 2000 max limit
"""
class SalesforceReport(simple_salesforce.Salesforce):
def __init__(self, username, password, security_token, report_id):
super(SalesforceReport, self).__init__(username=username, password=password, security_token=security_token)
self.report_id = report_id
self._fetch_describe()
def _fetch_describe(self):
url = f'{self.base_url}analytics/reports/{self.report_id}/describe'
result = self._call_salesforce('GET', url)
self.filters = dict(result.json()['reportMetadata'])
def apply_report_filter(self, column, operator, value, replace=True):
"""
adds/replaces filter, example:
apply_report_filter('Opportunity.InsertionId__c', 'startsWith', 'hbob').
For date filters use apply_standard_date_filter.
column: needs to correspond to a column in your report, AND the report
needs to have this filter configured (so in the UI the filter
can be applied)
operator: equals, notEqual, lessThan, greaterThan, lessOrEqual,
greaterOrEqual, contains, notContain, startsWith, includes
see https://sforce.co/2Tb5SrS for up to date list
value: value as a string
replace: if set to True, then if there's already a restriction on column
this restriction will be replaced, otherwise it's added additionally
"""
filters = self.filters['reportFilters']
if replace:
filters = [f for f in filters if not f['column'] == column]
filters.append(dict(
column=column,
isRunPageEditable=True,
operator=operator,
value=value))
self.filters['reportFilters'] = filters
def apply_standard_date_filter(self, column, startDate, endDate):
"""
replace date filter. The date filter needs to be available as a filter in the
UI already
Example: apply_standard_date_filter('Invoice__c.InvoiceDate__c', d_from, d_to)
column: needs to correspond to a column in your report
startDate, endDate: instance of datetime.date
"""
self.filters['standardDateFilter'] = dict(
column=column,
durationValue='CUSTOM',
startDate=startDate.strftime('%Y-%m-%d'),
endDate=endDate.strftime('%Y-%m-%d')
)
def query_report(self):
"""
return generator which yields one report row as dict at a time
"""
url = self.base_url + f"analytics/reports/query"
result = self._call_salesforce('POST', url, data=json.dumps(dict(reportMetadata=self.filters)))
r = result.json()
columns = r['reportMetadata']['detailColumns']
if not r['allData']:
raise Exception('got more than 2000 rows! Quitting as data would be incomplete')
for row in r['factMap']['T!T']['rows']:
values = []
for c in row['dataCells']:
t = type(c['value'])
if t == str or t == type(None) or t == int:
values.append(c['value'])
elif t == dict and 'amount' in c['value']:
values.append(c['value']['amount'])
else:
print(f"don't know how to handle {c}")
values.append(c['value'])
yield dict(zip(columns, values))
def iterate_over_dates_and_filters(self, startDate, date_column, filter_column, filter_tuples):
"""
return generator which iterates over every week and applies the filters
each for column
"""
date_runner = startDate
while True:
print(date_runner)
self.apply_standard_date_filter(date_column, date_runner, date_runner + datetime.timedelta(days=6))
for val, op in filter_tuples:
print(val)
self.apply_report_filter(filter_column, op, val)
for row in self.query_report():
yield row
date_runner += datetime.timedelta(days=7)
if date_runner > datetime.date.today():
break
关于python-2.7 - 使用 python 导入 Salesforce 报告数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/22853232/