Dre4m Shell
Server IP : 127.0.0.2  /  Your IP : 18.117.9.230
Web Server : Apache/2.4.18 (Ubuntu)
System :
User : www-data ( )
PHP Version : 7.0.33-0ubuntu0.16.04.16
Disable Function : disk_free_space,disk_total_space,diskfreespace,dl,exec,fpaththru,getmyuid,getmypid,highlight_file,ignore_user_abord,leak,listen,link,opcache_get_configuration,opcache_get_status,passthru,pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,php_uname,phpinfo,posix_ctermid,posix_getcwd,posix_getegid,posix_geteuid,posix_getgid,posix_getgrgid,posix_getgrnam,posix_getgroups,posix_getlogin,posix_getpgid,posix_getpgrp,posix_getpid,posix,_getppid,posix_getpwnam,posix_getpwuid,posix_getrlimit,posix_getsid,posix_getuid,posix_isatty,posix_kill,posix_mkfifo,posix_setegid,posix_seteuid,posix_setgid,posix_setpgid,posix_setsid,posix_setuid,posix_times,posix_ttyname,posix_uname,pclose,popen,proc_open,proc_close,proc_get_status,proc_nice,proc_terminate,shell_exec,source,show_source,system,virtual
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : ON
Directory :  /usr/lib/python2.7/dist-packages/usb/backend/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /usr/lib/python2.7/dist-packages/usb/backend/libusb1.py
# Copyright (C) 2009-2014 Wander Lairson Costa
#
# The following terms apply to all files associated
# with the software unless explicitly disclaimed in individual files.
#
# The authors hereby grant permission to use, copy, modify, distribute,
# and license this software and its documentation for any purpose, provided
# that existing copyright notices are retained in all copies and that this
# notice is included verbatim in any distributions. No written agreement,
# license, or royalty fee is required for any of the authorized uses.
# Modifications to this software may be copyrighted by their authors
# and need not follow the licensing terms described here, provided that
# the new terms are clearly indicated on the first page of each file where
# they apply.
#
# IN NO EVENT SHALL THE AUTHORS OR DISTRIBUTORS BE LIABLE TO ANY PARTY
# FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
# ARISING OUT OF THE USE OF THIS SOFTWARE, ITS DOCUMENTATION, OR ANY
# DERIVATIVES THEREOF, EVEN IF THE AUTHORS HAVE BEEN ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# THE AUTHORS AND DISTRIBUTORS SPECIFICALLY DISCLAIM ANY WARRANTIES,
# INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE, AND NON-INFRINGEMENT.  THIS SOFTWARE
# IS PROVIDED ON AN "AS IS" BASIS, AND THE AUTHORS AND DISTRIBUTORS HAVE
# NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR
# MODIFICATIONS.

from ctypes import *
import usb.util
import sys
import logging
from usb._debug import methodtrace
import usb._interop as _interop
import errno
import math
from usb.core import USBError
import usb.libloader

__author__ = 'Wander Lairson Costa'

__all__ = [
            'get_backend',
            'LIBUSB_SUCESS',
            'LIBUSB_ERROR_IO',
            'LIBUSB_ERROR_INVALID_PARAM',
            'LIBUSB_ERROR_ACCESS',
            'LIBUSB_ERROR_NO_DEVICE',
            'LIBUSB_ERROR_NOT_FOUND',
            'LIBUSB_ERROR_BUSY',
            'LIBUSB_ERROR_TIMEOUT',
            'LIBUSB_ERROR_OVERFLOW',
            'LIBUSB_ERROR_PIPE',
            'LIBUSB_ERROR_INTERRUPTED',
            'LIBUSB_ERROR_NO_MEM',
            'LIBUSB_ERROR_NOT_SUPPORTED',
            'LIBUSB_ERROR_OTHER'
            'LIBUSB_TRANSFER_COMPLETED',
            'LIBUSB_TRANSFER_ERROR',
            'LIBUSB_TRANSFER_TIMED_OUT',
            'LIBUSB_TRANSFER_CANCELLED',
            'LIBUSB_TRANSFER_STALL',
            'LIBUSB_TRANSFER_NO_DEVICE',
            'LIBUSB_TRANSFER_OVERFLOW'
        ]

_logger = logging.getLogger('usb.backend.libusb1')

# libusb.h

