Server IP : 127.0.0.2 / Your IP : 3.14.4.171 Web Server : Apache/2.4.18 (Ubuntu) System : User : www-data ( ) PHP Version : 7.0.33-0ubuntu0.16.04.16 Disable Function : disk_free_space,disk_total_space,diskfreespace,dl,exec,fpaththru,getmyuid,getmypid,highlight_file,ignore_user_abord,leak,listen,link,opcache_get_configuration,opcache_get_status,passthru,pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,php_uname,phpinfo,posix_ctermid,posix_getcwd,posix_getegid,posix_geteuid,posix_getgid,posix_getgrgid,posix_getgrnam,posix_getgroups,posix_getlogin,posix_getpgid,posix_getpgrp,posix_getpid,posix,_getppid,posix_getpwnam,posix_getpwuid,posix_getrlimit,posix_getsid,posix_getuid,posix_isatty,posix_kill,posix_mkfifo,posix_setegid,posix_seteuid,posix_setgid,posix_setpgid,posix_setsid,posix_setuid,posix_times,posix_ttyname,posix_uname,pclose,popen,proc_open,proc_close,proc_get_status,proc_nice,proc_terminate,shell_exec,source,show_source,system,virtual MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : ON Directory : /usr/lib/python2.7/dist-packages/openid/yadis/ |
Upload File : |
# -*- test-case-name: yadis.test.test_etxrd -*-
"""
ElementTree interface to an XRD document.

Provides helpers to parse an XRDS document, locate the XRD element that
carries the Yadis services, read its expiration and CanonicalID, and
enumerate its Service/URI elements in priority order.
"""

__all__ = [
    'nsTag',
    'mkXRDTag',
    'isXRDS',
    'parseXRDS',
    'getCanonicalID',
    'getYadisXRD',
    'getPriorityStrict',
    'getPriority',
    'prioSort',
    'iterServices',
    'expandService',
    'expandServices',
    ]

import sys
import random

from datetime import datetime
from time import strptime

from openid.oidutil import importElementTree
ElementTree = importElementTree()

# the different elementtree modules don't have a common exception
# model. We just want to be able to catch the exceptions that signify
# malformed XML data and wrap them, so that the other library code
# doesn't have to know which XML library we're using.
try:
    # Make the parser raise an exception so we can sniff out the type
    # of exceptions
    ElementTree.XML('> purposely malformed XML <')
except (SystemExit, MemoryError, AssertionError, ImportError):
    # These never signal "malformed XML"; let them propagate untouched.
    raise
except:
    # Deliberately broad: whatever class the parser raised for the bad
    # input above is recorded as this module's XML error type.
    XMLError = sys.exc_info()[0]

from openid.yadis import xri


class XRDSError(Exception):
    """An error with the XRDS document."""

    # The exception that triggered this exception
    reason = None


class XRDSFraud(XRDSError):
    """Raised when there's an assertion in the XRDS that it does not have
    the authority to make.
    """


def parseXRDS(text):
    """Parse the given text as an XRDS document.

    @param text: The serialized XML document to parse.

    @return: ElementTree containing an XRDS document

    @raises XRDSError: When there is a parse error or the document does
        not contain an XRDS. For parse errors the underlying parser
        exception is stored in the C{reason} attribute.
    """
    try:
        element = ElementTree.XML(text)
    except XMLError as why:
        # Wrap the library-specific parse error so callers only need to
        # know about XRDSError.
        exc = XRDSError('Error parsing document as XML')
        exc.reason = why
        raise exc
    else:
        tree = ElementTree.ElementTree(element)
        if not isXRDS(tree):
            raise XRDSError('Not an XRDS document')

        return tree


XRD_NS_2_0 = 'xri://$xrd*($v*2.0)'
XRDS_NS = 'xri://$xrds'


def nsTag(ns, t):
    """Build an ElementTree-style qualified tag name, C{'{ns}t'}."""
    return '{%s}%s' % (ns, t)


def mkXRDTag(t):
    """basestring -> basestring

    Create a tag name in the XRD 2.0 XML namespace suitable for using
    with ElementTree
    """
    return nsTag(XRD_NS_2_0, t)


def mkXRDSTag(t):
    """basestring -> basestring

    Create a tag name in the XRDS XML namespace suitable for using
    with ElementTree
    """
    return nsTag(XRDS_NS, t)


# Tags that are used in Yadis documents
root_tag = mkXRDSTag('XRDS')
service_tag = mkXRDTag('Service')
xrd_tag = mkXRDTag('XRD')
type_tag = mkXRDTag('Type')
uri_tag = mkXRDTag('URI')
expires_tag = mkXRDTag('Expires')

# Other XRD tags
canonicalID_tag = mkXRDTag('CanonicalID')


def isXRDS(xrd_tree):
    """Is this document an XRDS document?

    True when the tree's root element is the XRDS root tag.
    """
    root = xrd_tree.getroot()
    return root.tag == root_tag


def getYadisXRD(xrd_tree):
    """Return the XRD element that should contain the Yadis services.

    This is the last XRD element found by C{findall} on the tree.

    @raises XRDSError: When the tree contains no XRD element.
    """
    xrd_elements = list(xrd_tree.findall(xrd_tag))
    if not xrd_elements:
        raise XRDSError('No XRD present in tree')

    return xrd_elements[-1]


