pyspark - Hyperopt failed to execute mlflow.end_run() with tracking URI: databricks

Tags: pyspark databricks azure-databricks mlflow hyperopt

I'm using Azure Databricks + Hyperopt + MLflow to do some hyperparameter tuning on a small dataset. The job appears to run, and I get output in MLflow, but it ends with the following error message:

Hyperopt failed to execute mlflow.end_run() with tracking URI: databricks

Here is my code, with some details redacted:

from pyspark.sql import SparkSession

# spark session initialization
spark = (SparkSession.builder.getOrCreate())
sc = spark.sparkContext

# Data Processing
import pandas as pd
import numpy as np
# Hyperparameter Tuning
from hyperopt import fmin, tpe, hp, anneal, Trials, space_eval, SparkTrials, STATUS_OK
from sklearn.model_selection import RepeatedStratifiedKFold, cross_val_score
# Modeling
from sklearn.ensemble import RandomForestClassifier
# cleaning
import gc
# tracking
import mlflow
# track runtime
from datetime import date, datetime

mlflow.set_experiment('/user/myname/myexp')
# notebook settings \ variable settings
n_splits = #
n_repeats = #
max_evals = #

dfL = pd.read_csv("/my/data/loc/mydata.csv")

x_train = dfL[['f1','f2','f3']]
y_train = dfL['target']

def define_model(params):
    model = RandomForestClassifier(n_estimators=int(params['n_estimators']),
                                   criterion=params['criterion'], 
                                   max_depth=int(params['max_depth']), 
                                   min_samples_split=params['min_samples_split'], 
                                   min_samples_leaf=params['min_samples_leaf'], 
                                   min_weight_fraction_leaf=params['min_weight_fraction_leaf'], 
                                   max_features=params['max_features'], 
                                   max_leaf_nodes=None, 
                                   min_impurity_decrease=params['min_impurity_decrease'], 
                                   min_impurity_split=None, 
                                   bootstrap=params['bootstrap'], 
                                   oob_score=False, 
                                   n_jobs=-1, 
                                   random_state=int(params['random_state']), 
                                   verbose=0, 
                                   warm_start=False, 
                                   class_weight={0:params['class_0_weight'], 1:params['class_1_weight']})
    return model


space = {'n_estimators': hp.quniform('n_estimators', #, #, #),
         'criterion': hp.choice('#', ['#','#']),
         'max_depth': hp.quniform('max_depth', #, #, #),
         'min_samples_split': hp.quniform('min_samples_split', #, #, #),
         'min_samples_leaf': hp.quniform('min_samples_leaf', #, #, #),
         'min_weight_fraction_leaf': hp.quniform('min_weight_fraction_leaf', #, #, #),
         'max_features': hp.quniform('max_features', #, #, #),
         'min_impurity_decrease': hp.quniform('min_impurity_decrease', #, #, #),
         'bootstrap': hp.choice('bootstrap', [#,#]),
         'random_state': hp.quniform('random_state', #, #, #),
         'class_0_weight': hp.choice('class_0_weight', [#,#,#]),
         'class_1_weight': hp.choice('class_1_weight', [#,#,#])}

# define hyperopt objective
def objective(params, n_splits=n_splits, n_repeats=n_repeats):

    # define model
    model = define_model(params)
    # get cv splits
    kfold = RepeatedStratifiedKFold(n_splits=n_splits, n_repeats=n_repeats, random_state=1331)
    # define and run sklearn cv scorer
    scores = cross_val_score(model, x_train, y_train, cv=kfold, scoring='roc_auc')
    score = scores.mean()

    return {'loss': score*(-1), 'status': STATUS_OK}

spark_trials = SparkTrials(parallelism=36, spark_session=spark)
with mlflow.start_run():
  best = fmin(objective, space, algo=tpe.suggest, trials=spark_trials, max_evals=max_evals)

At the end I get:

100%|██████████| 200/200 [1:35:28<00:00, 100.49s/trial, best loss: -0.9584565527065526]

Hyperopt failed to execute mlflow.end_run() with tracking URI: databricks

Exception: 'MLFLOW_RUN_ID'

Total Trials: 200: 200 succeeded, 0 failed, 0 cancelled.

My Azure Databricks cluster is:

6.6 ML (includes Apache Spark 2.4.5, Scala 2.11)
Standard_DS3_v2
min 9 max 18 nodes

Am I doing something wrong, or is this a bug?

Best Answer

This message is a known (but harmless) issue that was fixed in MLR 7.0. I tried running on a DBR 7.0 ML cluster and it works there.

You don't need start_run(): SparkTrials starts the MLflow run for you automatically, and that is the only cause of the error.

So with SparkTrials it still works even without start_run(); SparkTrials should create the run and log to MLflow for you automatically, as sketched below.
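
As an illustration, here is a minimal sketch of the corrected call, assuming the objective, space, spark, and max_evals defined in the question are in scope; the space_eval call at the end is just one way to map the index-based result that fmin returns back to concrete hyperparameter values:

from hyperopt import fmin, tpe, SparkTrials, space_eval

spark_trials = SparkTrials(parallelism=36, spark_session=spark)

# No mlflow.start_run() wrapper: on Databricks, SparkTrials creates
# and manages the MLflow run itself.
best = fmin(objective, space, algo=tpe.suggest,
            trials=spark_trials, max_evals=max_evals)

# fmin returns indices for hp.choice parameters; space_eval maps
# them back to the actual values from the search space.
print(space_eval(space, best))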

Regarding "pyspark - Hyperopt failed to execute mlflow.end_run() with tracking URI: databricks", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/62160734/
