multithreading - selenium 线程可以安全地使用 Python 进行抓取吗?

标签 multithreading python-2.7 selenium cookies thread-safety

我正在执行一个带有线程的 Python 脚本,在给定一个我放入队列的“查询”术语的情况下,我使用查询参数创建 url,设置 cookie 并解析网页以返回产品和 url那些产品。这是脚本。

任务:对于一组给定的查询,将前 20 个产品 ID 存储在一个文件中,如果查询返回的结果较少,则降低 #。

我记得读过 Selenium 不是线程安全的。只是想确保这个问题是因为那个限制而发生的,有没有办法让它在并发线程中工作?主要问题是脚本受 I/O 限制,抓取大约 3000 个 url 非常慢。

from pyvirtualdisplay import Display
from data_mining.scraping import scraping_conf as sf #custom file with rules for scraping
import Queue
import threading
import urllib2
import time
from selenium import webdriver
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.by import By

num_threads=5
COOKIES=sf.__MERCHANT_PARAMS[merchant_domain]['COOKIES']
query_args =sf.__MERCHANT_PARAMS[merchant_domain]['QUERY_ARGS']


class ThreadUrl(threading.Thread):
    """Threaded Url Grab"""
    def __init__(self, queue, out_queue):
        threading.Thread.__init__(self)
        self.queue = queue
        self.out_queue = out_queue

    def url_from_query(self,query):
        for key,val in query_args.items():
            if query_args[key]=='query' :
                query_args[key]=query
                print "query", query
            try :
                url = base_url+urllib.urlencode(query_args)
                print "url"
                return url
            except Exception as e:
                log()
                return None


    def init_driver_and_scrape(self,base_url,query,url):
        # Will use Pyvirtual display later 
        #display = Display(visible=0, size=(1024, 768))
        #display.start()
        fp = webdriver.FirefoxProfile()
        fp.set_preference("browser.download.folderList",2)
        fp.set_preference("javascript.enabled", True)
        driver = webdriver.Firefox(firefox_profile=fp)
        driver.delete_all_cookies()
        driver.get(base_url)
        for key,val in COOKIES[exp].items():
            driver.add_cookie({'name':key,'value':val,'path':'/','domain': merchant_domain,'secure':False,'expiry':None})
        print "printing cookie name & value"
        for cookie in driver.get_cookies():
            if cookie['name'] in COOKIES[exp].keys():
                print cookie['name'],"-->", cookie['value']
        driver.get(base_url+'search=junk') # To counter any refresh issues
        driver.implicitly_wait(20)
        driver.execute_script("window.scrollTo(0, 2000)")
        print "url inside scrape", url
        if url is not None :
            flag = True
            i=-1
            row_data,row_res=(),()
            while flag :
                i=i+1
                try :
                    driver.get(url)
                    key=sf.__MERCHANT_PARAMS[merchant_domain]['GET_ITEM_BY_ID']+str(i)
                    print key
                    item=driver.find_element_by_id(key)
                    href=item.get_attribute("href")
                    prod_id=eval(sf.__MERCHANT_PARAMS[merchant_domain]['PRODUCTID_EVAL_FUNC'])
                    row_res=row_res+(prod_id,)
                    print url,row_res
                except Exception as e:
                    log()
                    flag =False
            driver.delete_all_cookies()
            driver.close()

            return query+"|"+str(row_res)+"\n"  #  row_data, row_res
        else :
            return  [query+"|"+"None"]+"\n"
    def run(self):
        while True:
            #grabs host from queue
            query = self.queue.get()
            url=self.url_from_query(query)
            print "query, url", query, url
            data=self.init_driver_and_scrape(base_url,query,url)
            self.out_queue.put(data)

            #signals to queue job is done
            self.queue.task_done()


class DatamineThread(threading.Thread):
    """Threaded Url Grab"""
    def __init__(self, out_queue):
        threading.Thread.__init__(self)
        self.out_queue = out_queue

    def run(self):
        while True:
            #grabs host from queue
            data = self.out_queue.get()
            fh.write(str(data)+"\n")
            #signals to queue job is done
            self.out_queue.task_done()

