apache-spark - How to load a word2vec model and call its functions inside a mapper

Tags: apache-spark pyspark apache-spark-mllib word2vec

I want to load a word2vec model and evaluate it by performing a word analogy task (e.g. a is to b as c is to what?). To do so, I first load my w2v model:

model = Word2VecModel.load(spark.sparkContext, str(sys.argv[1]))

Then I invoke a mapper to evaluate the model:

rdd_lines = spark.read.text("questions-words.txt").rdd.map(getAnswers)
The getAnswers function reads one line at a time from questions-words.txt, where each line contains the question and the answer for evaluating my model (e.g. Athens Greece Baghdad Iraq, where a=Athens, b=Greece, c=Baghdad and the answer=Iraq). After reading a line, I create current_question and actual_answer (e.g.: current_question="Athens Greece Baghdad", actual_answer="Iraq"). After that, I call the getAnalogy function, which computes the analogy (basically, given the question it computes the answer). Finally, once the analogy is computed, I return the answer and write it to a text file.

The problem is that I get the following exception:

Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers.

I think it is thrown because I use the model inside the map function. This question is similar to mine, but I don't know how to apply that answer to my code. How can I solve this? Here is the full code:

def getAnalogy(s, model):
    try:
        qry = model.transform(s[0]) - model.transform(s[1]) - model.transform(s[2])    
        res = model.findSynonyms((-1)*qry,5) # return 5 "synonyms"
        res = [x[0] for x in res]
        for k in range(0,3):
            if s[k] in res:
                res.remove(s[k])
        return res[0]
    except ValueError:
        return "NOT FOUND"

def getAnswers (text):
    tmp = text[0].split(' ', 3)
    answer_list = []
    current_question = " ".join(str(x) for x in tmp[:3])
    actual_answer = tmp[-1]

    model_answer = getAnalogy(current_question, model)
    if model_answer is "NOT FOUND":
        answer_list.append("NOT FOUND\n")
    elif model_answer is actual_answer:
        answer_list.append("TRUE\n")
    else:
        answer_list.append("FALSE:\n")
    return answer_list.append


if __name__ == "__main__":

    if len(sys.argv) != 3:
        print("Usage: my_test <file>", file=sys.stderr)
        exit(-1)


    spark = SparkSession\
    .builder\
    .appName("my_test")\
    .getOrCreate()


    model = Word2VecModel.load(spark.sparkContext, str(sys.argv[1]))

    rdd_lines = spark.read.text("questions-words.txt").rdd.map(getAnswers)

    dataframe = rdd_lines.toDF()

    dataframe.write.text(str(sys.argv[2]))

    spark.stop()

Best answer

As you already suspected, you cannot use the model inside a map function. On the other hand, the questions-words.txt file is not that big (~20K lines), so you are better off doing the evaluation with plain Python list comprehensions (this is essentially the first suggested answer in the question you linked); it will not be fast, but it is only a one-off task. Here is one way, using your version of my getAnalogy function, since you have already augmented it for error handling (note that I have removed the "comment" lines from questions-words.txt, and that you should convert everything to lowercase, which your code does not seem to do):


from pyspark.mllib.feature import Word2Vec, Word2VecModel
model = Word2VecModel.load(sc, "word2vec/demo_200") # model built with k=200
with open('/home/ctsats/word2vec/questions-words.txt') as f:
    lines = f.readlines()
lines2 = [x.lower() for x in lines] # all to lowercase
lines3 = [x.strip('\n') for x in lines2] # remove end-of-line characters
lines4 = [x.split(' ',3) for x in lines3]
lines4[0] # check:
# ['athens', 'greece', 'baghdad', 'iraq']
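
If you would rather drop those "comment" lines programmatically instead of editing the file by hand, they are the section headers in questions-words.txt that start with ':' (e.g. ': capital-common-countries'), so a simple filter right after readlines() should do; a minimal sketch:

lines = [x for x in lines if not x.startswith(':')] # keep only the actual question lines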

def getAnswers (text, model):
    actual_answer = text[-1]
    question = [text[0], text[1], text[2]]
    model_answer = getAnalogy(question, model)
    if model_answer == "NOT FOUND":
        correct_answer = "NOT FOUND"
    elif model_answer == actual_answer:
        correct_answer = "TRUE"
    else:
        correct_answer = "FALSE"
    return text, model_answer, correct_answer

So, your evaluation list can now be built as

answer_list = [getAnswers(x, model) for x in lines4]    

Here is a sample of the first 20 entries (with a k=200 model):

[(['athens', 'greece', 'baghdad', 'iraq'], u'turkey', 'FALSE'),
 (['athens', 'greece', 'bangkok', 'thailand'], u'turkey', 'FALSE'),
 (['athens', 'greece', 'beijing', 'china'], u'albania', 'FALSE'),
 (['athens', 'greece', 'berlin', 'germany'], u'germany', 'TRUE'),
 (['athens', 'greece', 'bern', 'switzerland'], u'liechtenstein', 'FALSE'),
 (['athens', 'greece', 'cairo', 'egypt'], u'albania', 'FALSE'),
 (['athens', 'greece', 'canberra', 'australia'], u'liechtenstein', 'FALSE'),
 (['athens', 'greece', 'hanoi', 'vietnam'], u'turkey', 'FALSE'),
 (['athens', 'greece', 'havana', 'cuba'], u'turkey', 'FALSE'),
 (['athens', 'greece', 'helsinki', 'finland'], u'finland', 'TRUE'),
 (['athens', 'greece', 'islamabad', 'pakistan'], u'turkey', 'FALSE'),
 (['athens', 'greece', 'kabul', 'afghanistan'], u'albania', 'FALSE'),
 (['athens', 'greece', 'london', 'england'], u'italy', 'FALSE'),
 (['athens', 'greece', 'madrid', 'spain'], u'portugal', 'FALSE'),
 (['athens', 'greece', 'moscow', 'russia'], u'russia', 'TRUE'),
 (['athens', 'greece', 'oslo', 'norway'], u'albania', 'FALSE'),
 (['athens', 'greece', 'ottawa', 'canada'], u'moldova', 'FALSE'),
 (['athens', 'greece', 'paris', 'france'], u'france', 'TRUE'),
 (['athens', 'greece', 'rome', 'italy'], u'italy', 'TRUE'),
 (['athens', 'greece', 'stockholm', 'sweden'], u'norway', 'FALSE')]
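
With such a list in hand, you can also get an overall accuracy figure; here is a minimal sketch, reusing the answer_list built above (x[2] is the TRUE/FALSE/NOT FOUND flag returned by getAnswers):

correct = sum(1 for x in answer_list if x[2] == 'TRUE') # count the correctly answered analogies
accuracy = correct / float(len(answer_list)) # float() keeps the division exact under Python 2 as well
print("Accuracy: %.2f%%" % (100 * accuracy))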

A similar question on apache-spark - how to load a word2vec model and call its functions inside a mapper can be found on Stack Overflow: https://stackoverflow.com/questions/41046843/
