2012-05-21 18 views
6

मेरी प्रोजेक्ट में, मैं समानांतर कार्यों को चलाने के लिए multiprocessing कक्षा का उपयोग करता हूं। मैं इसके बजाय threading का उपयोग करना चाहता हूं, क्योंकि इसमें बेहतर प्रदर्शन है (मेरे कार्य टीसीपी/आईपी बाध्य हैं, सीपीयू या आई/ओ बाध्य नहीं हैं)।मल्टीप्रोसेसिंग से थ्रेडिंग

multiprocessingPool.imap_unordered और Pool.map_async के रूप में अद्भुत काम करता है, यह है कि threading वर्ग में मौजूद नहीं है है।

इसके बजाय threading का उपयोग करने के लिए मेरे कोड को बदलने का सही तरीका क्या है? प्रलेखन multiprocessing.dummy वर्ग प्रस्तुत करता है, जो threading कक्षा के लिए एक रैपर है। हालांकि कि (कम से कम अजगर 2.7.3 पर) त्रुटियों के बहुत सारे को जन्म देती है:

pool = multiprocessing.Pool(processes) 
    File "C:\python27\lib\multiprocessing\dummy\__init__.py", line 150, in Pool 
    return ThreadPool(processes, initializer, initargs) 
    File "C:\python27\lib\multiprocessing\pool.py", line 685, in __init__ 
    Pool.__init__(self, processes, initializer, initargs) 
    File "C:\python27\lib\multiprocessing\pool.py", line 136, in __init__ 
    self._repopulate_pool() 
    File "C:\python27\lib\multiprocessing\pool.py", line 199, in _repopulate_pool 
    w.start() 
    File "C:\python27\lib\multiprocessing\dummy\__init__.py", line 73, in start 
    self._parent._children[self] = None 
AttributeError: '_DummyThread' object has no attribute '_children' 

संपादित करें: क्या वास्तव में होता यह है कि मैं एक जीयूआई है कि एक अलग धागा चलाता है (जीयूआई gettint अटक से रोकने के लिए) । वह धागा विशिष्ट खोज फ़ंक्शन चलाता है जिसमें ThreadPool विफल रहता है।

संपादित करें 2: बगफिक्स was fixed और भविष्य में रिलीज़ में शामिल किया जाएगा। एक क्रेशर तय करने के लिए बढ़िया!

import urllib2, htmllib, formatter 
import multiprocessing.dummy as multiprocessing 
import xml.dom.minidom 
import os 
import string, random 
from urlparse import parse_qs, urlparse 

from useful_util import retry 
import config 
from logger import log 

class LinksExtractor(htmllib.HTMLParser): 
    def __init__(self, formatter): 
     htmllib.HTMLParser.__init__(self, formatter) 
     self.links = [] 
     self.ignoredSites = config.WebParser_ignoredSites 

    def start_a(self, attrs): 
     for attr in attrs: 
      if attr[0] == "href" and attr[1].endswith(".mp3"): 
       if not filter(lambda x: (x in attr[1]), self.ignoredSites): 
        self.links.append(attr[1]) 

    def get_links(self): 
     return self.links 


def GetLinks(url, returnMetaUrlObj=False): 
    ''' 
    Function gather links from a url. 
    @param url: Url Address. 
    @param returnMetaUrlObj: If true, returns a MetaUrl Object list. 
          Else, returns a string list. Default is False. 

    @return links: Look up. 
    ''' 
    htmlparser = LinksExtractor(formatter.NullFormatter()) 

    try: 
     data = urllib2.urlopen(url) 
    except (urllib2.HTTPError, urllib2.URLError) as e: 
     log.error(e) 
     return [] 
    htmlparser.feed(data.read()) 
    htmlparser.close() 

    links = list(set(htmlparser.get_links())) 

    if returnMetaUrlObj: 
     links = map(MetaUrl, links) 

    return links 

