diff --git a/scripts/urlgrabber-ext-down b/scripts/urlgrabber-ext-down
new file mode 100755
index 0000000..c37e6a8
--- /dev/null
+++ b/scripts/urlgrabber-ext-down
@@ -0,0 +1,55 @@
+#! /usr/bin/python
+# A very simple external downloader
+
+import time, os, errno, sys
+from urlgrabber.grabber import \
+    _readlines, URLGrabberOptions, _loads, \
+    PyCurlFileObject, URLGrabError
+
+def write(fmt, *arg):
+    try: os.write(1, fmt % arg)
+    except OSError, e:
+        if e.args[0] != errno.EPIPE: raise
+        sys.exit(1)
+
+class ProxyProgress:
+    def start(self, *d1, **d2):
+        self.next_update = 0
+    def update(self, _amount_read):
+        t = time.time()
+        if t < self.next_update: return
+        self.next_update = t + 0.31
+        write('%d %d\n', self._id, _amount_read)
+
+def main():
+    import signal
+    signal.signal(signal.SIGINT, lambda n, f: sys.exit(1))
+    cnt = 0
+    while True:
+        lines = _readlines(0)
+        if not lines: break
+        for line in lines:
+            cnt += 1
+            opts = URLGrabberOptions()
+            opts._id = cnt
+            for k in line.split(' '):
+                k, v = k.split('=', 1)
+                setattr(opts, k, _loads(v))
+            if opts.progress_obj:
+                opts.progress_obj = ProxyProgress()
+                opts.progress_obj._id = cnt
+            tm = time.time()
+            try:
+                fo = PyCurlFileObject(opts.url, opts.filename, opts)
+                fo._do_grab()
+                fo.fo.close()
+                size = fo._amount_read
+                dlsz = size - fo._reget_length
+                ug_err = 'OK'
+            except URLGrabError, e:
+                size = dlsz = 0
+                ug_err = '%d %s' % e.args
+            write('%d %d %d %.3f %s\n', opts._id, size, dlsz, time.time() - tm, ug_err)
+
+if __name__ == '__main__':
+    main()
diff --git a/setup.py b/setup.py
index d0b87b8..bfa4a18 100644
--- a/setup.py
+++ b/setup.py
@@ -15,8 +15,10 @@ url = _urlgrabber.__url__
 packages = ['urlgrabber']
 package_dir = {'urlgrabber':'urlgrabber'}
 scripts = ['scripts/urlgrabber']
-data_files = [('share/doc/' + name + '-' + version,
-               ['README','LICENSE', 'TODO', 'ChangeLog'])]
+data_files = [
+    ('share/doc/' + name + '-' + version, ['README','LICENSE', 'TODO', 'ChangeLog']),
+    ('libexec', ['scripts/urlgrabber-ext-down']),
+]
 options = { 'clean' : { 'all' : 1 } }
 classifiers = [
         'Development Status :: 4 - Beta',
diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index 38ae1f7..094be77 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -263,6 +263,33 @@ GENERAL ARGUMENTS (kwargs)
     What type of name to IP resolving to use, default is to do both IPV4
     and IPV6.
 
+  async = (key, limit)
+
+    When this option is set, the urlgrab() request is not processed
+    immediately but queued.  parallel_wait() then processes grabs in
+    parallel, with at most 'limit' connections open in each 'key' group.
+
+  max_connections
+
+    The global connection limit.
+
+  timedhosts
+
+    The filename of the host download statistics.  If defined, urlgrabber
+    will update the stats at the end of every download.  At the end of
+    parallel_wait(), the updated stats are saved.  If synchronous grabs
+    are used, you should call th_save().
+
+  default_speed, half_life
+
+    These options only affect the async mirror selection code.
+    The default_speed option sets the speed estimate for mirrors
+    we have never downloaded from, and defaults to 1 MBps.
+
+    The speed estimate also drifts exponentially from the speed
+    actually measured towards the default speed, with a default
+    period of 30 days.
+
 
 RETRY RELATED ARGUMENTS
 
@@ -343,6 +370,15 @@ RETRY RELATED ARGUMENTS
     but it cannot (without severe trickiness) prevent the exception
     from being raised.
 
+  failfunc = None
+
+    The callback that gets called when the urlgrab() request fails.
+    If defined, urlgrab() calls it instead of raising URLGrabError.
+    Callback syntax is identical to failure_callback.
+
+    Unlike failure_callback, it is called only once.  Its primary
+    purpose is to allow using urlgrab() without a try/except block.
+
   interrupt_callback = None
 
     This callback is called if KeyboardInterrupt is received at any
@@ -444,7 +480,7 @@ import pycurl
 from ftplib import parse150
 from StringIO import StringIO
 from httplib import HTTPException
-import socket
+import socket, select, fcntl
 from byterange import range_tuple_normalize, range_tuple_to_header, RangeError
 
 try:
@@ -878,6 +914,7 @@ class URLGrabberOptions:
         self.retry = None
         self.retrycodes = [-1,2,4,5,6,7]
         self.checkfunc = None
+        self.failfunc = _do_raise
         self.copy_local = 0
         self.close_connection = 0
         self.range = None
@@ -886,6 +923,7 @@ class URLGrabberOptions:
         self.keepalive = 1
         self.proxies = None
         self.libproxy = False
+        self.proxy = None
         self.reget = None
         self.failure_callback = None
         self.interrupt_callback = None
@@ -913,6 +951,12 @@ class URLGrabberOptions:
         self.size = None # if we know how big the thing we're getting is going
                          # to be. this is ultimately a MAXIMUM size for the file
         self.max_header_size = 2097152 #2mb seems reasonable for maximum header size
+        self.async = None # blocking by default
+        self.mirror_group = None
+        self.max_connections = 5
+        self.timedhosts = None
+        self.half_life = 30*24*60*60 # 30 days
+        self.default_speed = 1e6 # 1 MB/s
 
     def __repr__(self):
         return self.format()
@@ -932,6 +976,17 @@ class URLGrabberOptions:
         s = s + indent + '}'
         return s
 
+def _do_raise(obj):
+    raise obj.exception
+
+def _run_callback(cb, obj):
+    if not cb:
+        return
+    if callable(cb):
+        return cb(obj)
+    cb, arg, karg = cb
+    return cb(obj, *arg, **karg)
+
 class URLGrabber(object):
     """Provides easy opening of URLs with a variety of options.
@@ -977,10 +1032,9 @@ class URLGrabber(object): if DEBUG: DEBUG.info('exception: %s', exception) if callback: if DEBUG: DEBUG.info('calling callback: %s', callback) - cb_func, cb_args, cb_kwargs = self._make_callback(callback) obj = CallbackObject(exception=exception, url=args[0], tries=tries, retry=opts.retry) - cb_func(obj, *cb_args, **cb_kwargs) + _run_callback(callback, obj) if (opts.retry is None) or (tries == opts.retry): if DEBUG: DEBUG.info('retries exceeded, re-raising') @@ -1043,30 +1097,36 @@ class URLGrabber(object): elif not opts.range: if not opts.checkfunc is None: - cb_func, cb_args, cb_kwargs = \ - self._make_callback(opts.checkfunc) - obj = CallbackObject() - obj.filename = path - obj.url = url - apply(cb_func, (obj, )+cb_args, cb_kwargs) + obj = CallbackObject(filename=path, url=url) + _run_callback(opts.checkfunc, obj) return path + if opts.async: + opts.url = url + opts.filename = filename + opts.size = int(opts.size or 0) + _async_queue.append(opts) + return filename + def retryfunc(opts, url, filename): + tm = time.time() fo = PyCurlFileObject(url, filename, opts) try: fo._do_grab() + _TH.update(url, fo._amount_read - fo._reget_length, time.time() - tm, None) if not opts.checkfunc is None: - cb_func, cb_args, cb_kwargs = \ - self._make_callback(opts.checkfunc) - obj = CallbackObject() - obj.filename = filename - obj.url = url - apply(cb_func, (obj, )+cb_args, cb_kwargs) + obj = CallbackObject(filename=filename, url=url) + _run_callback(opts.checkfunc, obj) finally: fo.close() return filename - return self._retry(opts, retryfunc, url, filename) + try: + return self._retry(opts, retryfunc, url, filename) + except URLGrabError, e: + _TH.update(url, 0, 0, e) + opts.exception = e + return _run_callback(opts.failfunc, opts) def urlread(self, url, limit=None, **kwargs): """read the url into a string, up to 'limit' bytes @@ -1095,12 +1155,8 @@ class URLGrabber(object): else: s = fo.read(limit) if not opts.checkfunc is None: - cb_func, cb_args, cb_kwargs = \ - self._make_callback(opts.checkfunc) - obj = CallbackObject() - obj.data = s - obj.url = url - apply(cb_func, (obj, )+cb_args, cb_kwargs) + obj = CallbackObject(data=s, url=url) + _run_callback(opts.checkfunc, obj) finally: fo.close() return s @@ -1115,6 +1171,7 @@ class URLGrabber(object): return s def _make_callback(self, callback_obj): + # not used, left for compatibility if callable(callback_obj): return callback_obj, (), {} else: @@ -1346,14 +1403,8 @@ class PyCurlFileObject(object): return try: - e = None self.curl_obj.perform() - except pycurl.error, e: pass - self._do_perform_exc(e) - - def _do_perform_exc(self, e): - # handle pycurl exception 'e' - if e: + except pycurl.error, e: # XXX - break some of these out a bit more clearly # to other URLGrabErrors from # http://curl.haxx.se/libcurl/c/libcurl-errors.html @@ -1607,7 +1658,22 @@ class PyCurlFileObject(object): _was_filename = False if type(self.filename) in types.StringTypes and self.filename: _was_filename = True - self._do_open_fo() + self._prog_reportname = str(self.filename) + self._prog_basename = os.path.basename(self.filename) + + if self.append: mode = 'ab' + else: mode = 'wb' + + if DEBUG: DEBUG.info('opening local file "%s" with mode %s' % \ + (self.filename, mode)) + try: + self.fo = open(self.filename, mode) + except IOError, e: + err = URLGrabError(16, _(\ + 'error opening local file from %s, IOError: %s') % (self.url, e)) + err.url = self.url + raise err + else: self._prog_reportname = 'MEMORY' self._prog_basename = 'MEMORY' @@ -1627,7 +1693,29 
@@ class PyCurlFileObject(object): raise e if _was_filename: - self._do_close_fo() + # close it up + self.fo.flush() + self.fo.close() + + # Set the URL where we got it from: + if xattr is not None: + # See: http://www.freedesktop.org/wiki/CommonExtendedAttributes + try: + xattr.set(self.filename, 'user.xdg.origin.url', self.url) + except: + pass # URL too long. = IOError ... ignore everything. + + # set the time + mod_time = self.curl_obj.getinfo(pycurl.INFO_FILETIME) + if mod_time != -1: + try: + os.utime(self.filename, (mod_time, mod_time)) + except OSError, e: + err = URLGrabError(16, _(\ + 'error setting timestamp on file %s from %s, OSError: %s') + % (self.filename, self.url, e)) + err.url = self.url + raise err # re open it try: self.fo = open(self.filename, 'r') @@ -1643,47 +1731,6 @@ class PyCurlFileObject(object): self._complete = True - def _do_open_fo(self): - self._prog_reportname = str(self.filename) - self._prog_basename = os.path.basename(self.filename) - if self.append: mode = 'ab' - else: mode = 'wb' - - if DEBUG: DEBUG.info('opening local file "%s" with mode %s' % \ - (self.filename, mode)) - try: - self.fo = open(self.filename, mode) - except IOError, e: - err = URLGrabError(16, _(\ - 'error opening local file from %s, IOError: %s') % (self.url, e)) - err.url = self.url - raise err - - def _do_close_fo(self): - # close it up - self.fo.flush() - self.fo.close() - - # Set the URL where we got it from: - if xattr is not None: - # See: http://www.freedesktop.org/wiki/CommonExtendedAttributes - try: - xattr.set(self.filename, 'user.xdg.origin.url', self.url) - except: - pass # URL too long. = IOError ... ignore everything. - - # set the time - mod_time = self.curl_obj.getinfo(pycurl.INFO_FILETIME) - if mod_time != -1: - try: - os.utime(self.filename, (mod_time, mod_time)) - except OSError, e: - err = URLGrabError(16, _(\ - 'error setting timestamp on file %s from %s, OSError: %s') - % (self.filename, self.url, e)) - err.url = self.url - raise err - def _fill_buffer(self, amt=None): """fill the buffer to contain at least 'amt' bytes by reading from the underlying file object. If amt is None, then it will @@ -1858,6 +1905,425 @@ def retrygrab(url, filename=None, copy_local=0, close_connection=0, ##################################################################### +# Serializer + parser: A replacement of the rather bulky Json code. +# +# - handles basic python literals, lists and tuples. 
+# - serialized strings never contain ' ' or '\n' +# +##################################################################### + +_quoter_map = {} +for c in '%[(,)] \n': + _quoter_map[c] = '%%%02x' % ord(c) +del c + +def _dumps(v): + if v is None: return 'None' + if v is True: return 'True' + if v is False: return 'False' + if type(v) in (int, long, float): + return str(v) + if type(v) == unicode: + v = v.encode('UTF8') + if type(v) == str: + def quoter(c): return _quoter_map.get(c, c) + return "'%s'" % ''.join(map(quoter, v)) + if type(v) == tuple: + return "(%s)" % ','.join(map(_dumps, v)) + if type(v) == list: + return "[%s]" % ','.join(map(_dumps, v)) + raise TypeError, 'Can\'t serialize %s' % v + +def _loads(s): + def decode(v): + if v == 'None': return None + if v == 'True': return True + if v == 'False': return False + try: return int(v) + except ValueError: pass + try: return float(v) + except ValueError: pass + if len(v) >= 2 and v[0] == v[-1] == "'": + ret = []; i = 1 + while True: + j = v.find('%', i) + ret.append(v[i:j]) # skips the final "'" + if j == -1: break + ret.append(chr(int(v[j + 1:j + 3], 16))) + i = j + 3 + v = ''.join(ret) + return v + stk = None + l = [] + i = j = 0 + while True: + if j == len(s) or s[j] in ',)]': + if j > i: + l.append(decode(s[i:j])) + if j == len(s): break + if s[j] in ')]': + if s[j] == ')': + l = tuple(l) + stk[0].append(l) + l, stk = stk + i = j = j + 1 + elif s[j] in '[(': + stk = l, stk + l = [] + i = j = j + 1 + else: + j += 1 # safe because '[(,)]' are quoted + if stk: raise ValueError + if len(l) == 1: l = l[0] + return l + + +##################################################################### +# External downloader process +##################################################################### + +def _readlines(fd): + buf = os.read(fd, 4096) + if not buf: return None + # whole lines only, no buffering + while buf[-1] != '\n': + buf += os.read(fd, 4096) + return buf[:-1].split('\n') + +import subprocess + +class _ExternalDownloader: + def __init__(self): + self.popen = subprocess.Popen( + '/usr/libexec/urlgrabber-ext-down', + stdin = subprocess.PIPE, + stdout = subprocess.PIPE, + ) + self.stdin = self.popen.stdin.fileno() + self.stdout = self.popen.stdout.fileno() + self.running = {} + self.cnt = 0 + + # list of options we pass to downloader + _options = ( + 'url', 'filename', + 'timeout', 'close_connection', 'keepalive', + 'throttle', 'bandwidth', 'range', 'reget', + 'user_agent', 'http_headers', 'ftp_headers', + 'proxy', 'prefix', 'username', 'password', + 'ssl_ca_cert', + 'ssl_cert', 'ssl_cert_type', + 'ssl_key', 'ssl_key_type', + 'ssl_key_pass', + 'ssl_verify_peer', 'ssl_verify_host', + 'size', 'max_header_size', 'ip_resolve', + ) + + def start(self, opts): + arg = [] + for k in self._options: + v = getattr(opts, k) + if v is None: continue + arg.append('%s=%s' % (k, _dumps(v))) + if opts.progress_obj: + arg.append('progress_obj=True') + arg = ' '.join(arg) + if DEBUG: DEBUG.info('attempt %i/%s: %s', opts.tries, opts.retry, opts.url) + + self.cnt += 1 + self.running[self.cnt] = opts + os.write(self.stdin, arg +'\n') + + def perform(self): + ret = [] + lines = _readlines(self.stdout) + if not lines: + if DEBUG: DEBUG.info('downloader died') + raise KeyboardInterrupt + for line in lines: + # parse downloader output + line = line.split(' ', 5) + _id, size = map(int, line[:2]) + if len(line) == 2: + opts = self.running[_id] + m = opts.progress_obj + if m: + if not m.last_update_time: + m.start(text = opts.text) + m.update(size) + continue 
+ # job done + opts = self.running.pop(_id) + if line[4] == 'OK': + ug_err = None + if DEBUG: DEBUG.info('success') + else: + ug_err = URLGrabError(int(line[4]), line[5]) + if DEBUG: DEBUG.info('failure: %s', err) + _TH.update(opts.url, int(line[2]), float(line[3]), ug_err) + ret.append((opts, size, ug_err)) + return ret + + def abort(self): + self.popen.stdin.close() + self.popen.stdout.close() + self.popen.wait() + +class _ExternalDownloaderPool: + def __init__(self): + self.epoll = select.epoll() + self.running = {} + self.cache = {} + + def start(self, opts): + host = urlparse.urlsplit(opts.url).netloc + dl = self.cache.pop(host, None) + if not dl: + dl = _ExternalDownloader() + fl = fcntl.fcntl(dl.stdin, fcntl.F_GETFD) + fcntl.fcntl(dl.stdin, fcntl.F_SETFD, fl | fcntl.FD_CLOEXEC) + self.epoll.register(dl.stdout, select.EPOLLIN) + self.running[dl.stdout] = dl + dl.start(opts) + + def perform(self): + ret = [] + for fd, event in self.epoll.poll(): + assert event & select.EPOLLIN + done = self.running[fd].perform() + if not done: continue + assert len(done) == 1 + ret.extend(done) + + # dl finished, move it to the cache + host = urlparse.urlsplit(done[0][0].url).netloc + if host in self.cache: self.cache[host].abort() + self.epoll.unregister(fd) + self.cache[host] = self.running.pop(fd) + return ret + + def abort(self): + for dl in self.running.values(): + self.epoll.unregister(dl.stdout) + dl.abort() + for dl in self.cache.values(): + dl.abort() + + +##################################################################### +# High level async API +##################################################################### + +_async_queue = [] + +def parallel_wait(meter = 'text'): + '''Process queued requests in parallel. + ''' + + if meter: + count = total = 0 + for opts in _async_queue: + count += 1 + total += opts.size + if meter == 'text': + from progress import TextMultiFileMeter + meter = TextMultiFileMeter() + meter.start(count, total) + + dl = _ExternalDownloaderPool() + host_con = {} # current host connection counts + + def start(opts, tries): + key, limit = opts.async + host_con[key] = host_con.get(key, 0) + 1 + opts.tries = tries + opts.progress_obj = meter and meter.newMeter() + if DEBUG: DEBUG.info('attempt %i/%s: %s', opts.tries, opts.retry, opts.url) + dl.start(opts) + + def perform(): + for opts, size, ug_err in dl.perform(): + key, limit = opts.async + host_con[key] -= 1 + if meter: + m = opts.progress_obj + m.basename = os.path.basename(opts.filename) + if ug_err: + m.failure(ug_err.args[1]) + else: + # file size might have changed + meter.re.total += size - opts.size + m.end(size) + meter.removeMeter(m) + + if ug_err is None: + if opts.checkfunc: + try: _run_callback(opts.checkfunc, opts) + except URLGrabError, ug_err: pass + if ug_err is None: + continue + + retry = opts.retry or 0 + if opts.failure_callback: + opts.exception = ug_err + try: _run_callback(opts.failure_callback, opts) + except URLGrabError, ug_err: + retry = 0 # no retries + if opts.tries < retry and ug_err.args[0] in opts.retrycodes: + start(opts, opts.tries + 1) # simple retry + continue + + if opts.mirror_group: + mg, failed = opts.mirror_group + opts.mirror = key + opts.exception = ug_err + action = _run_callback(mg.failure_callback, opts) + if not (action and action.get('fail')): + # mask this mirror and retry + failed.add(key) + _async_queue.append(opts) + continue + + # urlgrab failed + opts.exception = ug_err + _run_callback(opts.failfunc, opts) + + try: + idx = 0 + while True: + if idx >= 
len(_async_queue): + # the queue is empty + if not dl.running: break + # pending dl may extend it + perform() + continue + + # handle next request + opts = _async_queue[idx] + idx += 1 + + # check global limit + while len(dl.running) >= opts.max_connections: + perform() + + if opts.mirror_group: + mg, failed = opts.mirror_group + + # find the best mirror + best = None + for mirror in mg.mirrors: + key = mirror['mirror'] + if key in failed: continue + + # estimate mirror speed + speed = _TH.estimate(key) + speed /= 1 + host_con.get(key, 0) + if best is None or speed > best_speed: + best = mirror + best_speed = speed + + if best is None: + opts.exception = URLGrabError(256, _('No more mirrors to try.')) + _run_callback(opts.failfunc, opts) + continue + + # update the current mirror and limit + key = best['mirror'] + limit = best.get('kwargs', {}).get('max_connections', 3) + opts.async = key, limit + + # update URL and proxy + url = mg._join_url(key, opts.relative_url) + url, parts = opts.urlparser.parse(url, opts) + opts.find_proxy(url, parts[0]) + opts.url = url + + # check host limit, then start + key, limit = opts.async + while host_con.get(key, 0) >= limit: + perform() + start(opts, 1) + except IOError, e: + if e.errno != 4: raise + raise KeyboardInterrupt + + finally: + dl.abort() + if meter: meter.end() + del _async_queue[:] + _TH.save() + + +##################################################################### +# Host bandwidth estimation +##################################################################### + +class _TH: + hosts = {} + dirty = None + + @staticmethod + def load(): + filename = default_grabber.opts.timedhosts + if filename and _TH.dirty is None: + try: + for line in open(filename): + host, speed, fail, ts = line.split() + _TH.hosts[host] = int(speed), int(fail), int(ts) + except IOError: pass + _TH.dirty = False + + @staticmethod + def save(): + filename = default_grabber.opts.timedhosts + if filename and _TH.dirty is True: + tmp = '%s.%d' % (filename, os.getpid()) + try: + f = open(tmp, 'w') + for host in _TH.hosts: + f.write(host + ' %d %d %d\n' % _TH.hosts[host]) + f.close() + os.rename(tmp, filename) + except IOError: pass + _TH.dirty = False + + @staticmethod + def update(url, dl_size, dl_time, ug_err): + _TH.load() + host = urlparse.urlsplit(url).netloc + speed, fail, ts = _TH.hosts.get(host) or (0, 0, 0) + now = time.time() + + if ug_err is None: + # k1: the older, the less useful + # k2: if it was <1MiB, don't trust it much + # speeds vary, use 10:1 smoothing + k1 = 2**((ts - now) / default_grabber.opts.half_life) + k2 = min(dl_size / 1e6, 1.0) / 10 + speed = (k1 * speed + k2 * dl_size / dl_time) / (k1 + k2) + fail = 0 + elif getattr(ug_err, 'code', None) == 404: + fail = 0 # alive, at least + else: + fail += 1 # seems dead + + _TH.hosts[host] = speed, fail, now + _TH.dirty = True + + @staticmethod + def estimate(url): + _TH.load() + host = urlparse.urlsplit(url).netloc + default_speed = default_grabber.opts.default_speed + try: speed, fail, ts = _TH.hosts[host] + except KeyError: return default_speed + + speed *= 2**-fail + k = 2**((ts - time.time()) / default_grabber.opts.half_life) + speed = k * speed + (1 - k) * default_speed + return speed + +##################################################################### # TESTING def _main_test(): try: url, filename = sys.argv[1:3] diff --git a/urlgrabber/mirror.py b/urlgrabber/mirror.py index 8731aed..d699b61 100644 --- a/urlgrabber/mirror.py +++ b/urlgrabber/mirror.py @@ -76,6 +76,9 @@ CUSTOMIZATION 'grabber' is 
omitted, the default grabber will be used. If kwargs are omitted, then (duh) they will not be used. + kwarg 'max_connections' is used to store the max connection + limit of this mirror. + 3) Pass keyword arguments when instantiating the mirror group. See, for example, the failure_callback argument. @@ -91,6 +94,7 @@ import random import thread # needed for locking to make this threadsafe from grabber import URLGrabError, CallbackObject, DEBUG, _to_utf8 +from grabber import _run_callback, _do_raise, _async_queue def _(st): return st @@ -254,7 +258,7 @@ class MirrorGroup: # if these values are found in **kwargs passed to one of the urlXXX # methods, they will be stripped before getting passed on to the # grabber - options = ['default_action', 'failure_callback'] + options = ['default_action', 'failure_callback', 'failfunc'] def _process_kwargs(self, kwargs): self.failure_callback = kwargs.get('failure_callback') @@ -403,10 +407,25 @@ class MirrorGroup: self._failure(gr, obj) def urlgrab(self, url, filename=None, **kwargs): + if kwargs.get('async'): + opts = self.grabber.opts.derive(**kwargs) + opts.mirror_group = self, set() + opts.relative_url = _to_utf8(url) + + opts.url = 'http://tbd' + opts.filename = filename + opts.size = int(opts.size or 0) + _async_queue.append(opts) + return filename + kw = dict(kwargs) kw['filename'] = filename func = 'urlgrab' - return self._mirror_try(func, url, kw) + try: + return self._mirror_try(func, url, kw) + except URLGrabError, e: + obj = CallbackObject(url=url, filename=filename, exception=e, **kwargs) + return _run_callback(kwargs.get('failfunc', _do_raise), obj) def urlopen(self, url, **kwargs): kw = dict(kwargs) diff --git a/urlgrabber/progress.py b/urlgrabber/progress.py index 3d7e99a..4c126c5 100644 --- a/urlgrabber/progress.py +++ b/urlgrabber/progress.py @@ -576,7 +576,6 @@ class TextMultiFileMeter(MultiFileMeter): self.fo.write(out) finally: self._lock.release() - self._do_update_meter(meter, now) def _do_failure_meter(self, meter, message, now): self._lock.acquire() @@ -599,15 +598,6 @@ class TextMultiFileMeter(MultiFileMeter): pass finally: self._lock.release() - - def _do_end(self, now): - self._do_update_meter(None, now) - self._lock.acquire() - try: - self.fo.write('\n') - self.fo.flush() - finally: - self._lock.release() ###################################################################### # support classes and functions
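Usage sketch for the new async API introduced by this patch.  This is a
minimal illustration, not part of the diff: it assumes the patched urlgrabber
is installed together with the urlgrabber-ext-down helper under
/usr/libexec, and the URLs and filenames below are placeholders.

    from urlgrabber.grabber import urlgrab, parallel_wait

    def failed(obj):
        # failfunc: called once per failed request instead of raising
        # URLGrabError; obj.url and obj.exception describe the failure.
        print 'download of %s failed: %s' % (obj.url, obj.exception)

    for i, url in enumerate(('http://example.com/a.rpm',
                             'http://example.com/b.rpm')):
        # async=(key, limit): queue the request instead of downloading now,
        # allowing at most 3 concurrent connections for the 'example.com' key.
        urlgrab(url, filename='/tmp/file-%d.rpm' % i,
                async=('example.com', 3), failfunc=failed)

    # process all queued requests in parallel (text progress meter by default)
    parallel_wait()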