Dre4m Shell
Server IP : 127.0.0.2  /  Your IP : 13.58.199.13
Web Server : Apache/2.4.18 (Ubuntu)
System :
User : www-data ( )
PHP Version : 7.0.33-0ubuntu0.16.04.16
Disable Function : disk_free_space,disk_total_space,diskfreespace,dl,exec,fpaththru,getmyuid,getmypid,highlight_file,ignore_user_abord,leak,listen,link,opcache_get_configuration,opcache_get_status,passthru,pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,php_uname,phpinfo,posix_ctermid,posix_getcwd,posix_getegid,posix_geteuid,posix_getgid,posix_getgrgid,posix_getgrnam,posix_getgroups,posix_getlogin,posix_getpgid,posix_getpgrp,posix_getpid,posix,_getppid,posix_getpwnam,posix_getpwuid,posix_getrlimit,posix_getsid,posix_getuid,posix_isatty,posix_kill,posix_mkfifo,posix_setegid,posix_seteuid,posix_setgid,posix_setpgid,posix_setsid,posix_setuid,posix_times,posix_ttyname,posix_uname,pclose,popen,proc_open,proc_close,proc_get_status,proc_nice,proc_terminate,shell_exec,source,show_source,system,virtual
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : ON
Directory :  /usr/lib/python2.7/dist-packages/ZSI/twisted/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /usr/lib/python2.7/dist-packages/ZSI/twisted/WSsecurity.py
###########################################################################
# Joshua R. Boverhof, LBNL
# See Copyright for copyright notice!
# $Id: WSsecurity.py 1134 2006-02-24 00:23:06Z boverhof $
###########################################################################

import sys, time, warnings
import sha, base64

# twisted & related imports
from zope.interface import classProvides, implements, Interface
from twisted.python import log, failure
from twisted.web.error import NoResource
from twisted.web.server import NOT_DONE_YET
from twisted.internet import reactor
import twisted.web.http
import twisted.web.resource

# ZSI imports
from ZSI import _get_element_nsuri_name, EvaluateException, ParseException
from ZSI.parse import ParsedSoap
from ZSI.writer import SoapWriter
from ZSI.TC import _get_global_element_declaration as GED
from ZSI import fault
from ZSI.wstools.Namespaces import OASIS, DSIG
from WSresource import DefaultHandlerChain, HandlerChainInterface,\
    WSAddressCallbackHandler, DataHandler, WSAddressHandler


#
# Global Element Declarations
# 
UsernameTokenDec = GED(OASIS.WSSE, "UsernameToken")
SecurityDec = GED(OASIS.WSSE, "Security")
SignatureDec = GED(DSIG.BASE, "Signature")
PasswordDec = GED(OASIS.WSSE, "Password")
NonceDec = GED(OASIS.WSSE, "Nonce")
CreatedDec = GED(OASIS.UTILITY, "Created")

if None in [UsernameTokenDec,SecurityDec,SignatureDec,PasswordDec,NonceDec,CreatedDec]:
    raise ImportError, 'required global element(s) unavailable: %s ' %({
        (OASIS.WSSE, "UsernameToken"):UsernameTokenDec,
        (OASIS.WSSE, "Security"):SecurityDec,
        (DSIG.BASE, "Signature"):SignatureDec,
        (OASIS.WSSE, "Password"):PasswordDec,
        (OASIS.WSSE, "Nonce"):NonceDec,
        (OASIS.UTILITY, "Created"):CreatedDec,
        })
    
    
# 
# Stability: Unstable, Untested, Not Finished.
# 

