hadoop - How do I feed Twitter data into HDFS through Flume from behind a proxy?

Tags: hadoop twitter proxy flume-ng

I have installed Flume and am trying to feed Twitter data into an HDFS folder.

My flume.conf file looks like this:

TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel
TwitterAgent.sinks = HDFS
TwitterAgent.sources.Twitter.type = com.cloudera.flume.source.TwitterSource
TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sources.Twitter.consumerKey = <required>
TwitterAgent.sources.Twitter.consumerSecret = <required>
TwitterAgent.sources.Twitter.accessToken = <required>
TwitterAgent.sources.Twitter.accessTokenSecret = <required>
TwitterAgent.sources.Twitter.keywords = hadoop, big data, china, india.
TwitterAgent.sinks.HDFS.channel = MemChannel
TwitterAgent.sinks.HDFS.type = hdfs
TwitterAgent.sinks.HDFS.hdfs.path = hdfs://localhost:9000/user/flume/tweets/%Y/%m/%d/%H/
TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream
TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text
TwitterAgent.sinks.HDFS.hdfs.batchSize = 1000
TwitterAgent.sinks.HDFS.hdfs.rollSize = 0
TwitterAgent.sinks.HDFS.hdfs.rollCount = 10000
TwitterAgent.sinks.HDFS.hdfs.rollInterval = 600
TwitterAgent.channels.MemChannel.type = memory
TwitterAgent.channels.MemChannel.capacity = 10000
TwitterAgent.channels.MemChannel.transactionCapacity = 100

I get the following error:

2014-11-03 02:00:49,834 (Twitter Stream consumer-1[Establishing connection]) [DEBUG -  twitter4j.internal.logging.SLF4JLogger.debug(SLF4JLogger.java:67)] User-Agent: twitter4j http://twitter4j.org/ /2.2.6
2014-11-03 02:00:49,834 (Twitter Stream consumer-1[Establishing connection]) [DEBUG - twitter4j.internal.logging.SLF4JLogger.debug(SLF4JLogger.java:67)] Connection: close
2014-11-03 02:00:49,835 (Twitter Stream consumer-1[Establishing connection]) [DEBUG - twitter4j.internal.logging.SLF4JLogger.debug(SLF4JLogger.java:67)] X-Twitter-Client-Version: 2.2.6
2014-11-03 02:00:49,835 (Twitter Stream consumer-1[Establishing connection]) [DEBUG - twitter4j.internal.logging.SLF4JLogger.debug(SLF4JLogger.java:67)] X-Twitter-Client-URL: http://twitter4j.org/en/twitter4j-2.2.6.xml
2014-11-03 02:00:49,836 (Twitter Stream consumer-1[Establishing connection]) [DEBUG - twitter4j.internal.logging.SLF4JLogger.debug(SLF4JLogger.java:67)] Accept-Encoding: gzip
2014-11-03 02:00:49,836 (Twitter Stream consumer-1[Establishing connection]) [DEBUG - twitter4j.internal.logging.SLF4JLogger.debug(SLF4JLogger.java:67)] X-Twitter-Client: Twitter4J
2014-11-03 02:00:49,837 (Twitter Stream consumer-1[Establishing connection]) [DEBUG - twitter4j.internal.logging.SLF4JLogger.debug(SLF4JLogger.java:75)] Post Params: count=0&track=hadoop%2Cbig%20data%2Canalytics%2Cbigdata%2Ccloudera%2Cdata%20science&include_entities=true
2014-11-03 02:00:49,843 (Twitter Stream consumer-1[Establishing connection]) [INFO - twitter4j.internal.logging.SLF4JLogger.info(SLF4JLogger.java:83)] Connection refused
2014-11-03 02:00:49,843 (Twitter Stream consumer-1[Establishing connection]) [INFO - twitter4j.internal.logging.SLF4JLogger.info(SLF4JLogger.java:83)] Waiting for 2000 milliseconds
2014-11-03 02:00:49,843 (Twitter Stream consumer-1[Waiting for 2000 milliseconds]) [DEBUG - twitter4j.internal.logging.SLF4JLogger.debug(SLF4JLogger.java:67)] Twitter Stream consumer-1[Waiting for 2000 milliseconds]
2014-11-03 02:00:51,843 (Twitter Stream consumer-1[Waiting for 2000 milliseconds]) [DEBUG - twitter4j.internal.logging.SLF4JLogger.debug(SLF4JLogger.java:67)] Connection refused
2014-11-03 02:00:51,844 (Twitter Stream consumer-1[Waiting for 2000 milliseconds]) [INFO - twitter4j.internal.logging.SLF4JLogger.info(SLF4JLogger.java:83)] Establishing connection.

My college network sits behind a proxy server, and I think the proxy is the cause of the problem.

How do I use a proxy with Flume?

Best answer

Build the jar from the source at https://github.com/cloudera/cdh-twitter-example

Unzip it, then do the following inside it (as described there):

Go to /cdh-twitter-example-master/flume-sources/src/main/java/com/cloudera/flume/source/TwitterSource.java

and add these lines:

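cb.setHttpProxyHost("your proxy");  // proxy host name or IP address
cb.setHttpProxyPort(8080);          // proxy port
cb.setHttpProxyUser("");            // user name, if the proxy requires authentication
cb.setHttpProxyPassword("");        // password, if the proxy requires authentication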

$ cd flume-sources

$ mvn package

Then copy the jar from target into Flume's lib folder. Enjoy.
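
The proxy values in the lines above are hardcoded, so switching to a different proxy means editing TwitterSource.java and rebuilding the jar. As a rough sketch of one alternative, the helper below reads the proxy settings from key/value properties and validates them before they are handed to cb. Everything in it is an assumption for illustration: the class name ProxySettings and the keys proxyHost, proxyPort, proxyUser and proxyPassword are hypothetical and are not part of the Cloudera source or of Flume. It uses only the Java standard library, so the twitter4j calls appear as comments.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical helper (not part of cdh-twitter-example): holds proxy
 * settings parsed from key/value properties.
 */
public class ProxySettings {
    public final String host;
    public final int port;
    public final String user;
    public final String password;

    public ProxySettings(String host, int port, String user, String password) {
        this.host = host;
        this.port = port;
        this.user = user;
        this.password = password;
    }

    /**
     * Parses the assumed keys proxyHost, proxyPort, proxyUser, proxyPassword.
     * Returns null when proxyHost is missing or blank, meaning "no proxy".
     */
    public static ProxySettings fromProperties(Map<String, String> props) {
        String host = props.get("proxyHost");
        if (host == null || host.trim().isEmpty()) {
            return null;
        }
        // 8080 mirrors the port used in the answer; it is only a default.
        String rawPort = props.getOrDefault("proxyPort", "8080").trim();
        int port;
        try {
            port = Integer.parseInt(rawPort);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("proxyPort is not a number: " + rawPort);
        }
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("proxyPort out of range: " + port);
        }
        return new ProxySettings(
                host.trim(),
                port,
                props.getOrDefault("proxyUser", ""),
                props.getOrDefault("proxyPassword", ""));
    }

    public static void main(String[] args) {
        // Stand-in for values that would come from the agent configuration.
        Map<String, String> props = new HashMap<>();
        props.put("proxyHost", "proxy.example.edu"); // placeholder host
        props.put("proxyPort", "3128");              // placeholder port

        ProxySettings proxy = fromProperties(props);
        if (proxy != null) {
            // In TwitterSource.java these would replace the literals:
            //   cb.setHttpProxyHost(proxy.host);
            //   cb.setHttpProxyPort(proxy.port);
            //   cb.setHttpProxyUser(proxy.user);
            //   cb.setHttpProxyPassword(proxy.password);
            System.out.println("Using proxy " + proxy.host + ":" + proxy.port);
        }
    }
}
```

Inside a Flume source, the map would presumably be filled from the Context passed to configure(), for example with context.getString("proxyHost"), and the matching keys would be added under TwitterAgent.sources.Twitter in flume.conf. That wiring is untested here and is left as an assumption.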

Source: this question on Stack Overflow, https://stackoverflow.com/questions/26704206/