# transfer_type codes
# Control endpoint
_LIBUSB_TRANSFER_TYPE_CONTROL = 0,
# Isochronous endpoint
_LIBUSB_TRANSFER_TYPE_ISOCHRONOUS = 1
# Bulk endpoint
_LIBUSB_TRANSFER_TYPE_BULK = 2
# Interrupt endpoint
_LIBUSB_TRANSFER_TYPE_INTERRUPT = 3

# return codes

LIBUSB_SUCCESS = 0
LIBUSB_ERROR_IO = -1
LIBUSB_ERROR_INVALID_PARAM = -2
LIBUSB_ERROR_ACCESS = -3
LIBUSB_ERROR_NO_DEVICE = -4
LIBUSB_ERROR_NOT_FOUND = -5
LIBUSB_ERROR_BUSY = -6
LIBUSB_ERROR_TIMEOUT = -7
LIBUSB_ERROR_OVERFLOW = -8
LIBUSB_ERROR_PIPE = -9
LIBUSB_ERROR_INTERRUPTED = -10
LIBUSB_ERROR_NO_MEM = -11
LIBUSB_ERROR_NOT_SUPPORTED = -12
LIBUSB_ERROR_OTHER = -99

# map return code to errno values
_libusb_errno = {
    0:None,
    LIBUSB_ERROR_IO:errno.__dict__.get('EIO', None),
    LIBUSB_ERROR_INVALID_PARAM:errno.__dict__.get('EINVAL', None),
    LIBUSB_ERROR_ACCESS:errno.__dict__.get('EACCES', None),
    LIBUSB_ERROR_NO_DEVICE:errno.__dict__.get('ENODEV', None),
    LIBUSB_ERROR_NOT_FOUND:errno.__dict__.get('ENOENT', None),
    LIBUSB_ERROR_BUSY:errno.__dict__.get('EBUSY', None),
    LIBUSB_ERROR_TIMEOUT:errno.__dict__.get('ETIMEDOUT', None),
    LIBUSB_ERROR_OVERFLOW:errno.__dict__.get('EOVERFLOW', None),
    LIBUSB_ERROR_PIPE:errno.__dict__.get('EPIPE', None),
    LIBUSB_ERROR_INTERRUPTED:errno.__dict__.get('EINTR', None),
    LIBUSB_ERROR_NO_MEM:errno.__dict__.get('ENOMEM', None),
    LIBUSB_ERROR_NOT_SUPPORTED:errno.__dict__.get('ENOSYS', None),
    LIBUSB_ERROR_OTHER:None
}

# Transfer status codes:
# Note that this does not indicate
# that the entire amount of requested data was transferred.
LIBUSB_TRANSFER_COMPLETED = 0
LIBUSB_TRANSFER_ERROR = 1
LIBUSB_TRANSFER_TIMED_OUT = 2
LIBUSB_TRANSFER_CANCELLED = 3
LIBUSB_TRANSFER_STALL = 4
LIBUSB_TRANSFER_NO_DEVICE = 5
LIBUSB_TRANSFER_OVERFLOW = 6

# map return codes to strings
_str_transfer_error = {
    LIBUSB_TRANSFER_COMPLETED:'Success (no error)',
    LIBUSB_TRANSFER_ERROR:'Transfer failed',
    LIBUSB_TRANSFER_TIMED_OUT:'Transfer timed out',
    LIBUSB_TRANSFER_CANCELLED:'Transfer was cancelled',
    LIBUSB_TRANSFER_STALL:'For bulk/interrupt endpoints: halt condition '\
                          'detected (endpoint stalled). For control '\
                          'endpoints: control request not supported.',
    LIBUSB_TRANSFER_NO_DEVICE:'Device was disconnected',
    LIBUSB_TRANSFER_OVERFLOW:'Device sent more data than requested'
}

# map transfer codes to errno codes
_transfer_errno = {
    LIBUSB_TRANSFER_COMPLETED:0,
    LIBUSB_TRANSFER_ERROR:errno.__dict__.get('EIO', None),
    LIBUSB_TRANSFER_TIMED_OUT:errno.__dict__.get('ETIMEDOUT', None),
    LIBUSB_TRANSFER_CANCELLED:errno.__dict__.get('EAGAIN', None),
    LIBUSB_TRANSFER_STALL:errno.__dict__.get('EIO', None),
    LIBUSB_TRANSFER_NO_DEVICE:errno.__dict__.get('ENODEV', None),
    LIBUSB_TRANSFER_OVERFLOW:errno.__dict__.get('EOVERFLOW', None)
}

