mysql - 使用 play 框架从 mysql 读取并插入 mongo(多线程问题)的运行脚本问题

标签 mysql scala playframework threadpool quill

我有一个脚本,我想运行它来遍历具有大约 4M 记录的 mysql 表,并且对于每个我执行另一个调用另一个 mysql 表以获取更多数据,从这些数据我创建一个新对象并将其插入到 mongodb。

所以基本上我想执行从 mysql 到 mongodb 的迁移

我正在使用 quill,这是一个用于在您的 scala 项目 (QDSL) 中使用 sql 的库。

my script class is very short and looks like:

class MigrateScript @Inject()(dao: PeopleDao) {

  lazy val ctx = new MysqlAsyncContext(SnakeCase, "mysql")

  import ctx._

      def getNextPerson(idx: Int): Future[Person] = {
        val q = quote {
          query[Person].drop(lift(idx)).take(1)
        }
        ctx.run(q.take(1)).map(_.head) recover {
          case t: NoSuchElementException =>
            println(s"problem with index: $idx")
            throw new RuntimeException
        }
      }

      def getPersonKids(personId: Int): Future[List[Kid]] = {
        val q = quote {
          query[Kid].filter(kid => kid.parent_id == lift(personId))
        }
        ctx.run(q)
      }


      def runScript(numberOfRecords: Int): Unit = {
        for (a <- 0 to numberOfRecords) {
          getNextPerson(a).map(person => {
            getPersonKids(person.id).map(kids => {
              // create obj and insert to mongo
              val parentWithKidsObjectToInsert = // creating new object using person & kids

              dao.insert(parentWithKidsObjectToInsert) // this returns Future[String]
            })
          })
        }
      }

}

为了运行它,我从我的 Controller 中这样做:

  def insertMySqlRecordsToMongo(numberOfRecords: Int) = Action { request =>
    mScript.runScript(numberOfRecords)
    Ok
  }

问题:

  1. 当我这样运行它时,脚本在 100+ 条记录后卡住,我的日志中出现错误:

    java.util.concurrent.TimeoutException: Futures timed out after [5 seconds]

WARN   [ousekeeper] - c.z.h.p.HikariPool(755)        - HikariPool-1 - Thread starvation or clock leap detected (housekeeper delta=1m51s17ms).

感觉应用程序运行速度快于 mysql 连接池可以处理...

  1. 所以我尝试在 getNextPerson 之上添加 Await.result,它运行良好但非常慢。它每分钟仅插入 300 条记录,这可能需要几天时间才能超过 400 万条记录...

请问有什么解决办法吗?感谢所有花时间理解这一点的人 :)

最佳答案

我真的非常建议您研究 Spark 来执行此操作,这听起来像是一个典型的 ETL 用例。问题是您正在将数千条记录具体化到内存中,这会杀死您的 GC 并拖延 Futures。此外,您必须逐条记录地进行,这使得它变得非常非常慢。如果你将它加载到 Spark 数据帧中,我会占用更少的空间,因为 Spark 实际上并没有将记录具体化到内存中(它们使用非常紧凑的二进制内存序列化,如果需要,它会“溢出”到磁盘),这样可以节省你的堆来自 GC-annihilation。它还并行执行许多记录的加载和转换。它会为您提供性能特征,使您的问题变得易于处理。

以下是我大概会做的事情:

  1. 使用 spark.read.jdbc
  2. 将记录加载到 Spark 数据集中
  3. 通过父记录加入数据集和组
  4. 使用 Mongo Spark Collector 将记录写入 MongoDB

代码本身应该是这样的:

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import com.mongodb.spark._
// probably import some other stuff

SparkSession.builder()
      .master("local")
      .appName("Load records to mongo")
       // Configure the spark->mongo connector
      .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.myCollection")
      .enableHiveSupport()
      .getOrCreate()

case class PersonWithKids(person:Person, kids:List[Kid])

// make sure the MySQL jdbc driver is not the classpath because spark is about to use it
val people = spark.jdbc.read("(select * from people) as data", "jdbc://...").as[Person]
val kids = spark.jdbc.read("(select * from kids) as data", "jdbc://...").as[Kid]
val joined = 
  people
    .joinWith(kids, people("id") === kids("parent_id"))
    .map({case (person, kid) => PersonWithKids(person, List(kid))})
    .groupByKey(_.person)
    .flatMapGroups({case (person, personWithKidIter) => 
        PersonWithKids(person, personWithKidIter.toList.flatMap(_.kids))
    })

// make sure you did stuff correctly
// joined.show()
// joined.take(100).write.json("someFileWhereYouCanDoubleCheck.json")

MongoSpark.save(joined)

您可能需要以下 SBT 依赖项:

"org.apache.spark" %% "spark-core" % "2.3.1" // or 2.4.0 might be latest now
"org.apache.spark" %% "spark-hive" % "2.3.1" // or 2.4.0 might be latest now
"org.apache.spark" %% "spark-sql" % "2.3.1" // or 2.4.0 might be latest now
"org.mongodb.spark" %% "mongo-spark-connector" % "2.3.1" // or 2.4.0 might be latest now

祝你好运!

关于mysql - 使用 play 框架从 mysql 读取并插入 mongo(多线程问题)的运行脚本问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53915006/

相关文章:

mysql - 获取列名而不是列值

Mysql 查询帮助 - 需要

scala - gradle 中的影子插件无法正常工作 - gradle build 不会构建 fat jar

java - Heroku Push 被拒绝 - 未检测到 Cedar 支持的应用程序

playframework - 由 : javax. persistence.EntityNotFoundException 引起:Bean 已被删除 - 延迟加载失败

mysql - 时间戳为int字段,查询性能

scala - 如何手动渲染缩放模板?

scala - 如何创建可以从任意深度嵌套的列表中制作平面列表的函数?

scala - Play Framework Websocket 服务器不处理来自 netty 客户端的 Ping 帧

mysql - 将两个表(具有1-M关系)连接其中第二个表需要 'flattened'到一行