Source code for dp_tornado.helper.web.url

# -*- coding: utf-8 -*-


import requests.utils

from dp_tornado.engine.helper import Helper as dpHelper

try:
    import urllib.parse as _parse
except:
    import urllib as _parse

try:
    import urlparse
except ImportError:
    import urllib.parse as urlparse


class UrlParse(object):
    """Mutable container for the components of a URL.

    Attributes:
        request: The request object handed to the constructor (may be None).
        scheme: URL scheme, e.g. ``'http'``.
        netloc: Network location part.
        path: Path part, with any ``X-Proxy-Prefix`` already stripped.
        params: Value given as ``params``, or a new empty dict if falsy.
        query: Value given as ``query``, or a new empty dict if falsy.
        fragment: Fragment part.
    """

    def __init__(self, request=None, scheme='', netloc='', path='',
                 params=None, query=None, framgment=''):
        """Store the URL components, removing a proxy prefix from ``path``.

        When ``request`` is truthy and its ``headers`` contain
        ``X-Proxy-Prefix``, that prefix (stripped of surrounding whitespace
        and of one trailing ``/``) is removed from the start of ``path``.
        A path that becomes empty is replaced by ``'/'``.

        NOTE: the keyword ``framgment`` is misspelled; it is kept unchanged
        because it is part of the public signature. Its value is stored as
        ``self.fragment``.
        """
        if request and 'X-Proxy-Prefix' in request.headers:
            prefix_path = request.headers['X-Proxy-Prefix'].strip()

            if prefix_path.endswith('/'):
                prefix_path = prefix_path[:-1]

            # Strip only on a path-segment boundary. A bare startswith()
            # would turn '/apifoo' into 'foo' for the prefix '/api'.
            if path == prefix_path or path.startswith(prefix_path + '/'):
                path = path[len(prefix_path):] or '/'

        self.request = request
        self.scheme = scheme
        self.netloc = netloc
        self.path = path
        self.params = params or {}
        self.query = query or {}
        self.fragment = framgment

    def __str__(self):
        """Return a human-readable listing of all stored components."""
        return ('scheme=%s, netloc=%s, path=%s, params=%s, query=%s, fragment=%s'
                % (self.scheme, self.netloc, self.path, self.params, self.query, self.fragment))

    def __eq__(self, other):
        """Compare scheme, netloc, path, fragment and query.

        ``params`` and ``request`` do not take part in the comparison.
        Returns ``NotImplemented`` when ``other`` lacks any of the compared
        attributes, so comparing with unrelated objects (``None``, numbers,
        strings, ...) yields ``False`` instead of raising AttributeError.
        """
        compared = ('scheme', 'netloc', 'path', 'fragment', 'query')

        if not all(hasattr(other, name) for name in compared):
            return NotImplemented

        # Comparing the query mappings directly avoids the KeyError that a
        # per-key lookup raises when both queries have the same size but
        # different keys.
        return (self.scheme == other.scheme and
                self.netloc == other.netloc and
                self.path == other.path and
                self.fragment == other.fragment and
                self.query == other.query)

    def __ne__(self, other):
        """Return the negation of ``__eq__``, passing NotImplemented through."""
        result = self.__eq__(other)

        if result is NotImplemented:
            return result

        return not result

    def build(self):
        """Assemble a URL string from scheme, netloc, path and query.

        Empty components are skipped. The query is serialised with
        ``urlencode``. ``params`` and ``fragment`` are not part of the
        output.
        """
        url = ''

        if self.scheme:
            url = '%s://' % self.scheme

        if self.netloc:
            url = '%s%s' % (url, self.netloc)

        if self.path:
            url = '%s%s' % (url, self.path)

        if self.query:
            url = '%s?%s' % (url, _parse.urlencode(self.query))

        return url
class UrlHelper(dpHelper):
    """Helper exposing URL validation, joining, parsing, building and quoting."""

    def validate(self, e):
        """Return the result of ``self.helper.validator.url.validate(e)``."""
        return self.helper.validator.url.validate(e)

    def join(self, *args):
        """Join all arguments left to right with ``urljoin``.

        Starts from an empty base, so calling with no arguments returns ``''``.
        """
        joined = ''

        for part in args:
            joined = urlparse.urljoin(joined, part)

        return joined

    def parse(self, e):
        """Parse ``e`` into a :class:`UrlParse`.

        ``e`` is either a string URL (as decided by
        ``self.helper.misc.type.check.string``) or an object with a
        ``request`` attribute. In the latter case the URL is rebuilt from
        ``request.protocol``, ``request.host`` and the request URI, which is
        taken from ``e.request_uri()`` if present, else from ``request.uri``.

        Raises:
            Exception: if neither ``e.request_uri`` nor ``request.uri`` exists.
        """
        if not self.helper.misc.type.check.string(e):
            request = e.request

            if hasattr(e, 'request_uri'):
                request_uri = e.request_uri()
            elif hasattr(request, 'uri'):
                request_uri = request.uri
            else:
                raise Exception

            uri = '%s://%s%s' % (request.protocol, request.host, request_uri)
        else:
            request = None
            uri = self.helper.string.cast.string(e)

        # On Python 2 the parsing functions come from `urlparse`; on later
        # versions they come from `_parse`.
        parser = urlparse if self.helper.misc.system.py_version <= 2 else _parse

        parts = parser.urlparse(uri)
        pairs = parser.parse_qsl(parts.query, keep_blank_values=True)

        return UrlParse(request, parts.scheme, parts.netloc, parts.path, parts.params, dict(pairs), parts.fragment)

    def build(self, url=None, query=None):
        """Build a URL string, optionally overriding query values.

        ``url`` may be a string (parsed first), an object providing
        ``build`` (used as is), or anything else (treated as the empty URL).
        Entries of ``query`` are written into the target's ``query``
        mapping before building; when ``url`` is an existing object, that
        object's ``query`` is modified in place.
        """
        if self.helper.misc.type.check.string(url):
            target = self.parse(url)
        elif hasattr(url, 'build'):
            target = url
        else:
            target = self.parse('')

        if query:
            for name, value in query.items():
                target.query[name] = value

        return target.build()

    def encode(self, *args, **kwargs):
        """Alias of :meth:`quote`."""
        return self.quote(*args, **kwargs)

    def decode(self, *args, **kwargs):
        """Alias of :meth:`unquote`."""
        return self.unquote(*args, **kwargs)

    def quote(self, e, safe='', **kwargs):
        """Percent-encode ``e`` via ``requests.utils.quote``.

        Extra keyword arguments are forwarded only when
        ``py_version`` is greater than 2; otherwise they are ignored.
        """
        extra = {} if self.helper.misc.system.py_version <= 2 else kwargs
        return requests.utils.quote(e, safe=safe, **extra)

    def unquote(self, e, **kwargs):
        """Decode percent-escapes in ``e`` via ``requests.utils.unquote``.

        Extra keyword arguments are forwarded only when
        ``py_version`` is greater than 2; otherwise they are ignored.
        """
        extra = {} if self.helper.misc.system.py_version <= 2 else kwargs
        return requests.utils.unquote(e, **extra)