# Data structures

class _libusb_endpoint_descriptor(Structure):
    _fields_ = [('bLength', c_uint8),
                ('bDescriptorType', c_uint8),
                ('bEndpointAddress', c_uint8),
                ('bmAttributes', c_uint8),
                ('wMaxPacketSize', c_uint16),
                ('bInterval', c_uint8),
                ('bRefresh', c_uint8),
                ('bSynchAddress', c_uint8),
                ('extra', POINTER(c_ubyte)),
                ('extra_length', c_int)]

class _libusb_interface_descriptor(Structure):
    _fields_ = [('bLength', c_uint8),
                ('bDescriptorType', c_uint8),
                ('bInterfaceNumber', c_uint8),
                ('bAlternateSetting', c_uint8),
                ('bNumEndpoints', c_uint8),
                ('bInterfaceClass', c_uint8),
                ('bInterfaceSubClass', c_uint8),
                ('bInterfaceProtocol', c_uint8),
                ('iInterface', c_uint8),
                ('endpoint', POINTER(_libusb_endpoint_descriptor)),
                ('extra', POINTER(c_ubyte)),
                ('extra_length', c_int)]

class _libusb_interface(Structure):
    _fields_ = [('altsetting', POINTER(_libusb_interface_descriptor)),
                ('num_altsetting', c_int)]

class _libusb_config_descriptor(Structure):
    _fields_ = [('bLength', c_uint8),
                ('bDescriptorType', c_uint8),
                ('wTotalLength', c_uint16),
                ('bNumInterfaces', c_uint8),
                ('bConfigurationValue', c_uint8),
                ('iConfiguration', c_uint8),
                ('bmAttributes', c_uint8),
                ('bMaxPower', c_uint8),
                ('interface', POINTER(_libusb_interface)),
                ('extra', POINTER(c_ubyte)),
                ('extra_length', c_int)]

class _libusb_device_descriptor(Structure):
    _fields_ = [('bLength', c_uint8),
                ('bDescriptorType', c_uint8),
                ('bcdUSB', c_uint16),
                ('bDeviceClass', c_uint8),
                ('bDeviceSubClass', c_uint8),
                ('bDeviceProtocol', c_uint8),
                ('bMaxPacketSize0', c_uint8),
                ('idVendor', c_uint16),
                ('idProduct', c_uint16),
                ('bcdDevice', c_uint16),
                ('iManufacturer', c_uint8),
                ('iProduct', c_uint8),
                ('iSerialNumber', c_uint8),
                ('bNumConfigurations', c_uint8)]


# Isochronous packet descriptor.
class _libusb_iso_packet_descriptor(Structure):
    _fields_ = [('length', c_uint),
                ('actual_length', c_uint),
                ('status', c_int)] # enum libusb_transfer_status

_libusb_device_handle = c_void_p

class _libusb_transfer(Structure):
    pass
_libusb_transfer_p = POINTER(_libusb_transfer)

_libusb_transfer_cb_fn_p = CFUNCTYPE(None, _libusb_transfer_p)

_libusb_transfer._fields_ = [('dev_handle', _libusb_device_handle),
                             ('flags', c_uint8),
                             ('endpoint', c_uint8),
                             ('type', c_uint8),
                             ('timeout', c_uint),
                             ('status', c_int), # enum libusb_transfer_status
                             ('length', c_int),
                             ('actual_length', c_int),
                             ('callback', _libusb_transfer_cb_fn_p),
                             ('user_data', py_object),
                             ('buffer', c_void_p),
                             ('num_iso_packets', c_int),
                             ('iso_packet_desc', _libusb_iso_packet_descriptor)
]

def _get_iso_packet_list(transfer):
    list_type = _libusb_iso_packet_descriptor * transfer.num_iso_packets
    return list_type.from_address(addressof(transfer.iso_packet_desc))

_lib = None

def _load_library(find_library=None):
    # Windows backend uses stdcall calling convention
    #
    # On FreeBSD 8/9, libusb 1.0 and libusb 0.1 are in the same shared
    # object libusb.so, so if we found libusb library name, we must assure
    # it is 1.0 version. We just try to get some symbol from 1.0 version
    if sys.platform == 'win32':
        win_cls = WinDLL
    else:
        win_cls = None

    return usb.libloader.load_locate_library(
                ('usb-1.0', 'libusb-1.0', 'usb'),
                'cygusb-1.0.dll', 'Libusb 1',
                win_cls=win_cls,
                find_library=find_library, check_symbols=('libusb_init',))