def isAscii(s): 
    "Function checks is the string is ascii." 
    try: 
     s.decode('ascii') 
    except (UnicodeEncodeError, UnicodeDecodeError): 
     return False 
    return True 

@retry(Exception, logger=log) 
def parse(song, source): 
    ''' 
    Function parses the source search page and returns the .mp3 links in it. 
    @param song: Search string. 
    @param source: Search website source. Value can be dilandau, mp3skull, youtube, seekasong. 

    @return links: .mp3 url links. 
    ''' 
    source = source.lower() 
    if source == "dilandau": 
     return parse_dilandau(song) 
    elif source == "mp3skull": 
     return parse_Mp3skull(song) 
    elif source == "SeekASong": 
     return parse_SeekASong(song) 
    elif source == "youtube": 
     return parse_Youtube(song) 

    log.error('no source "%s". (from parse function in WebParser)') 
    return [] 

def parse_dilandau(song, pages=1): 
    "Function connects to Dilandau.eu and returns the .mp3 links in it" 
    if not isAscii(song): # Dilandau doesn't like unicode. 
     log.warning("Song is not ASCII. Skipping on dilandau") 
     return [] 

    links = [] 
    song = urllib2.quote(song.encode("utf8")) 

    for i in range(pages): 
     url = 'http://en.dilandau.eu/download_music/%s-%d.html' % (song.replace('-','').replace(' ','-').replace('--','-').lower(),i+1) 
     log.debug("[Dilandau] Parsing %s... " % url) 
     links.extend(GetLinks(url, returnMetaUrlObj=True)) 
    log.debug("[Dilandau] found %d links" % len(links)) 

    for metaUrl in links: 
     metaUrl.source = "Dilandau" 

    return links 

def parse_Mp3skull(song, pages=1): 
    "Function connects to mp3skull.com and returns the .mp3 links in it" 
    links = [] 
    song = urllib2.quote(song.encode("utf8")) 

    for i in range(pages): 
     # http://mp3skull.com/mp3/how_i_met_your_mother.html 
     url = 'http://mp3skull.com/mp3/%s.html' % (song.replace('-','').replace(' ','_').replace('__','_').lower()) 
     log.debug("[Mp3skull] Parsing %s... " % url) 
     links.extend(GetLinks(url, returnMetaUrlObj=True)) 
    log.debug("[Mp3skull] found %d links" % len(links)) 

    for metaUrl in links: 
     metaUrl.source = "Mp3skull" 

    return links 

def parse_SeekASong(song): 
    "Function connects to seekasong.com and returns the .mp3 links in it" 
    song = urllib2.quote(song.encode("utf8")) 

    url = 'http://www.seekasong.com/mp3/%s.html' % (song.replace('-','').replace(' ','_').replace('__','_').lower()) 
    log.debug("[SeekASong] Parsing %s... " % url) 
    links = GetLinks(url, returnMetaUrlObj=True) 
    for metaUrl in links: 
     metaUrl.source = "SeekASong" 
    log.debug("[SeekASong] found %d links" % len(links)) 

    return links 

def parse_Youtube(song, amount=10): 
    ''' 
    Function searches a song in youtube.com and returns the clips in it using Youtube API. 
    @param song: The search string. 
    @param amount: Amount of clips to obtain. 

    @return links: List of links. 
    ''' 
    "Function connects to youtube.com and returns the .mp3 links in it" 
    song = urllib2.quote(song.encode("utf8")) 
    url = r"http://gdata.youtube.com/feeds/api/videos?q=%s&max-results=%d&v=2" % (song.replace(' ', '+'), amount) 
    urlObj = urllib2.urlopen(url, timeout=4) 
    data = urlObj.read() 
    videos = xml.dom.minidom.parseString(data).getElementsByTagName('feed')[0].getElementsByTagName('entry') 

    links = [] 
    for video in videos: 
     youtube_watchurl = video.getElementsByTagName('link')[0].attributes.item(0).value 
     links.append(get_youtube_hightest_quality_link(youtube_watchurl)) 

    return links 