class WSSecurityHandler:
    """Web Services Security: SOAP Message Security 1.0
    
    Class Variables:
        debug -- If True provide more detailed SOAP:Fault information to clients.
    """
    classProvides(HandlerChainInterface)
    debug = True
    
    @classmethod
    def processRequest(cls, ps, **kw):
        if type(ps) is not ParsedSoap:
            raise TypeError,'Expecting ParsedSoap instance'
        
        security = ps.ParseHeaderElements([cls.securityDec])
        
        # Assume all security headers are supposed to be processed here.
        for pyobj in security or []:
            for any in pyobj.Any or []:
                
                if any.typecode is UsernameTokenDec:
                    try:
                        ps = cls.UsernameTokenProfileHandler.processRequest(ps, any)
                    except Exception, ex:
                        if cls.debug: raise
                        raise RuntimeError, 'Unauthorized Username/passphrase combination'
                    continue
                
                if any.typecode is SignatureDec:
                    try:
                        ps = cls.SignatureHandler.processRequest(ps, any)
                    except Exception, ex:
                        if cls.debug: raise
                        raise RuntimeError, 'Invalid Security Header'
                    continue
                
                raise RuntimeError, 'WS-Security, Unsupported token %s' %str(any)
            
        return ps

    @classmethod
    def processResponse(cls, output, **kw):
        return output


    class UsernameTokenProfileHandler:
        """Web Services Security UsernameToken Profile 1.0
        
        Class Variables:
            targetNamespace --
        """
        classProvides(HandlerChainInterface)
        
        # Class Variables
        targetNamespace = OASIS.WSSE
        sweepInterval = 60*5
        nonces = None
            
        # Set to None to disable
        PasswordText = targetNamespace + "#PasswordText"
        PasswordDigest = targetNamespace + "#PasswordDigest"
            
        # Override passwordCallback 
        passwordCallback = lambda cls,username: None
        
        @classmethod
        def sweep(cls, index):
            """remove nonces every sweepInterval.
            Parameters:
                index -- remove all nonces up to this index.
            """
            if cls.nonces is None: 
                cls.nonces = []
            
            seconds = cls.sweepInterval
            cls.nonces = cls.nonces[index:]
            reactor.callLater(seconds, cls.sweep, len(cls.nonces))
        
        @classmethod
        def processRequest(cls, ps, token, **kw):
            """
            Parameters:
                ps -- ParsedSoap instance
                token -- UsernameToken pyclass instance
            """
            if token.typecode is not UsernameTokenDec:
                raise TypeError, 'expecting GED (%s,%s) representation.' %(
                    UsernameTokenDec.nspname, UsernameTokenDec.pname)
                    
            username = token.Username
            
            # expecting only one password
            # may have a nonce and a created
            password = nonce = timestamp = None
            for any in token.Any or []:
                if any.typecode is PasswordDec:
                    password = any
                    continue
                
                if any.typecode is NonceTypeDec:
                    nonce = any
                    continue
                
                if any.typecode is CreatedTypeDec:
                    timestamp = any
                    continue
                
                raise TypeError, 'UsernameTokenProfileHander unexpected %s' %str(any)

            if password is None:
                raise RuntimeError, 'Unauthorized, no password'
            
            # TODO: not yet supporting complexType simpleContent in pyclass_type
            attrs = getattr(password, password.typecode.attrs_aname, {})
            pwtype = attrs.get('Type', cls.PasswordText)
            
            # Clear Text Passwords
            if cls.PasswordText is not None and pwtype == cls.PasswordText:
                if password == cls.passwordCallback(username):
                    return ps
                
                raise RuntimeError, 'Unauthorized, clear text password failed'
            
            if cls.nonces is None: cls.sweep(0)
            if nonce is not None:
                if nonce in cls.nonces:
                    raise RuntimeError, 'Invalid Nonce'
                
                # created was 10 seconds ago or sooner
                if created is not None and created < time.gmtime(time.time()-10):
                    raise RuntimeError, 'UsernameToken created is expired' 
                
                cls.nonces.append(nonce)
            
            # PasswordDigest, recommended that implemenations
            # require a Nonce and Created
            if cls.PasswordDigest is not None and pwtype == cls.PasswordDigest:
                digest = sha.sha()
                for i in (nonce, created, cls.passwordCallback(username)):
                    if i is None: continue
                    digest.update(i)

                if password == base64.encodestring(digest.digest()).strip():
                    return ps
                
                raise RuntimeError, 'Unauthorized, digest failed'
            
            raise RuntimeError, 'Unauthorized, contents of UsernameToken unknown'
            
        @classmethod
        def processResponse(cls, output, **kw):
            return output
        
    @staticmethod
    def hmac_sha1(xml):
        return 
    
    class SignatureHandler:
        """Web Services Security UsernameToken Profile 1.0
        """
        digestMethods = {
            DSIG.BASE+"#sha1":sha.sha,
            }
        signingMethods = {
            DSIG.BASE+"#hmac-sha1":hmac_sha1,
            }
        canonicalizationMethods = {
            DSIG.C14N_EXCL:lambda node: Canonicalize(node, unsuppressedPrefixes=[]),
            DSIG.C14N:lambda node: Canonicalize(node),
            }
            
        @classmethod
        def processRequest(cls, ps, signature, **kw):
            """
            Parameters:
                ps -- ParsedSoap instance
                signature -- Signature pyclass instance
            """
            if token.typecode is not SignatureDec:
                raise TypeError, 'expecting GED (%s,%s) representation.' %(
                    SignatureDec.nspname, SignatureDec.pname)
                    
            si = signature.SignedInfo
            si.CanonicalizationMethod
            calgo = si.CanonicalizationMethod.get_attribute_Algorithm()
            for any in si.CanonicalizationMethod.Any:
                pass
            
            # Check Digest
            si.Reference
            context = XPath.Context.Context(ps.dom, processContents={'wsu':OASIS.UTILITY})
            exp = XPath.Compile('//*[@wsu:Id="%s"]' %si.Reference.get_attribute_URI())
            nodes = exp.evaluate(context)
            if len(nodes) != 1:
                raise RuntimeError, 'A SignedInfo Reference must refer to one node %s.' %(
                    si.Reference.get_attribute_URI())
                    
            try:
                xml = cls.canonicalizeMethods[calgo](nodes[0])
            except IndexError:
                raise RuntimeError, 'Unsupported canonicalization algorithm'
            
            try:
                digest = cls.digestMethods[salgo]
            except IndexError:
                raise RuntimeError, 'unknown digestMethods Algorithm'
            
            digestValue = base64.encodestring(digest(xml).digest()).strip()
            if si.Reference.DigestValue != digestValue:
                raise RuntimeError, 'digest does not match'
            
            if si.Reference.Transforms:
                pass
            
            signature.KeyInfo
            signature.KeyInfo.KeyName
            signature.KeyInfo.KeyValue
            signature.KeyInfo.RetrievalMethod
            signature.KeyInfo.X509Data
            signature.KeyInfo.PGPData
            signature.KeyInfo.SPKIData
            signature.KeyInfo.MgmtData
            signature.KeyInfo.Any 
            
            signature.Object
            
            # TODO: Check Signature
            signature.SignatureValue
            si.SignatureMethod
            salgo = si.SignatureMethod.get_attribute_Algorithm()
            if si.SignatureMethod.HMACOutputLength:
                pass
            for any in si.SignatureMethod.Any:
                pass
            
            # <SignedInfo><Reference URI="">
            exp = XPath.Compile('//child::*[attribute::URI = "%s"]/..' %(
                                 si.Reference.get_attribute_URI()))
            nodes = exp.evaluate(context)
            if len(nodes) != 1:
                raise RuntimeError, 'A SignedInfo Reference must refer to one node %s.' %(
                    si.Reference.get_attribute_URI())
                    
            try:
                xml = cls.canonicalizeMethods[calgo](nodes[0])
            except IndexError:
                raise RuntimeError, 'Unsupported canonicalization algorithm'
            
            # TODO: Check SignatureValue
            
        @classmethod
        def processResponse(cls, output, **kw):
            return output
        

    class X509TokenProfileHandler:
        """Web Services Security UsernameToken Profile 1.0
        """
        targetNamespace = DSIG.BASE
        
        # Token Types
        singleCertificate = targetNamespace + "#X509v3"
        certificatePath = targetNamespace + "#X509PKIPathv1"
        setCerticatesCRLs = targetNamespace + "#PKCS7"
        
        @classmethod
        def processRequest(cls, ps, signature, **kw):
            return ps



