parent
dadfd07b88
commit
a16417493c
diff --git a/scripts/urlgrabber-ext-down b/scripts/urlgrabber-ext-down
new file mode 100755
index 0000000..c37e6a8
--- /dev/null
+++ b/scripts/urlgrabber-ext-down
@@ -0,0 +1,55 @@
+#! /usr/bin/python
+#  A very simple external downloader
+
+import time, os, errno, sys
+from urlgrabber.grabber import \
+    _readlines, URLGrabberOptions, _loads, \
+    PyCurlFileObject, URLGrabError
+
+def write(fmt, *arg):
+    try: os.write(1, fmt % arg)
+    except OSError, e:
+        if e.args[0] != errno.EPIPE: raise
+        sys.exit(1)
+
+class ProxyProgress:
+    def start(self, *d1, **d2):
+        self.next_update = 0
+    def update(self, _amount_read):
+        t = time.time()
+        if t < self.next_update: return
+        self.next_update = t + 0.31
+        write('%d %d\n', self._id, _amount_read)
+
+def main():
+    import signal
+    signal.signal(signal.SIGINT, lambda n, f: sys.exit(1))
+    cnt = 0
+    while True:
+        lines = _readlines(0)
+        if not lines: break
+        for line in lines:
+            cnt += 1
+            opts = URLGrabberOptions()
+            opts._id = cnt
+            for k in line.split(' '):
+                k, v = k.split('=', 1)
+                setattr(opts, k, _loads(v))
+            if opts.progress_obj:
+                opts.progress_obj = ProxyProgress()
+                opts.progress_obj._id = cnt
+            tm = time.time()
+            try:
+                fo = PyCurlFileObject(opts.url, opts.filename, opts)
+                fo._do_grab()
+                fo.fo.close()
+                size = fo._amount_read
+                dlsz = size - fo._reget_length
+                ug_err = 'OK'
+            except URLGrabError, e:
+                size = dlsz = 0
+                ug_err = '%d %s' % e.args
+            write('%d %d %d %.3f %s\n', opts._id, size, dlsz, time.time() - tm, ug_err)
+
+if __name__ == '__main__':
+    main()
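The helper above speaks a one-line-per-message pipe protocol with its parent. Each request on stdin is a space-separated list of key=value pairs, with values encoded by the _dumps() serializer added to grabber.py below; each reply on stdout is either a progress report '<id> <bytes_read>' or a completion record '<id> <size> <downloaded> <seconds> <status>'. A hypothetical exchange (URL, path, and sizes invented for illustration) might look like:

    stdin:  url='http://example.com/a.rpm' filename='/tmp/a.rpm' progress_obj=True
    stdout: 1 4096
    stdout: 1 10240 10240 0.531 OK

On failure, the status field carries the URLGrabError code and message instead of OK, e.g. '1 0 0 0.102 14 HTTP Error 404'.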
diff --git a/setup.py b/setup.py
index d0b87b8..bfa4a18 100644
--- a/setup.py
+++ b/setup.py
@@ -15,8 +15,10 @@ url = _urlgrabber.__url__
packages = ['urlgrabber']
package_dir = {'urlgrabber':'urlgrabber'}
scripts = ['scripts/urlgrabber']
-data_files = [('share/doc/' + name + '-' + version,
-               ['README','LICENSE', 'TODO', 'ChangeLog'])]
+data_files = [
+    ('share/doc/' + name + '-' + version, ['README','LICENSE', 'TODO', 'ChangeLog']),
+    ('libexec', ['scripts/urlgrabber-ext-down']),
+]
options = { 'clean' : { 'all' : 1 } }
classifiers = [
    'Development Status :: 4 - Beta',
diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index 38ae1f7..094be77 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -263,6 +263,33 @@ GENERAL ARGUMENTS (kwargs)
    What type of name to IP resolving to use, default is to do both IPV4 and
    IPV6.

+  async = (key, limit)
+
+    When this option is set, the urlgrab() request is not processed
+    immediately but queued.  parallel_wait() then processes grabs in parallel,
+    limiting the number of connections in each 'key' group to at most 'limit'.
+
+  max_connections
+
+    The global connection limit.
+
+  timedhosts
+
+    The filename of the host download statistics.  If defined, urlgrabber
+    will update the stats at the end of every download.  At the end of
+    parallel_wait(), the updated stats are saved.  If synchronous grabs
+    are used, you should call th_save().
+
+  default_speed, half_life
+
+    These options only affect the async mirror selection code.
+    The default_speed option sets the speed estimate for mirrors
+    we have never downloaded from, and defaults to 1 MBps.
+
+    The speed estimate also drifts exponentially from the speed
+    actually measured to the default speed, with default
+    period of 30 days.
+

RETRY RELATED ARGUMENTS

@@ -343,6 +370,15 @@ RETRY RELATED ARGUMENTS
    but it cannot (without severe trickiness) prevent the exception
    from being raised.

+  failfunc = None
+
+    The callback that gets called when a urlgrab request fails.
+    If defined, urlgrab() calls it instead of raising URLGrabError.
+    Callback syntax is identical to failure_callback.
+
+    Contrary to failure_callback, it is called only once.  Its primary
+    purpose is to use urlgrab() without a try/except block.
+
  interrupt_callback = None

    This callback is called if KeyboardInterrupt is received at any
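Taken together, a minimal sketch of the queued-download API these options describe (hypothetical URLs and paths; assumes the module-level urlgrab() and the parallel_wait() added by this patch):

    from urlgrabber.grabber import urlgrab, parallel_wait

    def failed(obj):
        print 'download failed:', obj.url, obj.exception

    # at most 2 parallel connections in the 'example.com' group
    for name in ('a.rpm', 'b.rpm', 'c.rpm'):
        urlgrab('http://example.com/' + name, '/tmp/' + name,
                async=('example.com', 2), failfunc=failed)
    parallel_wait()

Because an async urlgrab() only queues the request and returns the filename, errors surface through failfunc (or the mirror failure callbacks) rather than as an immediate URLGrabError.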
@@ -444,7 +480,7 @@ import pycurl
from ftplib import parse150
from StringIO import StringIO
from httplib import HTTPException
-import socket
+import socket, select, fcntl
from byterange import range_tuple_normalize, range_tuple_to_header, RangeError

try:
@@ -878,6 +914,7 @@ class URLGrabberOptions:
        self.retry = None
        self.retrycodes = [-1,2,4,5,6,7]
        self.checkfunc = None
+        self.failfunc = _do_raise
        self.copy_local = 0
        self.close_connection = 0
        self.range = None
@@ -886,6 +923,7 @@ class URLGrabberOptions:
        self.keepalive = 1
        self.proxies = None
        self.libproxy = False
+        self.proxy = None
        self.reget = None
        self.failure_callback = None
        self.interrupt_callback = None
@@ -913,6 +951,12 @@ class URLGrabberOptions:
        self.size = None # if we know how big the thing we're getting is going
                         # to be. this is ultimately a MAXIMUM size for the file
        self.max_header_size = 2097152 #2mb seems reasonable for maximum header size
+        self.async = None # blocking by default
+        self.mirror_group = None
+        self.max_connections = 5
+        self.timedhosts = None
+        self.half_life = 30*24*60*60 # 30 days
+        self.default_speed = 1e6 # 1 MB/s

    def __repr__(self):
        return self.format()
@@ -932,6 +976,17 @@ class URLGrabberOptions:
        s = s + indent + '}'
        return s

+def _do_raise(obj):
+    raise obj.exception
+
+def _run_callback(cb, obj):
+    if not cb:
+        return
+    if callable(cb):
+        return cb(obj)
+    cb, arg, karg = cb
+    return cb(obj, *arg, **karg)
+
class URLGrabber(object):
    """Provides easy opening of URLs with a variety of options.

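_run_callback() accepts either a bare callable or the (func, args, kwargs) triple that urlgrabber callbacks have traditionally used, so both spellings below are equivalent ways to attach a hypothetical checkfunc:

    def check(obj, *args, **kwargs):
        pass  # inspect obj.filename / obj.url, raise URLGrabError to reject

    g = URLGrabber(checkfunc=check)                    # called as check(obj)
    g = URLGrabber(checkfunc=(check, ('md5',), {}))    # called as check(obj, 'md5')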
@@ -977,10 +1032,9 @@ class URLGrabber(object):
            if DEBUG: DEBUG.info('exception: %s', exception)
            if callback:
                if DEBUG: DEBUG.info('calling callback: %s', callback)
-                cb_func, cb_args, cb_kwargs = self._make_callback(callback)
                obj = CallbackObject(exception=exception, url=args[0],
                                     tries=tries, retry=opts.retry)
-                cb_func(obj, *cb_args, **cb_kwargs)
+                _run_callback(callback, obj)

            if (opts.retry is None) or (tries == opts.retry):
                if DEBUG: DEBUG.info('retries exceeded, re-raising')
@@ -1043,30 +1097,36 @@ class URLGrabber(object):

        elif not opts.range:
            if not opts.checkfunc is None:
-                cb_func, cb_args, cb_kwargs = \
-                    self._make_callback(opts.checkfunc)
-                obj = CallbackObject()
-                obj.filename = path
-                obj.url = url
-                apply(cb_func, (obj, )+cb_args, cb_kwargs)
+                obj = CallbackObject(filename=path, url=url)
+                _run_callback(opts.checkfunc, obj)
            return path

+        if opts.async:
+            opts.url = url
+            opts.filename = filename
+            opts.size = int(opts.size or 0)
+            _async_queue.append(opts)
+            return filename
+
        def retryfunc(opts, url, filename):
+            tm = time.time()
            fo = PyCurlFileObject(url, filename, opts)
            try:
                fo._do_grab()
+                _TH.update(url, fo._amount_read - fo._reget_length, time.time() - tm, None)
                if not opts.checkfunc is None:
-                    cb_func, cb_args, cb_kwargs = \
-                        self._make_callback(opts.checkfunc)
-                    obj = CallbackObject()
-                    obj.filename = filename
-                    obj.url = url
-                    apply(cb_func, (obj, )+cb_args, cb_kwargs)
+                    obj = CallbackObject(filename=filename, url=url)
+                    _run_callback(opts.checkfunc, obj)
            finally:
                fo.close()
            return filename

-        return self._retry(opts, retryfunc, url, filename)
+        try:
+            return self._retry(opts, retryfunc, url, filename)
+        except URLGrabError, e:
+            _TH.update(url, 0, 0, e)
+            opts.exception = e
+            return _run_callback(opts.failfunc, opts)

    def urlread(self, url, limit=None, **kwargs):
        """read the url into a string, up to 'limit' bytes
@@ -1095,12 +1155,8 @@ class URLGrabber(object):
            else: s = fo.read(limit)

            if not opts.checkfunc is None:
-                cb_func, cb_args, cb_kwargs = \
-                    self._make_callback(opts.checkfunc)
-                obj = CallbackObject()
-                obj.data = s
-                obj.url = url
-                apply(cb_func, (obj, )+cb_args, cb_kwargs)
+                obj = CallbackObject(data=s, url=url)
+                _run_callback(opts.checkfunc, obj)
        finally:
            fo.close()
        return s
@@ -1115,6 +1171,7 @@ class URLGrabber(object):
        return s

    def _make_callback(self, callback_obj):
+        # not used, left for compatibility
        if callable(callback_obj):
            return callback_obj, (), {}
        else:
@@ -1346,14 +1403,8 @@ class PyCurlFileObject(object):
            return

        try:
-            e = None
            self.curl_obj.perform()
-        except pycurl.error, e: pass
-        self._do_perform_exc(e)
-
-    def _do_perform_exc(self, e):
-        # handle pycurl exception 'e'
-        if e:
+        except pycurl.error, e:
            # XXX - break some of these out a bit more clearly
            # to other URLGrabErrors from
            # http://curl.haxx.se/libcurl/c/libcurl-errors.html
@@ -1607,7 +1658,22 @@ class PyCurlFileObject(object):
        _was_filename = False
        if type(self.filename) in types.StringTypes and self.filename:
            _was_filename = True
-            self._do_open_fo()
+            self._prog_reportname = str(self.filename)
+            self._prog_basename = os.path.basename(self.filename)
+
+            if self.append: mode = 'ab'
+            else: mode = 'wb'
+
+            if DEBUG: DEBUG.info('opening local file "%s" with mode %s' % \
+                                 (self.filename, mode))
+            try:
+                self.fo = open(self.filename, mode)
+            except IOError, e:
+                err = URLGrabError(16, _(\
+                  'error opening local file from %s, IOError: %s') % (self.url, e))
+                err.url = self.url
+                raise err
+
        else:
            self._prog_reportname = 'MEMORY'
            self._prog_basename = 'MEMORY'
@@ -1627,7 +1693,29 @@ class PyCurlFileObject(object):
            raise e

        if _was_filename:
-            self._do_close_fo()
+            # close it up
+            self.fo.flush()
+            self.fo.close()
+
+            # Set the URL where we got it from:
+            if xattr is not None:
+                # See: http://www.freedesktop.org/wiki/CommonExtendedAttributes
+                try:
+                    xattr.set(self.filename, 'user.xdg.origin.url', self.url)
+                except:
+                    pass # URL too long. = IOError ... ignore everything.
+
+            # set the time
+            mod_time = self.curl_obj.getinfo(pycurl.INFO_FILETIME)
+            if mod_time != -1:
+                try:
+                    os.utime(self.filename, (mod_time, mod_time))
+                except OSError, e:
+                    err = URLGrabError(16, _(\
+                      'error setting timestamp on file %s from %s, OSError: %s')
+                              % (self.filename, self.url, e))
+                    err.url = self.url
+                    raise err
            # re open it
            try:
                self.fo = open(self.filename, 'r')
@@ -1643,47 +1731,6 @@ class PyCurlFileObject(object):

        self._complete = True

-    def _do_open_fo(self):
-        self._prog_reportname = str(self.filename)
-        self._prog_basename = os.path.basename(self.filename)
-        if self.append: mode = 'ab'
-        else: mode = 'wb'
-
-        if DEBUG: DEBUG.info('opening local file "%s" with mode %s' % \
-                             (self.filename, mode))
-        try:
-            self.fo = open(self.filename, mode)
-        except IOError, e:
-            err = URLGrabError(16, _(\
-              'error opening local file from %s, IOError: %s') % (self.url, e))
-            err.url = self.url
-            raise err
-
-    def _do_close_fo(self):
-        # close it up
-        self.fo.flush()
-        self.fo.close()
-
-        # Set the URL where we got it from:
-        if xattr is not None:
-            # See: http://www.freedesktop.org/wiki/CommonExtendedAttributes
-            try:
-                xattr.set(self.filename, 'user.xdg.origin.url', self.url)
-            except:
-                pass # URL too long. = IOError ... ignore everything.
-
-        # set the time
-        mod_time = self.curl_obj.getinfo(pycurl.INFO_FILETIME)
-        if mod_time != -1:
-            try:
-                os.utime(self.filename, (mod_time, mod_time))
-            except OSError, e:
-                err = URLGrabError(16, _(\
-                  'error setting timestamp on file %s from %s, OSError: %s')
-                          % (self.filename, self.url, e))
-                err.url = self.url
-                raise err
-
    def _fill_buffer(self, amt=None):
        """fill the buffer to contain at least 'amt' bytes by reading
        from the underlying file object.  If amt is None, then it will
@@ -1858,6 +1905,425 @@ def retrygrab(url, filename=None, copy_local=0, close_connection=0,


#####################################################################
+# Serializer + parser: A replacement of the rather bulky Json code.
+#
+# - handles basic python literals, lists and tuples.
+# - serialized strings never contain ' ' or '\n'
+#
+#####################################################################
+
+_quoter_map = {}
+for c in '%[(,)] \n':
+    _quoter_map[c] = '%%%02x' % ord(c)
+del c
+
+def _dumps(v):
+    if v is None: return 'None'
+    if v is True: return 'True'
+    if v is False: return 'False'
+    if type(v) in (int, long, float):
+        return str(v)
+    if type(v) == unicode:
+        v = v.encode('UTF8')
+    if type(v) == str:
+        def quoter(c): return _quoter_map.get(c, c)
+        return "'%s'" % ''.join(map(quoter, v))
+    if type(v) == tuple:
+        return "(%s)" % ','.join(map(_dumps, v))
+    if type(v) == list:
+        return "[%s]" % ','.join(map(_dumps, v))
+    raise TypeError, 'Can\'t serialize %s' % v
+
+def _loads(s):
+    def decode(v):
+        if v == 'None': return None
+        if v == 'True': return True
+        if v == 'False': return False
+        try: return int(v)
+        except ValueError: pass
+        try: return float(v)
+        except ValueError: pass
+        if len(v) >= 2 and v[0] == v[-1] == "'":
+            ret = []; i = 1
+            while True:
+                j = v.find('%', i)
+                ret.append(v[i:j]) # skips the final "'"
+                if j == -1: break
+                ret.append(chr(int(v[j + 1:j + 3], 16)))
+                i = j + 3
+            v = ''.join(ret)
+        return v
+    stk = None
+    l = []
+    i = j = 0
+    while True:
+        if j == len(s) or s[j] in ',)]':
+            if j > i:
+                l.append(decode(s[i:j]))
+            if j == len(s): break
+            if s[j] in ')]':
+                if s[j] == ')':
+                    l = tuple(l)
+                stk[0].append(l)
+                l, stk = stk
+            i = j = j + 1
+        elif s[j] in '[(':
+            stk = l, stk
+            l = []
+            i = j = j + 1
+        else:
+            j += 1 # safe because '[(,)]' are quoted
+    if stk: raise ValueError
+    if len(l) == 1: l = l[0]
+    return l
+
+
+#####################################################################
+# External downloader process
+#####################################################################
+
+def _readlines(fd):
+    buf = os.read(fd, 4096)
+    if not buf: return None
+    # whole lines only, no buffering
+    while buf[-1] != '\n':
+        buf += os.read(fd, 4096)
+    return buf[:-1].split('\n')
+
+import subprocess
+
+class _ExternalDownloader:
+    def __init__(self):
+        self.popen = subprocess.Popen(
+            '/usr/libexec/urlgrabber-ext-down',
+            stdin = subprocess.PIPE,
+            stdout = subprocess.PIPE,
+        )
+        self.stdin  = self.popen.stdin.fileno()
+        self.stdout = self.popen.stdout.fileno()
+        self.running = {}
+        self.cnt = 0
+
+    # list of options we pass to downloader
+    _options = (
+        'url', 'filename',
+        'timeout', 'close_connection', 'keepalive',
+        'throttle', 'bandwidth', 'range', 'reget',
+        'user_agent', 'http_headers', 'ftp_headers',
+        'proxy', 'prefix', 'username', 'password',
+        'ssl_ca_cert',
+        'ssl_cert', 'ssl_cert_type',
+        'ssl_key', 'ssl_key_type',
+        'ssl_key_pass',
+        'ssl_verify_peer', 'ssl_verify_host',
+        'size', 'max_header_size', 'ip_resolve',
+    )
+
+    def start(self, opts):
+        arg = []
+        for k in self._options:
+            v = getattr(opts, k)
+            if v is None: continue
+            arg.append('%s=%s' % (k, _dumps(v)))
+        if opts.progress_obj:
+            arg.append('progress_obj=True')
+        arg = ' '.join(arg)
+        if DEBUG: DEBUG.info('attempt %i/%s: %s', opts.tries, opts.retry, opts.url)
+
+        self.cnt += 1
+        self.running[self.cnt] = opts
+        os.write(self.stdin, arg +'\n')
+
+    def perform(self):
+        ret = []
+        lines = _readlines(self.stdout)
+        if not lines:
+            if DEBUG: DEBUG.info('downloader died')
+            raise KeyboardInterrupt
+        for line in lines:
+            # parse downloader output
+            line = line.split(' ', 5)
+            _id, size = map(int, line[:2])
+            if len(line) == 2:
+                opts = self.running[_id]
+                m = opts.progress_obj
+                if m:
+                    if not m.last_update_time:
+                        m.start(text = opts.text)
+                    m.update(size)
+                continue
+            # job done
+            opts = self.running.pop(_id)
+            if line[4] == 'OK':
+                ug_err = None
+                if DEBUG: DEBUG.info('success')
+            else:
+                ug_err = URLGrabError(int(line[4]), line[5])
+                if DEBUG: DEBUG.info('failure: %s', ug_err)
+            _TH.update(opts.url, int(line[2]), float(line[3]), ug_err)
+            ret.append((opts, size, ug_err))
+        return ret
+
+    def abort(self):
+        self.popen.stdin.close()
+        self.popen.stdout.close()
+        self.popen.wait()
+
+class _ExternalDownloaderPool:
+    def __init__(self):
+        self.epoll = select.epoll()
+        self.running = {}
+        self.cache = {}
+
+    def start(self, opts):
+        host = urlparse.urlsplit(opts.url).netloc
+        dl = self.cache.pop(host, None)
+        if not dl:
+            dl = _ExternalDownloader()
+            fl = fcntl.fcntl(dl.stdin, fcntl.F_GETFD)
+            fcntl.fcntl(dl.stdin, fcntl.F_SETFD, fl | fcntl.FD_CLOEXEC)
+        self.epoll.register(dl.stdout, select.EPOLLIN)
+        self.running[dl.stdout] = dl
+        dl.start(opts)
+
+    def perform(self):
+        ret = []
+        for fd, event in self.epoll.poll():
+            assert event & select.EPOLLIN
+            done = self.running[fd].perform()
+            if not done: continue
+            assert len(done) == 1
+            ret.extend(done)
+
+            # dl finished, move it to the cache
+            host = urlparse.urlsplit(done[0][0].url).netloc
+            if host in self.cache: self.cache[host].abort()
+            self.epoll.unregister(fd)
+            self.cache[host] = self.running.pop(fd)
+        return ret
+
+    def abort(self):
+        for dl in self.running.values():
+            self.epoll.unregister(dl.stdout)
+            dl.abort()
+        for dl in self.cache.values():
+            dl.abort()
+
+
+#####################################################################
+# High level async API
+#####################################################################
+
+_async_queue = []
+
+def parallel_wait(meter = 'text'):
+    '''Process queued requests in parallel.
+    '''
+
+    if meter:
+        count = total = 0
+        for opts in _async_queue:
+            count += 1
+            total += opts.size
+        if meter == 'text':
+            from progress import TextMultiFileMeter
+            meter = TextMultiFileMeter()
+        meter.start(count, total)
+
+    dl = _ExternalDownloaderPool()
+    host_con = {} # current host connection counts
+
+    def start(opts, tries):
+        key, limit = opts.async
+        host_con[key] = host_con.get(key, 0) + 1
+        opts.tries = tries
+        opts.progress_obj = meter and meter.newMeter()
+        if DEBUG: DEBUG.info('attempt %i/%s: %s', opts.tries, opts.retry, opts.url)
+        dl.start(opts)
+
+    def perform():
+        for opts, size, ug_err in dl.perform():
+            key, limit = opts.async
+            host_con[key] -= 1
+            if meter:
+                m = opts.progress_obj
+                m.basename = os.path.basename(opts.filename)
+                if ug_err:
+                    m.failure(ug_err.args[1])
+                else:
+                    # file size might have changed
+                    meter.re.total += size - opts.size
+                    m.end(size)
+                meter.removeMeter(m)
+
+            if ug_err is None:
+                if opts.checkfunc:
+                    try: _run_callback(opts.checkfunc, opts)
+                    except URLGrabError, ug_err: pass
+                if ug_err is None:
+                    continue
+
+            retry = opts.retry or 0
+            if opts.failure_callback:
+                opts.exception = ug_err
+                try: _run_callback(opts.failure_callback, opts)
+                except URLGrabError, ug_err:
+                    retry = 0 # no retries
+            if opts.tries < retry and ug_err.args[0] in opts.retrycodes:
+                start(opts, opts.tries + 1) # simple retry
+                continue
+
+            if opts.mirror_group:
+                mg, failed = opts.mirror_group
+                opts.mirror = key
+                opts.exception = ug_err
+                action = _run_callback(mg.failure_callback, opts)
+                if not (action and action.get('fail')):
+                    # mask this mirror and retry
+                    failed.add(key)
+                    _async_queue.append(opts)
+                    continue
+
+            # urlgrab failed
+            opts.exception = ug_err
+            _run_callback(opts.failfunc, opts)
+
+    try:
+        idx = 0
+        while True:
+            if idx >= len(_async_queue):
+                # the queue is empty
+                if not dl.running: break
+                # pending dl may extend it
+                perform()
+                continue
+
+            # handle next request
+            opts = _async_queue[idx]
+            idx += 1
+
+            # check global limit
+            while len(dl.running) >= opts.max_connections:
+                perform()
+
+            if opts.mirror_group:
+                mg, failed = opts.mirror_group
+
+                # find the best mirror
+                best = None
+                for mirror in mg.mirrors:
+                    key = mirror['mirror']
+                    if key in failed: continue
+
+                    # estimate mirror speed
+                    speed = _TH.estimate(key)
+                    speed /= 1 + host_con.get(key, 0)
+                    if best is None or speed > best_speed:
+                        best = mirror
+                        best_speed = speed
+
+                if best is None:
+                    opts.exception = URLGrabError(256, _('No more mirrors to try.'))
+                    _run_callback(opts.failfunc, opts)
+                    continue
+
+                # update the current mirror and limit
+                key = best['mirror']
+                limit = best.get('kwargs', {}).get('max_connections', 3)
+                opts.async = key, limit
+
+                # update URL and proxy
+                url = mg._join_url(key, opts.relative_url)
+                url, parts = opts.urlparser.parse(url, opts)
+                opts.find_proxy(url, parts[0])
+                opts.url = url
+
+            # check host limit, then start
+            key, limit = opts.async
+            while host_con.get(key, 0) >= limit:
+                perform()
+            start(opts, 1)
+    except IOError, e:
+        if e.errno != 4: raise
+        raise KeyboardInterrupt
+
+    finally:
+        dl.abort()
+        if meter: meter.end()
+        del _async_queue[:]
+        _TH.save()
+
+
+#####################################################################
+# Host bandwidth estimation
+#####################################################################
+
+class _TH:
+    hosts = {}
+    dirty = None
+
+    @staticmethod
+    def load():
+        filename = default_grabber.opts.timedhosts
+        if filename and _TH.dirty is None:
+            try:
+                for line in open(filename):
+                    host, speed, fail, ts = line.split()
+                    _TH.hosts[host] = int(speed), int(fail), int(ts)
+            except IOError: pass
+            _TH.dirty = False
+
+    @staticmethod
+    def save():
+        filename = default_grabber.opts.timedhosts
+        if filename and _TH.dirty is True:
+            tmp = '%s.%d' % (filename, os.getpid())
+            try:
+                f = open(tmp, 'w')
+                for host in _TH.hosts:
+                    f.write(host + ' %d %d %d\n' % _TH.hosts[host])
+                f.close()
+                os.rename(tmp, filename)
+            except IOError: pass
+            _TH.dirty = False
+
+    @staticmethod
+    def update(url, dl_size, dl_time, ug_err):
+        _TH.load()
+        host = urlparse.urlsplit(url).netloc
+        speed, fail, ts = _TH.hosts.get(host) or (0, 0, 0)
+        now = time.time()
+
+        if ug_err is None:
+            # k1: the older, the less useful
+            # k2: if it was <1MiB, don't trust it much
+            # speeds vary, use 10:1 smoothing
+            k1 = 2**((ts - now) / default_grabber.opts.half_life)
+            k2 = min(dl_size / 1e6, 1.0) / 10
+            speed = (k1 * speed + k2 * dl_size / dl_time) / (k1 + k2)
+            fail = 0
+        elif getattr(ug_err, 'code', None) == 404:
+            fail = 0 # alive, at least
+        else:
+            fail += 1 # seems dead
+
+        _TH.hosts[host] = speed, fail, now
+        _TH.dirty = True
+
+    @staticmethod
+    def estimate(url):
+        _TH.load()
+        host = urlparse.urlsplit(url).netloc
+        default_speed = default_grabber.opts.default_speed
+        try: speed, fail, ts = _TH.hosts[host]
+        except KeyError: return default_speed
+
+        speed *= 2**-fail
+        k = 2**((ts - time.time()) / default_grabber.opts.half_life)
+        speed = k * speed + (1 - k) * default_speed
+        return speed
+
+#####################################################################
# TESTING
def _main_test():
    try: url, filename = sys.argv[1:3]
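To make the decay arithmetic in _TH concrete: with the default 30-day half_life, a host measured at 5 MB/s fifteen days ago gives k = 2**(-15/30) ≈ 0.71 in estimate(), so the returned speed drifts to roughly 0.71 * 5e6 + 0.29 * 1e6 ≈ 3.8 MB/s. Each recorded consecutive failure additionally halves the estimate via speed *= 2**-fail, so a flaky mirror quickly loses the selection race in parallel_wait().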
diff --git a/urlgrabber/mirror.py b/urlgrabber/mirror.py
index 8731aed..d699b61 100644
--- a/urlgrabber/mirror.py
+++ b/urlgrabber/mirror.py
@@ -76,6 +76,9 @@ CUSTOMIZATION
       'grabber' is omitted, the default grabber will be used.  If
       kwargs are omitted, then (duh) they will not be used.

+      kwarg 'max_connections' is used to store the max connection
+      limit of this mirror.
+
    3) Pass keyword arguments when instantiating the mirror group.
       See, for example, the failure_callback argument.

@@ -91,6 +94,7 @@ import random
import thread  # needed for locking to make this threadsafe

from grabber import URLGrabError, CallbackObject, DEBUG, _to_utf8
+from grabber import _run_callback, _do_raise, _async_queue

def _(st):
    return st
@@ -254,7 +258,7 @@ class MirrorGroup:
    # if these values are found in **kwargs passed to one of the urlXXX
    # methods, they will be stripped before getting passed on to the
    # grabber
-    options = ['default_action', 'failure_callback']
+    options = ['default_action', 'failure_callback', 'failfunc']

    def _process_kwargs(self, kwargs):
        self.failure_callback = kwargs.get('failure_callback')
@@ -403,10 +407,25 @@ class MirrorGroup:
            self._failure(gr, obj)

    def urlgrab(self, url, filename=None, **kwargs):
+        if kwargs.get('async'):
+            opts = self.grabber.opts.derive(**kwargs)
+            opts.mirror_group = self, set()
+            opts.relative_url = _to_utf8(url)
+
+            opts.url = 'http://tbd'
+            opts.filename = filename
+            opts.size = int(opts.size or 0)
+            _async_queue.append(opts)
+            return filename
+
        kw = dict(kwargs)
        kw['filename'] = filename
        func = 'urlgrab'
-        return self._mirror_try(func, url, kw)
+        try:
+            return self._mirror_try(func, url, kw)
+        except URLGrabError, e:
+            obj = CallbackObject(url=url, filename=filename, exception=e, **kwargs)
+            return _run_callback(kwargs.get('failfunc', _do_raise), obj)

    def urlopen(self, url, **kwargs):
        kw = dict(kwargs)
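The async path above can be exercised with a sketch like this (mirror URLs invented; the 'http://tbd' placeholder is only replaced once parallel_wait() selects a mirror):

    from urlgrabber.grabber import URLGrabber, parallel_wait
    from urlgrabber.mirror import MirrorGroup

    mg = MirrorGroup(URLGrabber(), ['http://m1.example.com/repo/',
                                    'http://m2.example.com/repo/'])
    mg.urlgrab('pkg.rpm', '/tmp/pkg.rpm', async=('repo', 2))
    parallel_wait()

parallel_wait() picks the mirror with the best _TH speed estimate (scaled down by the connections it already has open), masks mirrors that fail, and finally falls back to failfunc with error 256 ('No more mirrors to try.') when the mirror set is exhausted.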
diff --git a/urlgrabber/progress.py b/urlgrabber/progress.py
index 3d7e99a..4c126c5 100644
--- a/urlgrabber/progress.py
+++ b/urlgrabber/progress.py
@@ -576,7 +576,6 @@ class TextMultiFileMeter(MultiFileMeter):
            self.fo.write(out)
        finally:
            self._lock.release()
-        self._do_update_meter(meter, now)

    def _do_failure_meter(self, meter, message, now):
        self._lock.acquire()
@@ -599,15 +598,6 @@ class TextMultiFileMeter(MultiFileMeter):
            pass
        finally:
            self._lock.release()
-
-    def _do_end(self, now):
-        self._do_update_meter(None, now)
-        self._lock.acquire()
-        try:
-            self.fo.write('\n')
-            self.fo.flush()
-        finally:
-            self._lock.release()

######################################################################
# support classes and functions