events - F# 使用代理将历史事件与两个代理之间的模拟交互交错

标签 events concurrency f# agent mailboxprocessor

抱歉这个问题变得这么大。我一开始只是想问一个简单的问题。 我有历史报价数据。我想模拟交易代理和订单代理对数据和彼此的 react 。我可以使用另一个代理来控制事件流,以便前两个代理没有机会比 R/T 中更快地对事件使用react。 - 但这似乎太模糊了,所以我经历了

  1. 文字墙 (TL:DR)
  2. 两个简短的段落(没有人知道我在问什么,因为没有问题)
  3. 发布了我的错误代码和更多详细信息 -> 又太长太模糊了

但是当我看到它时,我真的只是在问“你是怎么做到的?”这确实不是 SO 的正确格式。正如丹尼尔指出的那样,这个问题太宽泛了,最后,菲尔的提示和良好的 sleep 让我想出了一个解决方案,我将其包含在下面。希望其他人能从中受益。我仍然对我的方法不满意,但我认为代码审查是一个更好的发布这方面内容的地方。

另外,感谢 SO F# 社区没有投票否决我微薄的代表!

open System
open System.IO

let src = __SOURCE_DIRECTORY__
let myPath = Path.Combine(src, "Test_data.txt")

// Create some test data
let makeTestDataFile path lineCount = 
    let now = System.DateTime.Now 
    File.WriteAllLines(path,
     seq { for i in 1 .. lineCount do
           let dt = now.AddSeconds(float i) 
           yield sprintf "%s,%d" (dt.ToString("MM/dd/yyyy hh:mm:ss.fff tt")) i }) 

makeTestDataFile myPath 10

感谢 Phil,我得到了一个工作原型(prototype):

type MsgType = 
        | HistEvent  of DateTime * float
        | AgentEvent of DateTime * float

type DataPoint = {time:DateTime; value:float}

type Agent<'T> = MailboxProcessor<'T>

type EventTrafficAgent () = 

        let event = new Event<DataPoint>()

        let agent = Agent.Start(fun inbox ->
            let rec loop eventQue now () = async {
                let! msg = inbox.Receive()
                // use some form of que managment to decide
                let updatedQue =  
                    match msg with
                    | HistEvent (dt, v) -> 
                        let now = max now dt // log most recent date

                        // check if there are any pending events that can now be sent
                        // if so, send and remove
                        let updatedQue = 
                            eventQue
                            |> List.filter(fun e ->
                                            if e.time <= now then
                                                event.Trigger(e)
                                                let now = e.time 
                                                printfn "\tDequeing & Sending delayed event: {time = %s, value %f}" (e.time.ToString("mm:ss.fff")) e.value
                                                false
                                            else  
                                                true)

                        // just send the historical event as these are never delayed
                        event.Trigger({time = dt; value = v})
                        updatedQue

                    | AgentEvent (dt, v) -> 

                        let tm = dt.AddSeconds(1.5) // time with lag added i.e. "intended time of arrival"
                        let cacheIt = tm >= now

                        // return an updated list of cached orders
                        (if cacheIt then
                            printfn "\tAdding to delayed que: {time = %s, value %f}" (tm.ToString("mm:ss.fff")) v
                            {time = tm; value=v} :: eventQue
                         else 
                            printfn "\tJust sending without delay: {time = %s, value %f}" (tm.ToString("mm:ss.fff")) v
                            event.Trigger({time = tm; value = v})
                            eventQue)

                return! loop updatedQue now () 
                }
            loop  List.empty<DataPoint> DateTime.MinValue () )

        member x.Post  msg = agent.Post msg
        member x.EventProduced = event.Publish  

type OrderBookAgent () =

    let event = new Event<DataPoint>()
    let agent = Agent.Start(fun inbox ->
        let rec loop () = async {
            let! (msg:DataPoint) = inbox.Receive()
            if msg.value = 42. then event.Trigger({time = msg.time; value = 99.})
            return! loop  () 
            }
        loop () )

    member x.Post msg = agent.Post msg
    member x.Publish = event.Publish 