def get_youtube_hightest_quality_link(youtube_watchurl, priority=config.youtube_quality_priority): 
    ''' 
    Function returns the highest quality link for a specific youtube clip. 
    @param youtube_watchurl: The Youtube Watch Url. 
    @param priority: A list represents the qualities priority. 

    @return MetaUrlObj: MetaUrl Object. 
    ''' 
    video_id = parse_qs(urlparse(youtube_watchurl).query)['v'][0] 
    youtube_embedded_watchurl = "http://www.youtube.com/embed/%s?autoplay=1" % video_id 

    d = get_youtube_dl_links(video_id) 
    for x in priority: 
     if x in d.keys(): 
      return MetaUrl(d[x][0], 'youtube', d['VideoName'], x, youtube_embedded_watchurl) 
    log.error("No Youtube link has been found in get_youtube_hightest_quality_link.") 
    return "" 

@retry(Exception, logger=log) 
def get_youtube_dl_links(video_id): 
    ''' 
    Function gets the download links for a youtube clip. 
    This function parses the get_video_info format of youtube. 

    @param video_id: Youtube Video ID. 
    @return d: A dictonary of qualities as keys and urls as values. 
    ''' 
    d = {} 

    url = r"http://www.youtube.com/get_video_info?video_id=%s&el=vevo" % video_id 

    urlObj = urllib2.urlopen(url, timeout=12) 
    data = urlObj.read() 
    data = urllib2.unquote(urllib2.unquote(urllib2.unquote(data))) 
    data = data.replace(',url', '\nurl') 
    data = data.split('\n') 

    for line in data: 
     if 'timedtext' in line or 'status=fail' in line or '<AdBreaks>' in line: 
      continue 

     try: 
      url = line.split('&quality=')[0].split('url=')[1] 
      quality = line.split('&quality=')[1].split('&')[0] 
     except: 
      continue 
     if quality in d: 
      d[quality].append(url) 
     else: 
      d[quality] = [url] 

    try: 
     videoName = "|".join(data).split('&title=')[1].split('&')[0] 
    except Exception, e: 
     log.error("Could not parse VideoName out of get_video_info (%s)" % str(e)) 
     videoName = "" 

    videoName = unicode(videoName, 'utf-8') 
    d['VideoName'] = videoName.replace('+',' ').replace('--','-') 
    return d 


class NextList(object): 
    "A list with a 'next' method." 
    def __init__(self, l): 
     self.l = l 
     self.next_index = 0 

    def next(self): 
     if self.next_index < len(self.l): 
      value = self.l[self.next_index] 
      self.next_index += 1 
      return value 
     else: 
      return None 

    def isEOF(self): 
     " Checks if the list has reached the end " 
     return (self.next_index >= len(self.l)) 

class MetaUrl(object): 
    "a url strecture data with many metadata" 
    def __init__(self, url, source="", videoName="", quality="", youtube_watchurl=""): 
     self.url = str(url) 
     self.source = source 
     self.videoName = videoName # Youtube Links Only 
     self.quality = quality # Youtube Links Onlys 
     self.youtube_watchurl = youtube_watchurl # Youtube Links Onlys 

    def __repr__(self): 
     return "<MetaUrl '%s' | %s>" % (self.url, self.source) 


def search(song, n, processes=config.search_processes): 
    ''' 
    Function searches song and returns n valid .mp3 links. 
    @param song: Search string. 
    @param n: Number of songs. 
    @param processes: Number of processes to launch in the subprocessing pool. 
    ''' 
    linksFromSources = [] 
    pool = multiprocessing.Pool(processes) 

    args = [(song, source) for source in config.search_sources] 
    imapObj = pool.imap_unordered(_parse_star, args) 
    for i in range(len(args)): 
     linksFromSources.append(NextList(imapObj.next(15))) 
    pool.terminate() 

    links = [] 
    next_source = 0 
    while len(links) < n and not all(map(lambda x: x.isEOF(), linksFromSources)): 
     nextItem = linksFromSources[next_source].next() 
     if nextItem: 
      log.debug("added song %.80s from source ID %d (%s)" % (nextItem.url.split('/')[-1], next_source, nextItem.source)) 
      links.append(nextItem) 

     if len(linksFromSources) == next_source+1: 
      next_source = 0 
     else: 
      next_source += 1 

    return links 

