r - 如何用 mclapply [并行化] 替换 Quantstrat 'for loop'?

标签 r parallel-processing quantstrat mclapply

我想并行化 quantstrat。我的代码并不完全像这样,但这展示了问题。我认为的问题是 .blotter env 被初始化为指针内存地址,并且我无法初始化 new.env() 的数组/矩阵。

我想做的是将 for 循环替换为 mclapply,这样我就可以运行具有不同日期/符号的多个 applyStrategies(此处仅显示不同的符号)。我的最终目标是一个 beowulf 集群 (makeCluster),并计划使用最多 252 个交易日(滚动窗口)并行运行这些集群,每次迭代使用不同的符号(但我不需要所有这些。我只是问是否有一个以我可以使用 mclapply 的方式分配投资组合和后续 .blotter 内存对象的方法)

#Load quantstrat in your R environment.

rm(list = ls())

local()

library(quantstrat) 
library(parallel)

# The search command lists all attached packages.
search()

symbolstring1 <- c('QQQ','GOOG')
#symbolstring <- c('QQQ','GOOG')

#for(i in 1:length(symbolstring1))
  mlapply(symbolstring1, function(symbolstring)
{
  #local()
  #i=2
  #symbolstring=as.character(symbolstring1[i])
  
  .blotter <- new.env()
  .strategy <- new.env()
  
  try(rm.strat(strategyName),silent=TRUE)
  try(rm(envir=FinancialInstrument:::.instrument),silent=TRUE)
  for (name in ls(FinancialInstrument:::.instrument)){rm_instruments(name,keep.currencies = FALSE)}
  print(symbolstring)

currency('USD')

stock(symbolstring,currency='USD',multiplier=1)

# Currency and trading instrument objects stored in the 
# .instrument environment

print("FI")
ls(envir=FinancialInstrument:::.instrument)

# blotter functions used for instrument initialization 
# quantstrat creates a private storage area called .strategy

ls(all=T)

# The initDate should be lower than the startDate. The initDate will be used later while initializing the strategy.

initDate <- '2010-01-01'

startDate <- '2011-01-01'

endDate <- '2019-08-10'

init_equity <- 50000

# Set UTC TIME

Sys.setenv(TZ="UTC")

getSymbols(symbolstring,from=startDate,to=endDate,adjust=TRUE,src='yahoo')

# Define names for portfolio, account and strategy. 

#portfolioName <- accountName <- strategyName <- "FirstPortfolio"
portfolioName <- accountName <- strategyName <- paste0("FirstPortfolio",symbolstring)

print(portfolioName)
# The function rm.strat removes any strategy, portfolio, account, or order book object with the given name. This is important

#rm.strat(strategyName)

print("port")
initPortf(name = portfolioName,
          symbols = symbolstring,
          initDate = initDate)

initAcct(name = accountName,
         portfolios = portfolioName,
         initDate = initDate,
         initEq = init_equity)

initOrders(portfolio = portfolioName,
           symbols = symbolstring,
           initDate = initDate)



# name: the string name of the strategy

# assets: optional list of assets to apply the strategy to.  

# Normally these are defined in the portfolio object

# contstrains: optional portfolio constraints

# store: can be True or False. If True store the strategy in the environment. Default is False
print("strat")
strategy(strategyName, store = TRUE)

ls(all=T)

# .blotter holds the portfolio and account object 

ls(.blotter)

# .strategy holds the orderbook and strategy object

print(ls(.strategy))

print("ind")
add.indicator(strategy = strategyName, 
              name = "EMA", 
              arguments = list(x = quote(Cl(mktdata)), 
                               n = 10), label = "nFast")

add.indicator(strategy = strategyName, 
              name = "EMA", 
              arguments = list(x = quote(Cl(mktdata)), 
                               n = 30), 
              label = "nSlow")

# Add long signal when the fast EMA crosses over slow EMA.

print("sig")
add.signal(strategy = strategyName,
           name="sigCrossover",
           arguments = list(columns = c("nFast", "nSlow"),
                            relationship = "gte"),
           label = "longSignal")

# Add short signal when the fast EMA goes below slow EMA.

add.signal(strategy = strategyName, 
           name = "sigCrossover",
           arguments = list(columns = c("nFast", "nSlow"),
                            relationship = "lt"),
           label = "shortSignal")

# go long when 10-period EMA (nFast) >= 30-period EMA (nSlow)

print("rul")
add.rule(strategyName,
         name= "ruleSignal",
         arguments=list(sigcol="longSignal",
                        sigval=TRUE,
                        orderqty=100,
                        ordertype="market",
                        orderside="long",
                        replace = TRUE, 
                        TxnFees = -10),
         type="enter",
         label="EnterLong") 

# go short when 10-period EMA (nFast) < 30-period EMA (nSlow)

add.rule(strategyName, 
         name = "ruleSignal", 
         arguments = list(sigcol = "shortSignal", 
                          sigval = TRUE, 
                          orderside = "short", 
                          ordertype = "market", 
                          orderqty = -100, 
                          TxnFees = -10,                     
                          replace = TRUE), 
         type = "enter", 
         label = "EnterShort")

# Close long positions when the shortSignal column is True

add.rule(strategyName, 
         name = "ruleSignal", 
         arguments = list(sigcol = "shortSignal", 
                          sigval = TRUE, 
                          orderside = "long", 
                          ordertype = "market", 
                          orderqty = "all", 
                          TxnFees = -10, 
                          replace = TRUE), 
         type = "exit", 
         label = "ExitLong")

# Close Short positions when the longSignal column is True

add.rule(strategyName, 
         name = "ruleSignal", 
         arguments = list(sigcol = "longSignal", 
                          sigval = TRUE, 
                          orderside = "short", 
                          ordertype = "market", 
                          orderqty = "all", 
                          TxnFees = -10, 
                          replace = TRUE), 
         type = "exit", 
         label = "ExitShort")

print("summary")
summary(getStrategy(strategyName))

# Summary results are produced below

print("results")
results <- applyStrategy(strategy= strategyName, portfolios = portfolioName,symbols=symbolstring)

# The applyStrategy() outputs all transactions(from the oldest to recent transactions)that the strategy sends. The first few rows of the applyStrategy() output are shown below

getTxns(Portfolio=portfolioName, Symbol=symbolstring)

mktdata

updatePortf(portfolioName)

dateRange <- time(getPortfolio(portfolioName)$summary)[-1]

updateAcct(portfolioName,dateRange)

updateEndEq(accountName)

print(plot(tail(getAccount(portfolioName)$summary$End.Eq,-1), main = "Portfolio Equity"))

#cleanup
for (name in symbolstring) rm(list = name)
#rm(.blotter)
rm(.stoploss)
rm(.txnfees)
#rm(.strategy)
rm(symbols)

}
)

