go - 大量 transient 对象 - 避免争用

标签 go

我有一个用 Go 编写的新 TCP 服务器,有 100 多个客户端连接到它。每个客户端都需要集中查看数据流,因为他们正在查看来自不同位置的电波上的 radio 数据包,然后进行分析。该代码有效,但我看到围绕锁定有很多争用和增加的 CPU,并且在思考如何避免锁定(如果可能)或围绕它进行优化之后。

当 TCP 服务器为接收到的每个数据包启动 GoRoutine 时,addMessage 函数需要一定程度的同步。稍后还会在另一个函数中分析这些数据包,该函数在 map 上执行 RLock()

每秒被调用一次的 cullMessages() 函数真正陷入困境并可能真正变慢,有时需要 2-3 秒才能运行,这使问题更加复杂接下来的 2-3 个操作正在排队等待解锁并立即运行!

任何想法/想法将不胜感激!

var dataMessagesMutex sync.RWMutex
var dataMessages map[string][]*trackingPacket_v1

// Function is called from each TCP client who need to share this data
func addMessage(trackingPacket *trackingPacket_v1) {
    dataMessagesMutex.Lock()
    dataMessages[trackingPacket.packetID] = append(dataMessages[trackingPacket.packetID], trackingPacket)
    dataMessagesMutex.Unlock()
}

// Function called on a loop, need to delete based on age here
func cullMessages() {
    cullTS := time.Now().Add(-time.Second * MODES_MAX_MESSAGE_AGE)

    dataMessagesMutex.Lock()
    defer dataMessagesMutex.Unlock()

    for avr, data := range dataMessages {
        sort.Sort(PacketSorter(data))
        highestIndex := 0

        for i, messages := range data {
            if cullTS.Sub(messages.ProcessedTime) > 0 {
                // Need to delete the message here
                messages = nil
                highestIndex = i
            }
        }
        // Copy the new slice into the data variable
        data = data[highestIndex+1:]

        if len(data) == 0 {
            // Empty Messages, delete
            delete(dataMessages, avr)
        }
    }
}

更新: 新增分析功能

func processCandidates() {
    mlatMessagesMutex.RLock()
    defer dataMessagesMutex.RUnlock()

    for _, data := range dataMessages {
        numberOfMessages := len(data)
        for a := 0; a < numberOfMessages; a++ {
            packetA := data[a]
            applicablePackets := []*trackingPacket_v1{packetA}
            for b := 0; b < numberOfMessages; b++ {
                // Don't compare identical packets
                if b == a {
                    continue
                }

                packetB := data[b]

                // Only consider this packet if it's within an acceptable
                // timestamp threshold
                tsDelta := math.Abs(packetA.NormalisedTS - packetB.NormalisedTS)

                if tsDelta < MAX_MESSAGE_TS_DIFF {
                    // Finally, we need to make sure that only one message per
                    // station is included in our batch
                    stationAlreadyRepresented := false
                    for i := 0; i < len(applicablePackets); i++ {
                        if applicablePackets[i].Sharecode == packetB.Sharecode {
                            stationAlreadyRepresented = true
                        }
                    }

                    if stationAlreadyRepresented == false {

                        applicablePackets = append(applicablePackets, packetB)
                    }
                }
            }

            // Remove any stations which are deemed too close to one another
            if len(applicablePackets) >= MIN_STATIONS_NEEDED {
                applicablePackets = cullPackets(applicablePackets)
            }

            // Provided we still have enough packets....
            if len(applicablePackets) >= MIN_STATIONS_NEEDED {
                // Generate a hash for this batch...
                hash := generateHashForPackets(applicablePackets)
                batchIsUnique := true

                for _, packet := range applicablePackets {
                    if packet.containsHash(hash) {
                        batchIsUnique = false
                        break
                    }
                }

                if batchIsUnique == true {
                    for _, packet := range applicablePackets {
                        packet.addHash(hash)
                    }

                    go sendOfDataForWork(applicablePackets)
                }
            }

        }
    }
}

最佳答案

与其拥有一个大 map ,不如为每个 packetID 拥有一个 goroutine。调度程序 goroutine 可以有一个 map[string]chan *trackingPacket_v1,并在适当的 channel 上发送传入的数据包。然后,该 packetID 的 goroutine 会将数据包收集到本地 slice 中,并定期挑选和分析它们。

您可能需要以某种方式终止未在 MODES_MAX_MESSAGE_AGE 中收到数据包的 goroutine。调度程序 goroutine 可能会跟踪最近看到每个 packetID 的时间,并定期检查并检查那些太旧的。然后它将关闭这些 channel 并将它们从其 map 中删除。当analysis goroutine发现自己的channel被关闭时,就会退出。

关于go - 大量 transient 对象 - 避免争用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41748628/

相关文章:

go - 所有的协程都在 sleep

go - 如何将字符串值分配给 GO 中的时间格式?

java - 将Java字节数组转换为Go结构

java - 如何使用 Java native 接口(interface)从 Java 调用 Go 函数?

regex - 如何使用正则表达式在 golang 的括号内获取所有内容

go - ZeroMQ 在断开连接的对等点上进行循环故障转移

go - Golang 的 Codeclimate 测试覆盖格式化程序

go - 如何测试从自定义配置构建的 zap Logger 的日志记录?

go - 如何遍历 channel 并指定为引用

unit-testing - 如何在自定义文件夹中使用 go test 生成多个包的覆盖率?