type TradingAgent () =

    let event = new Event<DataPoint>()
    let agent = Agent.Start(fun inbox ->
        let rec loop () = async {
            let! (msg:DataPoint) = inbox.Receive()
            if msg.value = 7. then event.Trigger({time = msg.time; value = 42.})
            return! loop  () 
            }
        loop () )

    member x.Post msg = agent.Post msg
    member x.Publish = event.Publish 


type StreamData(filePath, eventMgr:EventTrafficAgent) =

    let sr = new StreamReader ((filePath:string))

    member x.Reply() =
        async { while not sr.EndOfStream do
                 let line =  sr.ReadLine () 
                 let dtVal = line.Split(char(","))
                 let time =DateTime.Parse (dtVal.[0]) 
                 let value = Double.Parse(dtVal.[1]) 
                 do! Async.Sleep(250) // here to allow you to see it ticking by. set to 1 for full speed
                 do eventMgr.Post (HistEvent(time, value))}
        |> Async.StartImmediate

let eventCop  = new EventTrafficAgent()
let orderBook = new OrderBookAgent()
let tradeBot  = new TradingAgent()


eventCop.EventProduced.Add(fun e -> printfn "event Cop publishing {time = %s, value %3f}" (e.time.ToString("mm:ss.fff")) e.value)
eventCop.EventProduced.Add(fun e -> orderBook.Post e )
eventCop.EventProduced.Add(fun e -> tradeBot.Post e )

orderBook.Publish.Add(fun e -> eventCop.Post  (AgentEvent( e.time, e.value)) )
tradeBot.Publish.Add(fun  e -> eventCop.Post  (AgentEvent( e.time, e.value)) )

let stream = StreamData(myPath, eventCop )

do stream.Reply()

输出为

event Cop publishing {time = 26:23.265, value 3.000000}
event Cop publishing {time = 26:24.265, value 4.000000}
event Cop publishing {time = 26:25.265, value 5.000000}
event Cop publishing {time = 26:26.265, value 6.000000}
event Cop publishing {time = 26:27.265, value 7.000000}
    Adding to delayed que: {time = 26:28.765, value 42.000000}
event Cop publishing {time = 26:28.265, value 8.000000}
event Cop publishing {time = 26:28.765, value 42.000000}
    Dequeing & Sending delayed event: {time = 26:28.765, value 42.000000}
event Cop publishing {time = 26:29.265, value 9.000000}
    Adding to delayed que: {time = 26:30.265, value 99.000000}
event Cop publishing {time = 26:30.265, value 99.000000}
    Dequeing & Sending delayed event: {time = 26:30.265, value 99.000000}
event Cop publishing {time = 26:30.265, value 10.000000}

我想我剩下的唯一问题是使用 AsyncSeq<'T> 之类的东西将数据吸入事件管理器而不是像我现在所做的那样将其插入会更好。

最佳答案

第一次尝试还不错,我想你已经快到了,值得注意的是 loop 应该定义为一个函数,即

let rec loop () = async {
             ^^

并且对循环的调用应该使用单位作为参数,即

   do! loop ()
            ^^
   }
loop () )
     ^^

最后我建议使用 return!结束了!对于递归,即

   return! loop ()

关于events - F# 使用代理将历史事件与两个代理之间的模拟交互交错,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/22926062/

相关文章:

Firefox 3.6.6 和 4.0 beta 1 中的 jQuery 自定义事件和过多递归错误

events - Visual Basic 中的 MSWinsock.Winsock 事件处理

f# - 实现 F# 库以供 TypeScript/Javascript 使用?

macos - 从 osx 终端编译和运行 f# 脚本

javascript - TypeScript 中未定义的属性

JavaScript 事件处理程序执行顺序

c# - 不使用锁的并行 For 共享数组

用于生成 ConcurrentModificationException 的 Java 示例代码

multithreading - 什么是跳线,什么时候需要?

f# - F# 中的逻辑否定运算符? (!-相等的)