start = time.time()

def log():
    logging_hndl=logging.getLogger("get_results_url")
    logging_hndl.exception("Stacktrace from "+"get_results_url")


df=pd.read_csv(fh_query, sep='|',skiprows=0,header=0,usecols=None,error_bad_lines=False) # read all queries
query_list=list(df['query'].values)[0:3]

def main():
    exp="Control"
    #spawn a pool of threads, and pass them queue instance
    for i in range(num_threads):
        t = ThreadUrl(queue, out_queue)
        t.setDaemon(True)
        t.start()

    #populate queue with data
    print query_list
    for query in query_list:
        queue.put(query)

    for i in range(num_threads):
        dt = DatamineThread(out_queue)
        dt.setDaemon(True)
        dt.start()


    #wait on the queue until everything has been processed
    queue.join()
    out_queue.join()

main()
print "Elapsed Time: %s" % (time.time() - start)

虽然我应该得到每个 url 页面的所有搜索结果,但我只得到 1st , i=0 搜索卡,这不会对所有查询/url 执行。我究竟做错了什么 ?

我的期望-

url inside scrape http://<masked>/search=nike+costume
searchResultsItem0
url inside scrape http://<masked>/search=red+tops
searchResultsItem0
url inside scrape http://<masked>/search=halloween+costumes
searchResultsItem0
and more searchResultsItem(s) , like searchResultsItem1,searchResultsItem2 and so on..

我得到了什么

url inside scrape http://<masked>/search=nike+costume
searchResultsItem0
url inside scrape http://<masked>/search=nike+costume
searchResultsItem0
url inside scrape http://<masked>/search=nike+costume
searchResultsItem0

骨架代码取自

http://www.ibm.com/developerworks/aix/library/au-threadingpython/

此外,当我使用 Pyvirtual 显示时,它是否也适用于线程?我还使用了具有相同 Selenium 代码的进程,但它给出了相同的错误。 本质上它打开了 3 个 Firefox 浏览器,带有确切的 url,而它应该从队列中的不同项目打开它们。在这里,我将规则存储在将导入为 sf 的文件中,该文件具有基域的所有自定义属性。

由于设置 cookies 是我脚本的一个组成部分,我不能使用 dryscrape。

编辑: 我试图定位错误,这是我发现的 - 在自定义规则文件中,我在上面调用了“sf”,我将 QUERY_ARGS 定义为

__MERCHANT_PARAMS = {
  "some_domain.com" :
  {
    COOKIES: { <a dict of dict, masked here>
              },
    ... more such rules
    QUERY_ARGS:{'search':'query'}
  }

所以真正发生的是,在调用时,

query_args =sf.__MERCHANT_PARAMS[merchant_domain]['QUERY_ARGS'] - 这应该返回字典 {'search':'query'},当它返回时,

AttributeError: 'module' object has no attribute '_ThreadUrl__MERCHANT_PARAMS'

这是我不明白线程如何传递“_ThreadUrl__”的地方我还尝试在 url_from_query 方法中重新初始化 query_args,但这不起作用。

关于我做错了什么的任何指示?

最佳答案

我可能会很晚才回复这个。然而,我测试了它 python2.7 并且这两个选项 multithreading 和 mutliprocess 都与 selenium 一起工作并且它打开了两个单独的浏览器。

关于multithreading - selenium 线程可以安全地使用 Python 进行抓取吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26972131/

相关文章:

python - 在引号之间匹配字符串时出现问题,但如果行以 % 开头则不会出现问题

Python 子进程卡在communicate() 调用上

java - URL 无法使用 WebDriver 打开

c# - 如何暂停和继续 TPL 任务?

python - 比较列表中的文件

c# - TryExecuteTask(task) 总是阻塞吗?

c# - 更改 eBay 页面内容/DOM 的最简单方法

java - 无法使用 Xpath 找到元素

ios - 使用 "performSelectorOnMainThread"在 iOS 上进行上下文切换

c - 线程结构作为函数参数 C