def getXRDExpiration(xrd_element, default=None):
    """Return the expiration date of this XRD element, or C{default} if
    no expiration was specified.

    @type xrd_element: ElementTree node

    @param default: The value to use as the expiration if no
        expiration was specified in the XRD.

    @rtype: datetime.datetime

    @raises ValueError: If the xrd:Expires element is present, but its
        contents are empty or not formatted according to the
        specification.
    """
    expires_element = xrd_element.find(expires_tag)
    if expires_element is None:
        return default

    expires_string = expires_element.text
    if expires_string is None:
        # An empty <Expires/> would otherwise make strptime raise
        # TypeError instead of the documented ValueError.
        raise ValueError('xrd:Expires element is empty')

    # Will raise ValueError if the string is not the expected format
    expires_time = strptime(expires_string, "%Y-%m-%dT%H:%M:%SZ")
    return datetime(*expires_time[0:6])


def getCanonicalID(iname, xrd_tree):
    """Return the CanonicalID from this XRDS document.

    The CanonicalID of the last XRD is checked against the CanonicalID
    of each preceding XRD, and finally against the root authority of
    C{iname}.

    @param iname: the XRI being resolved.
    @type iname: unicode

    @param xrd_tree: The XRDS output from the resolver.
    @type xrd_tree: ElementTree

    @returns: The XRI CanonicalID or None.
    @returntype: unicode or None

    @raises XRDSFraud: When a CanonicalID is not a child of the
        CanonicalID in the preceding XRD, or the root authority is not
        authoritative for the outermost one.
    """
    xrd_list = xrd_tree.findall(xrd_tag)
    # Walk from the last XRD back towards the first.
    xrd_list.reverse()

    try:
        canonicalID = xri.XRI(xrd_list[0].findall(canonicalID_tag)[0].text)
    except IndexError:
        # Either no XRD at all, or the last XRD has no CanonicalID.
        return None

    childID = canonicalID.lower()

    for xrd in xrd_list[1:]:
        # XXX: can't use rsplit until we require python >= 2.4.
        # NOTE(review): rindex presumably raises ValueError when childID
        # contains no '!' -- confirm whether callers expect that.
        parent_sought = childID[:childID.rindex('!')]
        parent = xri.XRI(xrd.findtext(canonicalID_tag))
        if parent_sought != parent.lower():
            raise XRDSFraud("%r can not come from %s" % (childID, parent))

        childID = parent_sought

    root = xri.rootAuthority(iname)
    if not xri.providerIsAuthoritative(root, childID):
        raise XRDSFraud("%r can not come from root %r" % (childID, root))

    return canonicalID


class _Max(object):
    """Value that compares greater than any other value.

    Should only be used as a singleton. Implemented for use as a
    priority value for when a priority is not specified.

    Both C{__cmp__} and the rich comparison methods are provided so the
    ordering is the same whether or not the interpreter consults
    C{__cmp__}.
    """

    def __cmp__(self, other):
        if other is self:
            return 0

        return 1

    def __eq__(self, other):
        return other is self

    def __ne__(self, other):
        return other is not self

    def __lt__(self, other):
        return False

    def __le__(self, other):
        return other is self

    def __gt__(self, other):
        return other is not self

    def __ge__(self, other):
        return True

    # Defining __eq__ would otherwise disable hashing on Python 3.
    __hash__ = object.__hash__


Max = _Max()


def getPriorityStrict(element):
    """Get the priority of this element.

    Raises ValueError if the value of the priority is invalid. If no
    priority is specified, it returns a value that compares greater
    than any other value.
    """
    prio_str = element.get('priority')
    if prio_str is not None:
        # int() raises ValueError for non-numeric text.
        prio_val = int(prio_str)
        if prio_val >= 0:
            return prio_val
        else:
            raise ValueError('Priority values must be non-negative integers')

    # No priority attribute was specified
    return Max


def getPriority(element):
    """Get the priority of this element

    Returns Max if no priority is specified or the priority value is
    invalid.
    """
    try:
        return getPriorityStrict(element)
    except ValueError:
        return Max


def prioSort(elements):
    """Sort a list of elements that have priority attributes.

    Note that C{elements} is shuffled in place as a side effect; the
    sorted result is returned as a new list.
    """
    # Randomize the services before sorting so that equal priority
    # elements are load-balanced.
    random.shuffle(elements)

    # Sort on the priority alone. The sort is stable, so elements with
    # equal priority keep their shuffled order and the elements
    # themselves are never compared with each other.
    return sorted(elements, key=getPriority)


def iterServices(xrd_tree):
    """Return an iterable over the Service elements in the Yadis XRD

    sorted by priority"""
    xrd = getYadisXRD(xrd_tree)
    return prioSort(xrd.findall(service_tag))


def sortedURIs(service_element):
    """Given a Service element, return a list of the contents of all
    URI tags in priority order."""
    return [uri_element.text for uri_element
            in prioSort(service_element.findall(uri_tag))]


def getTypeURIs(service_element):
    """Given a Service element, return a list of the contents of all
    Type tags"""
    return [type_element.text for type_element
            in service_element.findall(type_tag)]


def expandService(service_element):
    """Take a service element and expand it into a list of:
    ([type_uri], uri, service_element)

    One triple is produced per URI, in priority order. A service with
    no URI yields a single triple whose uri is None.
    """
    uris = sortedURIs(service_element)
    if not uris:
        uris = [None]

    # Each triple gets its own freshly built list of type URIs.
    return [(getTypeURIs(service_element), uri, service_element)
            for uri in uris]


def expandServices(service_elements):
    """Take a sorted iterator of service elements and expand it into a
    sorted list of:
    ([type_uri], uri, service_element)

    There may be more than one item in the resulting list for each
    service element if there is more than one URI for a service. A
    Service element without any URI appears once, with uri set to
    None.
    """
    expanded = []
    for service_element in service_elements:
        expanded.extend(expandService(service_element))

    return expanded