"""
<element name="KeyInfo" type="ds:KeyInfoType"/>
<complexType name="KeyInfoType" mixed="true">
  <choice maxOccurs="unbounded">
    <element ref="ds:KeyName"/>
    <element ref="ds:KeyValue"/>
    <element ref="ds:RetrievalMethod"/>
    <element ref="ds:X509Data"/>
    <element ref="ds:PGPData"/>
    <element ref="ds:SPKIData"/>
    <element ref="ds:MgmtData"/>
    <any processContents="lax" namespace="##other"/>
    <!-- (1,1) elements from (0,unbounded) namespaces -->
  </choice>
  <attribute name="Id" type="ID" use="optional"/>
</complexType>



<element name="Signature" type="ds:SignatureType"/>
<complexType name="SignatureType">
  <sequence>
    <element ref="ds:SignedInfo"/>
    <element ref="ds:SignatureValue"/>
    <element ref="ds:KeyInfo" minOccurs="0"/>
    <element ref="ds:Object" minOccurs="0" maxOccurs="unbounded"/>
  </sequence>
  <attribute name="Id" type="ID" use="optional"/>
</complexType>

  <element name="SignatureValue" type="ds:SignatureValueType"/>
  <complexType name="SignatureValueType">
    <simpleContent>
      <extension base="base64Binary">
        <attribute name="Id" type="ID" use="optional"/>
      </extension>
    </simpleContent>
  </complexType>

<!-- Start SignedInfo -->

<element name="SignedInfo" type="ds:SignedInfoType"/>
<complexType name="SignedInfoType">
  <sequence>
    <element ref="ds:CanonicalizationMethod"/>
    <element ref="ds:SignatureMethod"/>
    <element ref="ds:Reference" maxOccurs="unbounded"/>
  </sequence> 
  <attribute name="Id" type="ID" use="optional"/>
</complexType>
"""


class WSSecurityHandlerChainFactory:
    protocol = DefaultHandlerChain
    
    @classmethod
    def newInstance(cls):
            
        return cls.protocol(WSAddressCallbackHandler, DataHandler, 
            WSSecurityHandler, WSAddressHandler())




Anon7 - 2022
AnonSec Team