def _setup_prototypes(lib):
    # void libusb_set_debug (libusb_context *ctx, int level)
    lib.libusb_set_debug.argtypes = [c_void_p, c_int]

    # int libusb_init (libusb_context **context)
    lib.libusb_init.argtypes = [POINTER(c_void_p)]

    # void libusb_exit (struct libusb_context *ctx)
    lib.libusb_exit.argtypes = [c_void_p]

    # ssize_t libusb_get_device_list (libusb_context *ctx,
    #                                 libusb_device ***list)
    lib.libusb_get_device_list.argtypes = [
            c_void_p,
            POINTER(POINTER(c_void_p))
        ]

    # void libusb_free_device_list (libusb_device **list,
    #                               int unref_devices)
    lib.libusb_free_device_list.argtypes = [
            POINTER(c_void_p),
            c_int
        ]

    # libusb_device *libusb_ref_device (libusb_device *dev)
    lib.libusb_ref_device.argtypes = [c_void_p]
    lib.libusb_ref_device.restype = c_void_p

    # void libusb_unref_device(libusb_device *dev)
    lib.libusb_unref_device.argtypes = [c_void_p]

    # int libusb_open(libusb_device *dev, libusb_device_handle **handle)
    lib.libusb_open.argtypes = [c_void_p, POINTER(_libusb_device_handle)]

    # void libusb_close(libusb_device_handle *dev_handle)
    lib.libusb_close.argtypes = [_libusb_device_handle]

    # int libusb_set_configuration(libusb_device_handle *dev,
    #                              int configuration)
    lib.libusb_set_configuration.argtypes = [_libusb_device_handle, c_int]

    # int libusb_get_configuration(libusb_device_handle *dev,
    #                              int *config)
    lib.libusb_get_configuration.argtypes = [_libusb_device_handle, POINTER(c_int)]

    # int libusb_claim_interface(libusb_device_handle *dev,
    #                               int interface_number)
    lib.libusb_claim_interface.argtypes = [_libusb_device_handle, c_int]

    # int libusb_release_interface(libusb_device_handle *dev,
    #                              int interface_number)
    lib.libusb_release_interface.argtypes = [_libusb_device_handle, c_int]

    # int libusb_set_interface_alt_setting(libusb_device_handle *dev,
    #                                      int interface_number,
    #                                      int alternate_setting)
    lib.libusb_set_interface_alt_setting.argtypes = [
            _libusb_device_handle,
            c_int,
            c_int
        ]

    # int libusb_reset_device (libusb_device_handle *dev)
    lib.libusb_reset_device.argtypes = [_libusb_device_handle]

    # int libusb_kernel_driver_active(libusb_device_handle *dev,
    #                                 int interface)
    lib.libusb_kernel_driver_active.argtypes = [
            _libusb_device_handle,
            c_int
        ]

    # int libusb_detach_kernel_driver(libusb_device_handle *dev,
    #                                 int interface)
    lib.libusb_detach_kernel_driver.argtypes = [
            _libusb_device_handle,
            c_int
        ]

    # int libusb_attach_kernel_driver(libusb_device_handle *dev,
    #                                 int interface)
    lib.libusb_attach_kernel_driver.argtypes = [
            _libusb_device_handle,
            c_int
        ]

    # int libusb_get_device_descriptor(
    #                   libusb_device *dev,
    #                   struct libusb_device_descriptor *desc
    #               )
    lib.libusb_get_device_descriptor.argtypes = [
            c_void_p,
            POINTER(_libusb_device_descriptor)
        ]

    # int libusb_get_config_descriptor(
    #           libusb_device *dev,
    #           uint8_t config_index,
    #           struct libusb_config_descriptor **config
    #       )
    lib.libusb_get_config_descriptor.argtypes = [
            c_void_p,
            c_uint8,
            POINTER(POINTER(_libusb_config_descriptor))
        ]

    # void  libusb_free_config_descriptor(
    #           struct libusb_config_descriptor *config
    #   )
    lib.libusb_free_config_descriptor.argtypes = [
            POINTER(_libusb_config_descriptor)
        ]

    # int libusb_get_string_descriptor_ascii(libusb_device_handle *dev,
    #                                         uint8_t desc_index,
    #                                         unsigned char *data,
    #                                         int length)
    lib.libusb_get_string_descriptor_ascii.argtypes = [
            _libusb_device_handle,
            c_uint8,
            POINTER(c_ubyte),
            c_int
        ]

    # int libusb_control_transfer(libusb_device_handle *dev_handle,
    #                             uint8_t bmRequestType,
    #                             uint8_t bRequest,
    #                             uint16_t wValue,
    #                             uint16_t wIndex,
    #                             unsigned char *data,
    #                             uint16_t wLength,
    #                             unsigned int timeout)
    lib.libusb_control_transfer.argtypes = [
            _libusb_device_handle,
            c_uint8,
            c_uint8,
            c_uint16,
            c_uint16,
            POINTER(c_ubyte),
            c_uint16,
            c_uint
        ]

    #int libusb_bulk_transfer(
    #           struct libusb_device_handle *dev_handle,
    #           unsigned char endpoint,
    #           unsigned char *data,
    #           int length,
    #           int *transferred,
    #           unsigned int timeout
    #       )
    lib.libusb_bulk_transfer.argtypes = [
                _libusb_device_handle,
                c_ubyte,
                POINTER(c_ubyte),
                c_int,
                POINTER(c_int),
                c_uint
            ]

    # int libusb_interrupt_transfer(
    #               libusb_device_handle *dev_handle,
    #               unsigned char endpoint,
    #               unsigned char *data,
    #               int length,
    #               int *actual_length,
    #               unsigned int timeout
    #           );
    lib.libusb_interrupt_transfer.argtypes = [
                    _libusb_device_handle,
                    c_ubyte,
                    POINTER(c_ubyte),
                    c_int,
                    POINTER(c_int),
                    c_uint
                ]

    # libusb_transfer* libusb_alloc_transfer(int iso_packets);
    lib.libusb_alloc_transfer.argtypes = [c_int]
    lib.libusb_alloc_transfer.restype = POINTER(_libusb_transfer)

    # void libusb_free_transfer(struct libusb_transfer *transfer)
    lib.libusb_free_transfer.argtypes = [POINTER(_libusb_transfer)]

    # int libusb_submit_transfer(struct libusb_transfer *transfer);
    lib.libusb_submit_transfer.argtypes = [POINTER(_libusb_transfer)]

    # const char *libusb_strerror(enum libusb_error errcode)
    lib.libusb_strerror.argtypes = [c_uint]
    lib.libusb_strerror.restype = c_char_p

    # int libusb_clear_halt(libusb_device_handle *dev, unsigned char endpoint)
    lib.libusb_clear_halt.argtypes = [_libusb_device_handle, c_ubyte]

    # void libusb_set_iso_packet_lengths(
    #               libusb_transfer* transfer,
    #               unsigned int length
    #           );
    def libusb_set_iso_packet_lengths(transfer_p, length):
        r"""This function is inline in the libusb.h file, so we must implement
            it.

        lib.libusb_set_iso_packet_lengths.argtypes = [
                        POINTER(_libusb_transfer),
                        c_int
                    ]
        """
        transfer = transfer_p.contents
        for iso_packet_desc in _get_iso_packet_list(transfer):
            iso_packet_desc.length = length
    lib.libusb_set_iso_packet_lengths = libusb_set_iso_packet_lengths

    #int libusb_get_max_iso_packet_size(libusb_device* dev,
    #                                   unsigned char endpoint);
    lib.libusb_get_max_iso_packet_size.argtypes = [c_void_p,
                                                   c_ubyte]

    # void libusb_fill_iso_transfer(
    #               struct libusb_transfer* transfer,
    #               libusb_device_handle*  dev_handle,
    #               unsigned char endpoint,
    #               unsigned char* buffer,
    #               int length,
    #               int num_iso_packets,
    #               libusb_transfer_cb_fn   callback,
    #               void * user_data,
    #               unsigned int timeout
    #           );
    def libusb_fill_iso_transfer(_libusb_transfer_p, dev_handle, endpoint, buffer, length,
                                 num_iso_packets, callback, user_data, timeout):
        r"""This function is inline in the libusb.h file, so we must implement
            it.

        lib.libusb_fill_iso_transfer.argtypes = [
                       _libusb_transfer,
                       _libusb_device_handle,
                       c_ubyte,
                       POINTER(c_ubyte),
                       c_int,
                       c_int,
                       _libusb_transfer_cb_fn_p,
                       c_void_p,
                       c_uint
                   ]
        """
        transfer = _libusb_transfer_p.contents
        transfer.dev_handle = dev_handle
        transfer.endpoint = endpoint
        transfer.type = _LIBUSB_TRANSFER_TYPE_ISOCHRONOUS
        transfer.timeout = timeout
        transfer.buffer = cast(buffer, c_void_p)
        transfer.length = length
        transfer.num_iso_packets = num_iso_packets
        transfer.user_data = user_data
        transfer.callback = callback
    lib.libusb_fill_iso_transfer = libusb_fill_iso_transfer

    # uint8_t libusb_get_bus_number(libusb_device *dev)
    lib.libusb_get_bus_number.argtypes = [c_void_p]
    lib.libusb_get_bus_number.restype = c_uint8

    # uint8_t libusb_get_device_address(libusb_device *dev)
    lib.libusb_get_device_address.argtypes = [c_void_p]
    lib.libusb_get_device_address.restype = c_uint8

    try:
        # uint8_t libusb_get_port_number(libusb_device *dev)
        lib.libusb_get_port_number.argtypes = [c_void_p]
        lib.libusb_get_port_number.restype = c_uint8
    except AttributeError:
        pass

    #int libusb_handle_events(libusb_context *ctx);
    lib.libusb_handle_events.argtypes = [c_void_p]

