我想知道如何使用 mgo 在 Go 中管理 MongoDB session ,尤其是关于如何正确确保 session 已关闭以及如何对写入失败使用react。
我已阅读以下内容:
Best practice to maintain a mgo session
Should I copy session for each operation in mgo?
仍然不能将其应用于我的情况。
我有两个 goroutine,它们将事件一个接一个地存储到 MongoDB 中,共享同一个 *mgo.Session,两者看起来基本上如下所示:
func storeEvents(session *mgo.Session) {
session_copy := session.Copy()
// *** is it correct to defer the session close here? <-----
defer session_copy.Close()
col := session_copy.DB("DB_NAME").C("COLLECTION_NAME")
for {
event := GetEvent()
err := col.Insert(&event)
if err != nil {
// *** insert FAILED - how to react properly? <-----
session_copy = session.Copy()
defer session_copy.Close()
}
}
}
col.Insert(&event) 几个小时后返回错误
read tcp 127.0.0.1:46954->127.0.0.1:27017: i/o timeout
我不确定如何对此使用react。发生此错误后,它会在所有后续写入中发生,因此看来我必须创建一个新 session 。我的替代方案似乎是:
1) 重启整个goroutine,即
if err != nil {
go storeEvents(session)
return
}
2) 创建一个新的 session 副本
if err != nil {
session_copy = session.Copy()
defer session_copy.Close()
col := session_copy.DB("DB_NAME").C("COLLECTION_NAME")
continue
}
--> 我使用 defer session_copy.Close()
的方式是否正确? (注意上面的 defer 引用了另一个 session 的 Close() 函数。无论如何,这些 session 永远不会关闭,因为该函数永远不会返回。也就是说,随着时间的推移,许多 session 将被创建但不会关闭。
其他选择?
最佳答案
所以我不知道这是否会对您有所帮助,但我对此设置没有任何问题。
我有一个从中导入的 mongo 包。这是我的 mongo.go 文件的模板
package mongo
import (
"time"
"gopkg.in/mgo.v2"
)
var (
// MyDB ...
MyDB DataStore
)
// create the session before main starts
func init() {
MyDB.ConnectToDB()
}
// DataStore containing a pointer to a mgo session
type DataStore struct {
Session *mgo.Session
}
// ConnectToTagserver is a helper method that connections to pubgears' tagserver
// database
func (ds *DataStore) ConnectToDB() {
mongoDBDialInfo := &mgo.DialInfo{
Addrs: []string{"ip"},
Timeout: 60 * time.Second,
Database: "db",
}
sess, err := mgo.DialWithInfo(mongoDBDialInfo)
if err != nil {
panic(err)
}
sess.SetMode(mgo.Monotonic, true)
MyDB.Session = sess
}
// Close is a helper method that ensures the session is properly terminated
func (ds *DataStore) Close() {
ds.Session.Close()
}
然后在另一个包中,例如 main Updated Based on the comment below
package main
import (
"../models/mongo"
)
func main() {
// Grab the main session which was instantiated in the mongo package init function
sess := mongo.MyDB.Session
// pass that session in
storeEvents(sess)
}
func storeEvents(session *mgo.Session) {
session_copy := session.Copy()
defer session_copy.Close()
// Handle panics in a deferred fuction
// You can turn this into a wrapper (middleware)
// remove this this function, and just wrap your calls with it, using switch cases
// you can handle all types of errors
defer func(session *mgo.Session) {
if err := recover(); err != nil {
fmt.Printf("Mongo insert has caused a panic: %s\n", err)
fmt.Println("Attempting to insert again")
session_copy := session.Copy()
defer session_copy.Close()
col := session_copy.DB("DB_NAME").C("COLLECTION_NAME")
event := GetEvent()
err := col.Insert(&event)
if err != nil {
fmt.Println("Attempting to insert again failed")
return
}
fmt.Println("Attempting to insert again succesful")
}
}(session)
col := session_copy.DB("DB_NAME").C("COLLECTION_NAME")
event := GetEvent()
err := col.Insert(&event)
if err != nil {
panic(err)
}
}
我在 AWS 上的生产服务器上使用了类似的设置。我每小时插入超过 100 万次。希望这可以帮助。我为确保 mongo 服务器可以处理连接所做的另一件事是在我的生产机器上创建 ulimit。在这个stack里面有讲到
关于mongodb - 在出现错误的情况下重新创建 mgo session (读取 tcp 127.0.0.1 :46954->127. 0.0.1:27017: i/o 超时),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40698909/