但是抛出错误 get(symbol, envir = envir) 中出错:未找到对象“QQQ”

具体来说,问题是 FinancialInstrument:::.instrument 指向的内存地址未使用我的封装变量调用(符号字符串)进行更新

最佳答案

apply.paramsetquantstrat已经使用 foreach构造并行执行applyStrategy .

apply.paramset需要做大量的工作,以确保工作人员有可用的环境来完成工作,并收集正确的结果以将它们发送回调用进程。

您要做的最简单的事情可能是使用 apply.paramset 。设置日期和符号参数,并使函数正常运行。

或者,我建议您查看使用并行所需的步骤 foreach施工apply.paramset将其修改为您建议的情况。

另请注意,您的问题询问有关使用 Beowulf 集群和 mclapply 。这行不通。 mclapply只能在单个内存空间中工作。 Beowulf 集群通常不共享单个内存和进程空间。他们通常通过并行库(例如 MPI)分配作业。 apply.paramset已经可以使用 doMPI 在 Beowulf 集群上进行分发后端至foreach 。这就是我们使用 foreach 的原因之一:有多种不同的可用并行后端。 doMC foreach 的后端实际上使用mclapply幕后花絮。

关于r - 如何用 mclapply [并行化] 替换 Quantstrat 'for loop'?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63429617/

相关文章:

r - 计算大量矩阵的有效方法

java - 如何优化这个for循环的计算速度?

r - 继续在 quantstrat 中获取 "dims do not match the length of object"

r - quantStrat 无法识别列名

r - 将因子水平转换为数字

r - 如何避免 R markdown v2 中标题的空格

r - 根据存在的数据帧行分配分组变量R

linux - 在 bash 脚本中使用并行处理分组的输入文件

parallel-processing - 并行处理 vec : how to do safely, 还是不使用不稳定的功能?