def _strerror(errcode):
    return _lib.libusb_strerror(errcode).decode('utf8')

# check a libusb function call
def _check(ret):
    if hasattr(ret, 'value'):
        ret = ret.value

    if ret < 0:
        if ret == LIBUSB_ERROR_NOT_SUPPORTED:
            raise NotImplementedError(_strerror(ret))
        else:
            raise USBError(_strerror(ret), ret, _libusb_errno[ret])

    return ret

# wrap a device
class _Device(object):
    def __init__(self, devid):
        self.devid = _lib.libusb_ref_device(devid)
    def __del__(self):
        _lib.libusb_unref_device(self.devid)

# wrap a descriptor and keep a reference to another object
# Thanks to Thomas Reitmayr.
class _WrapDescriptor(object):
    def __init__(self, desc, obj = None):
        self.obj = obj
        self.desc = desc
    def __getattr__(self, name):
        return getattr(self.desc, name)

# wrap a configuration descriptor
class _ConfigDescriptor(object):
    def __init__(self, desc):
        self.desc = desc
    def __del__(self):
        _lib.libusb_free_config_descriptor(self.desc)
    def __getattr__(self, name):
        return getattr(self.desc.contents, name)


# iterator for libusb devices
class _DevIterator(object):
    def __init__(self, ctx):
        self.dev_list = POINTER(c_void_p)()
        self.num_devs = _check(_lib.libusb_get_device_list(
                                    ctx,
                                    byref(self.dev_list))
                                )
    def __iter__(self):
        for i in range(self.num_devs):
            yield _Device(self.dev_list[i])
    def __del__(self):
        _lib.libusb_free_device_list(self.dev_list, 1)

