我有一个脚本,我想运行它来遍历具有大约 4M 记录的 mysql 表,并且对于每个我执行另一个调用另一个 mysql 表以获取更多数据,从这些数据我创建一个新对象并将其插入到 mongodb。
所以基本上我想执行从 mysql 到 mongodb 的迁移
我正在使用 quill,这是一个用于在您的 scala 项目 (QDSL) 中使用 sql 的库。
my script class is very short and looks like:
class MigrateScript @Inject()(dao: PeopleDao) {
lazy val ctx = new MysqlAsyncContext(SnakeCase, "mysql")
import ctx._
def getNextPerson(idx: Int): Future[Person] = {
val q = quote {
query[Person].drop(lift(idx)).take(1)
}
ctx.run(q.take(1)).map(_.head) recover {
case t: NoSuchElementException =>
println(s"problem with index: $idx")
throw new RuntimeException
}
}
def getPersonKids(personId: Int): Future[List[Kid]] = {
val q = quote {
query[Kid].filter(kid => kid.parent_id == lift(personId))
}
ctx.run(q)
}
def runScript(numberOfRecords: Int): Unit = {
for (a <- 0 to numberOfRecords) {
getNextPerson(a).map(person => {
getPersonKids(person.id).map(kids => {
// create obj and insert to mongo
val parentWithKidsObjectToInsert = // creating new object using person & kids
dao.insert(parentWithKidsObjectToInsert) // this returns Future[String]
})
})
}
}
}
为了运行它,我从我的 Controller 中这样做:
def insertMySqlRecordsToMongo(numberOfRecords: Int) = Action { request =>
mScript.runScript(numberOfRecords)
Ok
}
问题:
当我这样运行它时,脚本在 100+ 条记录后卡住,我的日志中出现错误:
java.util.concurrent.TimeoutException: Futures timed out after [5 seconds]
和
WARN [ousekeeper] - c.z.h.p.HikariPool(755) - HikariPool-1 - Thread starvation or clock leap detected (housekeeper delta=1m51s17ms).
感觉应用程序运行速度快于 mysql 连接池可以处理...
- 所以我尝试在 getNextPerson 之上添加 Await.result,它运行良好但非常慢。它每分钟仅插入 300 条记录,这可能需要几天时间才能超过 400 万条记录...
请问有什么解决办法吗?感谢所有花时间理解这一点的人 :)
最佳答案
我真的非常建议您研究 Spark 来执行此操作,这听起来像是一个典型的 ETL 用例。问题是您正在将数千条记录具体化到内存中,这会杀死您的 GC 并拖延 Futures。此外,您必须逐条记录地进行,这使得它变得非常非常慢。如果你将它加载到 Spark 数据帧中,我会占用更少的空间,因为 Spark 实际上并没有将记录具体化到内存中(它们使用非常紧凑的二进制内存序列化,如果需要,它会“溢出”到磁盘),这样可以节省你的堆来自 GC-annihilation。它还并行执行许多记录的加载和转换。它会为您提供性能特征,使您的问题变得易于处理。
以下是我大概会做的事情:
- 使用
spark.read.jdbc
将记录加载到 Spark 数据集中
- 通过父记录加入数据集和组
- 使用 Mongo Spark Collector 将记录写入 MongoDB
代码本身应该是这样的:
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import com.mongodb.spark._
// probably import some other stuff
SparkSession.builder()
.master("local")
.appName("Load records to mongo")
// Configure the spark->mongo connector
.config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.myCollection")
.enableHiveSupport()
.getOrCreate()
case class PersonWithKids(person:Person, kids:List[Kid])
// make sure the MySQL jdbc driver is not the classpath because spark is about to use it
val people = spark.jdbc.read("(select * from people) as data", "jdbc://...").as[Person]
val kids = spark.jdbc.read("(select * from kids) as data", "jdbc://...").as[Kid]
val joined =
people
.joinWith(kids, people("id") === kids("parent_id"))
.map({case (person, kid) => PersonWithKids(person, List(kid))})
.groupByKey(_.person)
.flatMapGroups({case (person, personWithKidIter) =>
PersonWithKids(person, personWithKidIter.toList.flatMap(_.kids))
})
// make sure you did stuff correctly
// joined.show()
// joined.take(100).write.json("someFileWhereYouCanDoubleCheck.json")
MongoSpark.save(joined)
您可能需要以下 SBT 依赖项:
"org.apache.spark" %% "spark-core" % "2.3.1" // or 2.4.0 might be latest now
"org.apache.spark" %% "spark-hive" % "2.3.1" // or 2.4.0 might be latest now
"org.apache.spark" %% "spark-sql" % "2.3.1" // or 2.4.0 might be latest now
"org.mongodb.spark" %% "mongo-spark-connector" % "2.3.1" // or 2.4.0 might be latest now
祝你好运!
关于mysql - 使用 play 框架从 mysql 读取并插入 mongo(多线程问题)的运行脚本问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53915006/