python - Python 中的 nifi executescript 流文件创建问题

标签 python overriding apache-nifi

我正在使用 python 脚本抓取页面(例如 facebook 页面),并希望将每个帖子传递到文件中(类似于 gettwitter 进程)。 ExecuteScript 是我的 nifi 数据流中的第一个处理器。我设法使用 session.create() 创建一个流文件,并且没有任何问题。

但是,我对如何将从 facebook 读取的数据放入 outputstreamCallback 感到困惑。我见过的大多数示例都使用了 java 重写,但我必须使用 Python,并且必须承认我对此并不陌生。

我发现了很多有关读取流文件的示例,但没有太多内容。下面是我想在 Python 中用 Java 做的事情。

FlowFile flowFile = session.create();
flowFile = session.write(flowFile, new OutputStreamCallback() {
    @Override
    public void process(final OutputStream out) throws IOException {
        out.write(tweet.getBytes(StandardCharsets.UTF_8));

如果还有其他方法,请指导。谢谢。

<小时/>

采用@James建议的更改后,我编写了一个片段,如下所示,但未传输流文件。但没有编译错误。

import urllib2
import json
import datetime
import csv
import time
import sys
import traceback
from org.apache.nifi.processor.io import OutputStreamCallback
from org.python.core.util import StringUtil

class WriteContentCallback(OutputStreamCallback):
    def __init__(self, content):
        self.content_text = content

    def process(self, outputStream):
        try:
            outputStream.write(StringUtil.toBytes(self.content_text))
        except:
            traceback.print_exc(file=sys.stdout)
            raise

#app_id = "<FILL IN>"
#app_secret = "<FILL IN>" # DO NOT SHARE WITH ANYONE!
page_id = "dsssssss"
#page_id = raw_input("Please Paste Public Page Name:")

#access_token = app_id + "|" + app_secret

access_token = "sdfsdfsf%sdfsdf"

#access_token = raw_input("Please Paste Your Access Token:")


def scrapeFacebookPageFeedStatus(page_id, access_token):
        flowFile = session.create()
        flowFile = session.write(flowFile, WriteContentCallback("Hello there this is my data"))
        flowFile = session.write()
        session.transfer(flowFile, REL_SUCCESS)

        has_next_page = False
        num_processed = 0   # keep a count on how many we've processed
        scrape_starttime = datetime.datetime.now()


        while has_next_page:
            print "Scraping %s Facebook Page: %s\n" % (page_id, scrape_starttime)
            has_next_page = False

        print "\nDone!\n%s Statuses Processed in %s" % \
                (num_processed, datetime.datetime.now() - scrape_starttime)


if __name__ == '__main__':
    scrapeFacebookPageFeedStatus(page_id, access_token)
    flowFile = session.create()
    flowFile = session.write(flowFile, WriteContentCallback("and your data"))
    session.transfer(flowFile, REL_SUCCESS)

下面是 nifi-app.log 的输出

> [root@ambari logs]# tail -100 nifi-app.log 2017-04-03 14:08:07,989
> INFO [StandardProcessScheduler Thread-6]
> o.a.n.c.s.TimerDrivenSchedulingAgent Scheduled
> ExecuteScript[id=a62f4b97-8fd7-15cd-95b9-505e1b960805] to run with 1
> threads 2017-04-03 14:08:08,938 INFO [Flow Service Tasks Thread-2]
> o.a.nifi.controller.StandardFlowService Saved flow controller
> org.apache.nifi.controller.FlowController@44ec5960 // Another save
> pending = false 2017-04-03 14:08:13,789 INFO [StandardProcessScheduler
> Thread-3] o.a.n.c.s.TimerDrivenSchedulingAgent Scheduled
> PutFile[id=a62f4b8e-8fd7-15cd-7517-56593deabf55] to run with 1 threads
> 2017-04-03 14:08:14,296 INFO [Flow Service Tasks Thread-2]
> o.a.nifi.controller.StandardFlowService Saved flow controller
> org.apache.nifi.controller.FlowController@44ec5960 // Another save
> pending = false

最佳答案

以下是 Python ExecuteScript 中 NiFi OutputStreamCallback 的简单实现:

import sys
import traceback
from org.apache.nifi.processor.io import OutputStreamCallback
from org.python.core.util import StringUtil

class WriteContentCallback(OutputStreamCallback):
    def __init__(self, content):
        self.content_text = content

    def process(self, outputStream):
        try:
            outputStream.write(StringUtil.toBytes(self.content_text))
        except:
            traceback.print_exc(file=sys.stdout)
            raise

# Create new FlowFile with content
flowFile = session.create()
flowFile = session.write(flowFile, WriteContentCallback("This is the flowfile content"))
session.transfer(flowFile, REL_SUCCESS)

关于python - Python 中的 nifi executescript 流文件创建问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43086641/

相关文章:

c# - 自定义类用作字典中的键但找不到键

java - 如何在不使用 setter 的情况下从子类访问父类(super class)中的私有(private)字段?

java - 改变java中子类方法的语义

mysql - 得到???通过Nifi将非拉丁数据放入mysql时

apache-nifi - JSON 数组到 nifi 中的多个 JSON 对象

Mixin 注入(inject)后 Python 对象丢失 __dict__

python - 用户不存在并且没有重定向到 django 项目中创建的租户

python - 如何使用 outlook web access 从 python 发送电子邮件?

apache-nifi - Nifi DistributedCache查找问题

python - 添加新行而不覆盖现有行