我正在使用 Twitter4J 和 Akka Streams 提取推文。我选择了几个字段,如 userId、tweetId、推文文本等。此推文实体被写入数据库:
class Counter extends StatusAdapter with Databases{
implicit val system = ActorSystem("TweetsExtractor")
implicit val materializer = ActorMaterializer()
implicit val executionContext = system.dispatcher
implicit val LoggingAdapter =
Logging(system, classOf[Counter])
val overflowStrategy = OverflowStrategy.backpressure
val bufferSize = 1000
val statusSource = Source.queue[Status](
bufferSize,
overflowStrategy
)
val insertFlow: Flow[Status, Tweet, NotUsed] =
Flow[Status].map(status => Tweet(status.getId, status.getUser.getId, status.getText, status.getLang,
status.getFavoriteCount, status.getRetweetCount))
val insertSink: Sink[Tweet, Future[Done]] = Sink.foreach(tweetRepository.create)
val insertGraph = statusSource via insertFlow to insertSink
val queueInsert = insertGraph.run()
override def onStatus(status: Status) =
Await.result(queueInsert.offer(status), Duration.Inf)
}
我的意图是添加位置字段。在 Twitter4J 中有一个特定的 GeoLocation 类型,它包含 double 类型的纬度和经度。但是,当我尝试直接通过流程提取纬度和经度时,没有任何内容写入数据库:
Flow[Status].map(status => Tweet(status.getId, status.getUser.getId, status.getText, status.getLang, status.getFavoriteCount, status.getRetweetCount, status.getGeoLocation.getLatitude, status.getGeoLocation.getLongitude))
这种行为的原因可能是什么,我该如何解决?
最佳答案
正如对问题的评论所证实的那样,这里发生的事情是大多数推文都没有附加地理位置数据,这使得这些字段为空并导致了不当行为。
几个简单的空值检查应该可以解决这个问题。
关于postgresql - 将 GeoLocation Twitter4J 写入 Postgres,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48183288/