def _parse_star(args): 
    return parse(*args) 
+0

क्या की तरह अपने कोड दिखता है? संदर्भ होना अच्छा लगेगा। –

+0

यहां कोड है: http://pastebin.com/F8QVUtkP। इसमें कई परियोजना फाइलों के संदर्भ हैं, लेकिन मूल रूप से मुख्य कार्य पंक्ति 24 9 पर 'खोज()' है। प्रश्न में अपवाद 'पूल = multiprocessing.Pool (प्रक्रियाओं) 'आदेश पर उठाया गया है। ऐसा तब नहीं होता जब मूल मल्टीप्रोसेसिंग क्लास का उपयोग किया जाता है। – iTayb

+0

@iTayb: आप 'प्रक्रियाओं' के रूप में क्या तर्क पारित कर रहे हैं? –

उत्तर

6

मैं अपनी मशीन पर अपनी समस्या का पुन: उत्पन्न नहीं कर सकता। आपके processes चर में क्या है? क्या यह int है?

Python 2.7.3 (default, Apr 10 2012, 23:31:26) [MSC v.1500 32 bit (Intel)] on win32 
Type "help", "copyright", "credits" or "license" for more information. 
>>> import multiprocessing.dummy as multiprocessing 
>>> pool = multiprocessing.Pool(5) 
>>> pool 
<multiprocessing.pool.ThreadPool object at 0x00C7DF90> 
>>> 

---- संपादित ----

आप शायद भी दोबारा जांच करता है, तो आप अपने मानक पुस्तकालय में गड़बड़ था, एक स्वच्छ किसी अन्य फ़ोल्डर में अजगर 2.7.3 के स्थापित करने की कोशिश करना चाहते हैं।

---- संपादित 2 ----

आप जल्दी से इस तरह यह पैच कर सकते हैं:

import multiprocessing.dummy 
import weakref 
import threading 

class Worker(threading.Thread): 
    def __init__(self): 
     threading.Thread.__init__(self) 

    def run(self): 
     poll = multiprocessing.dummy.Pool(5) 
     print str(poll) 

w = Worker() 
w._children = weakref.WeakKeyDictionary() 
w.start() 
+0

आपका कोड यहां काम करता है। दिलचस्प है, अगर मैंने इसे अपलोड किया है तो वास्तव में काम करता है अगर आप इसे सीधे चलाते हैं। वास्तव में क्या होता है कि मेरे पास एक जीयूआई है जो एक अलग धागा चलाता है (जीयूआई को गेटिंट फंसे से रोकने के लिए)। वह धागा विशिष्ट 'खोज' फ़ंक्शन चलाता है जिसमें 'थ्रेडपूल' होता है जो विफल रहता है। – iTayb

+1

आप सही हैं, मुझे लगता है कि पाइथन ने ऐसी परिस्थिति को कवर नहीं किया है जिसे आप एक गैर-प्रक्रिया होस्ट से थ्रेडपूल बनाते हैं। समाधान के लिए 2 संपादित करें देखें। या, कम हैकी, आप अपने जीयूआई थ्रेड में 'पोल' बना सकते हैं और इसे अपने कार्यकर्ता धागे को उपयोग करने के लिए पास कर सकते हैं। – xbtsw

+1

धन्यवाद। मैंने पायथन देव टीम में एक बग मुद्दा भी जारी किया: http://bugs.python.org/issue14881। आपका बहुत बहुत धन्यवाद! – iTayb

संबंधित मुद्दे