class _DeviceHandle(object):
    def __init__(self, dev):
        self.handle = _libusb_device_handle()
        self.devid = dev.devid
        _check(_lib.libusb_open(self.devid, byref(self.handle)))

class _IsoTransferHandler(object):
    def __init__(self, dev_handle, ep, buff, timeout):
        address, length = buff.buffer_info()

        packet_length = _lib.libusb_get_max_iso_packet_size(dev_handle.devid, ep)
        packet_count = int(math.ceil(float(length) / packet_length))

        self.transfer = _lib.libusb_alloc_transfer(packet_count)

        _lib.libusb_fill_iso_transfer(self.transfer,
                                      dev_handle.handle,
                                      ep,
                                      cast(address, POINTER(c_ubyte)),
                                      length,
                                      packet_count,
                                      _libusb_transfer_cb_fn_p(self.__callback),
                                      None,
                                      timeout)

        self.__set_packets_length(length, packet_length)

    def __del__(self):
        _lib.libusb_free_transfer(self.transfer)

    def submit(self, ctx = None):
        self.__callback_done = 0
        _check(_lib.libusb_submit_transfer(self.transfer))

        while not self.__callback_done:
            _check(_lib.libusb_handle_events(ctx))

        return self.__compute_size_transf_data()

    def __compute_size_transf_data(self):
        return sum([t.actual_length for t in
                    _get_iso_packet_list(self.transfer.contents)])

    def __set_packets_length(self, n, packet_length):
        _lib.libusb_set_iso_packet_lengths(self.transfer, packet_length)
        r = n % packet_length
        if r:
            iso_packets = _get_iso_packet_list(self.transfer.contents)
            iso_packets[-1].length = r

    def __callback(self, transfer):
        if transfer.contents.status == LIBUSB_TRANSFER_COMPLETED:
            self.__callback_done = 1
        else:
            status = int(transfer.contents.status)
            raise usb.USBError(_str_transfer_error[status],
                               status,
                               _transfer_errno[status])

# implementation of libusb 1.0 backend
class _LibUSB(usb.backend.IBackend):
    @methodtrace(_logger)
    def __init__(self, lib):
        usb.backend.IBackend.__init__(self)
        self.lib = lib
        self.ctx = c_void_p()
        _check(self.lib.libusb_init(byref(self.ctx)))

    @methodtrace(_logger)
    def __del__(self):
        self.lib.libusb_exit(self.ctx)


    @methodtrace(_logger)
    def enumerate_devices(self):
        return _DevIterator(self.ctx)

    @methodtrace(_logger)
    def get_device_descriptor(self, dev):
        dev_desc = _libusb_device_descriptor()
        _check(self.lib.libusb_get_device_descriptor(dev.devid, byref(dev_desc)))
        dev_desc.bus = self.lib.libusb_get_bus_number(dev.devid)
        dev_desc.address = self.lib.libusb_get_device_address(dev.devid)

        # Only available in newer versions of libusb
        try:
            dev_desc.port_number = self.lib.libusb_get_port_number(dev.devid)
        except AttributeError:
            dev_desc.port_number = None

        return dev_desc

    @methodtrace(_logger)
    def get_configuration_descriptor(self, dev, config):
        cfg = POINTER(_libusb_config_descriptor)()
        _check(self.lib.libusb_get_config_descriptor(
                dev.devid,
                config, byref(cfg)))
        config_desc = _ConfigDescriptor(cfg)
        config_desc.extra_descriptors = (
                config_desc.extra[:config_desc.extra_length])
        return config_desc

    @methodtrace(_logger)
    def get_interface_descriptor(self, dev, intf, alt, config):
        cfg = self.get_configuration_descriptor(dev, config)
        if intf >= cfg.bNumInterfaces:
            raise IndexError('Invalid interface index ' + str(intf))
        i = cfg.interface[intf]
        if alt >= i.num_altsetting:
            raise IndexError('Invalid alternate setting index ' + str(alt))
        intf_desc = i.altsetting[alt]
        intf_desc.extra_descriptors = intf_desc.extra[:intf_desc.extra_length]
        return _WrapDescriptor(intf_desc, cfg)

    @methodtrace(_logger)
    def get_endpoint_descriptor(self, dev, ep, intf, alt, config):
        i = self.get_interface_descriptor(dev, intf, alt, config)
        if ep > i.bNumEndpoints:
            raise IndexError('Invalid endpoint index ' + str(ep))
        ep_desc = i.endpoint[ep]
        ep_desc.extra_descriptors = ep_desc.extra[:ep_desc.extra_length]
        return _WrapDescriptor(ep_desc, i)

    @methodtrace(_logger)
    def open_device(self, dev):
        return _DeviceHandle(dev)

    @methodtrace(_logger)
    def close_device(self, dev_handle):
        self.lib.libusb_close(dev_handle.handle)

    @methodtrace(_logger)
    def set_configuration(self, dev_handle, config_value):
        _check(self.lib.libusb_set_configuration(dev_handle.handle, config_value))

    @methodtrace(_logger)
    def get_configuration(self, dev_handle):
        config = c_int()
        _check(self.lib.libusb_get_configuration(dev_handle.handle, byref(config)))
        return config.value

    @methodtrace(_logger)
    def set_interface_altsetting(self, dev_handle, intf, altsetting):
        _check(self.lib.libusb_set_interface_alt_setting(
                                dev_handle.handle,
                                intf,
                                altsetting))

    @methodtrace(_logger)
    def claim_interface(self, dev_handle, intf):
        _check(self.lib.libusb_claim_interface(dev_handle.handle, intf))

    @methodtrace(_logger)
    def release_interface(self, dev_handle, intf):
        _check(self.lib.libusb_release_interface(dev_handle.handle, intf))

    @methodtrace(_logger)
    def bulk_write(self, dev_handle, ep, intf, data, timeout):
        return self.__write(self.lib.libusb_bulk_transfer,
                            dev_handle,
                            ep,
                            intf,
                            data,
                            timeout)

    @methodtrace(_logger)
    def bulk_read(self, dev_handle, ep, intf, buff, timeout):
        return self.__read(self.lib.libusb_bulk_transfer,
                           dev_handle,
                           ep,
                           intf,
                           buff,
                           timeout)

    @methodtrace(_logger)
    def intr_write(self, dev_handle, ep, intf, data, timeout):
        return self.__write(self.lib.libusb_interrupt_transfer,
                            dev_handle,
                            ep,
                            intf,
                            data,
                            timeout)

    @methodtrace(_logger)
    def intr_read(self, dev_handle, ep, intf, buff, timeout):
        return self.__read(self.lib.libusb_interrupt_transfer,
                           dev_handle,
                           ep,
                           intf,
                           buff,
                           timeout)

    @methodtrace(_logger)
    def iso_write(self, dev_handle, ep, intf, data, timeout):
        handler = _IsoTransferHandler(dev_handle, ep, data, timeout)
        return handler.submit(self.ctx)

    @methodtrace(_logger)
    def iso_read(self, dev_handle, ep, intf, buff, timeout):
        handler = _IsoTransferHandler(dev_handle, ep, buff, timeout)
        return handler.submit(self.ctx)

    @methodtrace(_logger)
    def ctrl_transfer(self,
                      dev_handle,
                      bmRequestType,
                      bRequest,
                      wValue,
                      wIndex,
                      data,
                      timeout):
        addr, length = data.buffer_info()
        length *= data.itemsize

        ret = _check(self.lib.libusb_control_transfer(
                                        dev_handle.handle,
                                        bmRequestType,
                                        bRequest,
                                        wValue,
                                        wIndex,
                                        cast(addr, POINTER(c_ubyte)),
                                        length,
                                        timeout))

        return ret

    @methodtrace(_logger)
    def clear_halt(self, dev_handle, ep):
        _check(self.lib.libusb_clear_halt(dev_handle.handle, ep))

    @methodtrace(_logger)
    def reset_device(self, dev_handle):
        _check(self.lib.libusb_reset_device(dev_handle.handle))

    @methodtrace(_logger)
    def is_kernel_driver_active(self, dev_handle, intf):
        return bool(_check(self.lib.libusb_kernel_driver_active(dev_handle.handle,
                        intf)))

    @methodtrace(_logger)
    def detach_kernel_driver(self, dev_handle, intf):
        _check(self.lib.libusb_detach_kernel_driver(dev_handle.handle, intf))

    @methodtrace(_logger)
    def attach_kernel_driver(self, dev_handle, intf):
        _check(self.lib.libusb_attach_kernel_driver(dev_handle.handle, intf))

    def __write(self, fn, dev_handle, ep, intf, data, timeout):
        address, length = data.buffer_info()
        length *= data.itemsize
        transferred = c_int()
        retval = fn(dev_handle.handle,
                  ep,
                  cast(address, POINTER(c_ubyte)),
                  length,
                  byref(transferred),
                  timeout)
        # do not assume LIBUSB_ERROR_TIMEOUT means no I/O.
        if not (transferred.value and retval == LIBUSB_ERROR_TIMEOUT):
            _check(retval)

        return transferred.value

    def __read(self, fn, dev_handle, ep, intf, buff, timeout):
        address, length = buff.buffer_info()
        length *= buff.itemsize
        transferred = c_int()
        retval = fn(dev_handle.handle,
                  ep,
                  cast(address, POINTER(c_ubyte)),
                  length,
                  byref(transferred),
                  timeout)
        # do not assume LIBUSB_ERROR_TIMEOUT means no I/O.
        if not (transferred.value and retval == LIBUSB_ERROR_TIMEOUT):
            _check(retval)
        return transferred.value

def get_backend(find_library=None):
    global _lib
    try:
        if _lib is None:
            _lib = _load_library(find_library=find_library)
            _setup_prototypes(_lib)
        return _LibUSB(_lib)
    except usb.libloader.LibaryException:
        # exception already logged (if any)
        _logger.error('Error loading libusb 1.0 backend', exc_info=False)
        return None
    except Exception:
        _logger.error('Error loading libusb 1.0 backend', exc_info=True)
        return None

Anon7 - 2022
AnonSec Team