Server IP : 127.0.0.2 / Your IP : 3.144.103.205 Web Server : Apache/2.4.18 (Ubuntu) System : User : www-data ( ) PHP Version : 7.0.33-0ubuntu0.16.04.16 Disable Function : disk_free_space,disk_total_space,diskfreespace,dl,exec,fpaththru,getmyuid,getmypid,highlight_file,ignore_user_abord,leak,listen,link,opcache_get_configuration,opcache_get_status,passthru,pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,php_uname,phpinfo,posix_ctermid,posix_getcwd,posix_getegid,posix_geteuid,posix_getgid,posix_getgrgid,posix_getgrnam,posix_getgroups,posix_getlogin,posix_getpgid,posix_getpgrp,posix_getpid,posix,_getppid,posix_getpwnam,posix_getpwuid,posix_getrlimit,posix_getsid,posix_getuid,posix_isatty,posix_kill,posix_mkfifo,posix_setegid,posix_seteuid,posix_setgid,posix_setpgid,posix_setsid,posix_setuid,posix_times,posix_ttyname,posix_uname,pclose,popen,proc_open,proc_close,proc_get_status,proc_nice,proc_terminate,shell_exec,source,show_source,system,virtual MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : ON Directory : /usr/lib/python2.7/dist-packages/usb/backend/ |
Upload File : |
# Copyright (C) 2009-2014 Wander Lairson Costa # # The following terms apply to all files associated # with the software unless explicitly disclaimed in individual files. # # The authors hereby grant permission to use, copy, modify, distribute, # and license this software and its documentation for any purpose, provided # that existing copyright notices are retained in all copies and that this # notice is included verbatim in any distributions. No written agreement, # license, or royalty fee is required for any of the authorized uses. # Modifications to this software may be copyrighted by their authors # and need not follow the licensing terms described here, provided that # the new terms are clearly indicated on the first page of each file where # they apply. # # IN NO EVENT SHALL THE AUTHORS OR DISTRIBUTORS BE LIABLE TO ANY PARTY # FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES # ARISING OUT OF THE USE OF THIS SOFTWARE, ITS DOCUMENTATION, OR ANY # DERIVATIVES THEREOF, EVEN IF THE AUTHORS HAVE BEEN ADVISED OF THE # POSSIBILITY OF SUCH DAMAGE. # # THE AUTHORS AND DISTRIBUTORS SPECIFICALLY DISCLAIM ANY WARRANTIES, # INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE, AND NON-INFRINGEMENT. THIS SOFTWARE # IS PROVIDED ON AN "AS IS" BASIS, AND THE AUTHORS AND DISTRIBUTORS HAVE # NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR # MODIFICATIONS. from ctypes import * import os import usb.backend import usb.util import sys from usb.core import USBError from usb._debug import methodtrace import usb._interop as _interop import logging import usb.libloader __author__ = 'Wander Lairson Costa' __all__ = ['get_backend'] _logger = logging.getLogger('usb.backend.libusb0') # usb.h _PC_PATH_MAX = 4 if sys.platform.find('bsd') != -1 or sys.platform.find('mac') != -1 or \ sys.platform.find('darwin') != -1: _PATH_MAX = 1024 elif sys.platform == 'win32' or sys.platform == 'cygwin': _PATH_MAX = 511 else: _PATH_MAX = os.pathconf('.', _PC_PATH_MAX) # libusb-win32 makes all structures packed, while # default libusb only does for some structures # _PackPolicy defines the structure packing according # to the platform. class _PackPolicy(object): pass if sys.platform == 'win32' or sys.platform == 'cygwin': _PackPolicy._pack_ = 1 # Data structures class _usb_descriptor_header(Structure): _pack_ = 1 _fields_ = [('blength', c_uint8), ('bDescriptorType', c_uint8)] class _usb_string_descriptor(Structure): _pack_ = 1 _fields_ = [('bLength', c_uint8), ('bDescriptorType', c_uint8), ('wData', c_uint16)] class _usb_endpoint_descriptor(Structure, _PackPolicy): _fields_ = [('bLength', c_uint8), ('bDescriptorType', c_uint8), ('bEndpointAddress', c_uint8), ('bmAttributes', c_uint8), ('wMaxPacketSize', c_uint16), ('bInterval', c_uint8), ('bRefresh', c_uint8), ('bSynchAddress', c_uint8), ('extra', POINTER(c_uint8)), ('extralen', c_int)] class _usb_interface_descriptor(Structure, _PackPolicy): _fields_ = [('bLength', c_uint8), ('bDescriptorType', c_uint8), ('bInterfaceNumber', c_uint8), ('bAlternateSetting', c_uint8), ('bNumEndpoints', c_uint8), ('bInterfaceClass', c_uint8), ('bInterfaceSubClass', c_uint8), ('bInterfaceProtocol', c_uint8), ('iInterface', c_uint8), ('endpoint', POINTER(_usb_endpoint_descriptor)), ('extra', POINTER(c_uint8)), ('extralen', c_int)] class _usb_interface(Structure, _PackPolicy): _fields_ = [('altsetting', POINTER(_usb_interface_descriptor)), ('num_altsetting', c_int)] class _usb_config_descriptor(Structure, _PackPolicy): _fields_ = [('bLength', c_uint8), ('bDescriptorType', c_uint8), ('wTotalLength', c_uint16), ('bNumInterfaces', c_uint8), ('bConfigurationValue', c_uint8), ('iConfiguration', c_uint8), ('bmAttributes', c_uint8), ('bMaxPower', c_uint8), ('interface', POINTER(_usb_interface)), ('extra', POINTER(c_uint8)), ('extralen', c_int)] class _usb_device_descriptor(Structure, _PackPolicy): _pack_ = 1 _fields_ = [('bLength', c_uint8), ('bDescriptorType', c_uint8), ('bcdUSB', c_uint16), ('bDeviceClass', c_uint8), ('bDeviceSubClass', c_uint8), ('bDeviceProtocol', c_uint8), ('bMaxPacketSize0', c_uint8), ('idVendor', c_uint16), ('idProduct', c_uint16), ('bcdDevice', c_uint16), ('iManufacturer', c_uint8), ('iProduct', c_uint8), ('iSerialNumber', c_uint8), ('bNumConfigurations', c_uint8)] class _usb_device(Structure, _PackPolicy): pass class _usb_bus(Structure, _PackPolicy): pass _usb_device._fields_ = [('next', POINTER(_usb_device)), ('prev', POINTER(_usb_device)), ('filename', c_int8 * (_PATH_MAX + 1)), ('bus', POINTER(_usb_bus)), ('descriptor', _usb_device_descriptor), ('config', POINTER(_usb_config_descriptor)), ('dev', c_void_p), ('devnum', c_uint8), ('num_children', c_ubyte), ('children', POINTER(POINTER(_usb_device)))] _usb_bus._fields_ = [('next', POINTER(_usb_bus)), ('prev', POINTER(_usb_bus)), ('dirname', c_char * (_PATH_MAX + 1)), ('devices', POINTER(_usb_device)), ('location', c_uint32), ('root_dev', POINTER(_usb_device))] _usb_dev_handle = c_void_p class _DeviceDescriptor: def __init__(self, dev): desc = dev.descriptor self.bLength = desc.bLength self.bDescriptorType = desc.bDescriptorType self.bcdUSB = desc.bcdUSB self.bDeviceClass = desc.bDeviceClass self.bDeviceSubClass = desc.bDeviceSubClass self.bDeviceProtocol = desc.bDeviceProtocol self.bMaxPacketSize0 = desc.bMaxPacketSize0 self.idVendor = desc.idVendor self.idProduct = desc.idProduct self.bcdDevice = desc.bcdDevice self.iManufacturer = desc.iManufacturer self.iProduct = desc.iProduct self.iSerialNumber = desc.iSerialNumber self.bNumConfigurations = desc.bNumConfigurations self.address = dev.devnum self.bus = dev.bus[0].location self.port_number = None _lib = None def _load_library(find_library=None): return usb.libloader.load_locate_library( ('usb-0.1', 'usb', 'libusb0'), 'cygusb0.dll', 'Libusb 0', find_library=find_library ) def _setup_prototypes(lib): # usb_dev_handle *usb_open(struct usb_device *dev); lib.usb_open.argtypes = [POINTER(_usb_device)] lib.usb_open.restype = _usb_dev_handle # int usb_close(usb_dev_handle *dev); lib.usb_close.argtypes = [_usb_dev_handle] # int usb_get_string(usb_dev_handle *dev, # int index, # int langid, # char *buf, # size_t buflen); lib.usb_get_string.argtypes = [ _usb_dev_handle, c_int, c_int, c_char_p, c_size_t ] # int usb_get_string_simple(usb_dev_handle *dev, # int index, # char *buf, # size_t buflen); lib.usb_get_string_simple.argtypes = [ _usb_dev_handle, c_int, c_char_p, c_size_t ] # int usb_get_descriptor_by_endpoint(usb_dev_handle *udev, # int ep, # unsigned char type, # unsigned char index, # void *buf, # int size); lib.usb_get_descriptor_by_endpoint.argtypes = [ _usb_dev_handle, c_int, c_ubyte, c_ubyte, c_void_p, c_int ] # int usb_get_descriptor(usb_dev_handle *udev, # unsigned char type, # unsigned char index, # void *buf, # int size); lib.usb_get_descriptor.argtypes = [ _usb_dev_handle, c_ubyte, c_ubyte, c_void_p, c_int ] # int usb_bulk_write(usb_dev_handle *dev, # int ep, # const char *bytes, # int size, # int timeout); lib.usb_bulk_write.argtypes = [ _usb_dev_handle, c_int, c_char_p, c_int, c_int ] # int usb_bulk_read(usb_dev_handle *dev, # int ep, # char *bytes, # int size, # int timeout); lib.usb_bulk_read.argtypes = [ _usb_dev_handle, c_int, c_char_p, c_int, c_int ] # int usb_interrupt_write(usb_dev_handle *dev, # int ep, # const char *bytes, # int size, # int timeout); lib.usb_interrupt_write.argtypes = [ _usb_dev_handle, c_int, c_char_p, c_int, c_int ] # int usb_interrupt_read(usb_dev_handle *dev, # int ep, # char *bytes, # int size, # int timeout); lib.usb_interrupt_read.argtypes = [ _usb_dev_handle, c_int, c_char_p, c_int, c_int ] # int usb_control_msg(usb_dev_handle *dev, # int requesttype, # int request, # int value, # int index, # char *bytes, # int size, # int timeout); lib.usb_control_msg.argtypes = [ _usb_dev_handle, c_int, c_int, c_int, c_int, c_char_p, c_int, c_int ] # int usb_set_configuration(usb_dev_handle *dev, int configuration); lib.usb_set_configuration.argtypes = [_usb_dev_handle, c_int] # int usb_claim_interface(usb_dev_handle *dev, int interface); lib.usb_claim_interface.argtypes = [_usb_dev_handle, c_int] # int usb_release_interface(usb_dev_handle *dev, int interface); lib.usb_release_interface.argtypes = [_usb_dev_handle, c_int] # int usb_set_altinterface(usb_dev_handle *dev, int alternate); lib.usb_set_altinterface.argtypes = [_usb_dev_handle, c_int] # int usb_resetep(usb_dev_handle *dev, unsigned int ep); lib.usb_resetep.argtypes = [_usb_dev_handle, c_int] # int usb_clear_halt(usb_dev_handle *dev, unsigned int ep); lib.usb_clear_halt.argtypes = [_usb_dev_handle, c_int] # int usb_reset(usb_dev_handle *dev); lib.usb_reset.argtypes = [_usb_dev_handle] # char *usb_strerror(void); lib.usb_strerror.argtypes = [] lib.usb_strerror.restype = c_char_p # void usb_set_debug(int level); lib.usb_set_debug.argtypes = [c_int] # struct usb_device *usb_device(usb_dev_handle *dev); lib.usb_device.argtypes = [_usb_dev_handle] lib.usb_device.restype = POINTER(_usb_device) # struct usb_bus *usb_get_busses(void); lib.usb_get_busses.restype = POINTER(_usb_bus) def _check(ret): if ret is None: errmsg = _lib.usb_strerror() else: if hasattr(ret, 'value'): ret = ret.value if ret < 0: errmsg = _lib.usb_strerror() # No error means that we need to get the error # message from the return code # Thanks to Nicholas Wheeler to point out the problem... # Also see issue #2860940 if errmsg.lower() == 'no error': errmsg = os.strerror(-ret) else: return ret raise USBError(errmsg, ret) # implementation of libusb 0.1.x backend class _LibUSB(usb.backend.IBackend): @methodtrace(_logger) def enumerate_devices(self): _check(_lib.usb_find_busses()) _check(_lib.usb_find_devices()) bus = _lib.usb_get_busses() while bool(bus): dev = bus[0].devices while bool(dev): yield dev[0] dev = dev[0].next bus = bus[0].next @methodtrace(_logger) def get_device_descriptor(self, dev): return _DeviceDescriptor(dev) @methodtrace(_logger) def get_configuration_descriptor(self, dev, config): if config >= dev.descriptor.bNumConfigurations: raise IndexError('Invalid configuration index ' + str(config)) config_desc = dev.config[config] config_desc.extra_descriptors = config_desc.extra[:config_desc.extralen] return config_desc @methodtrace(_logger) def get_interface_descriptor(self, dev, intf, alt, config): cfgdesc = self.get_configuration_descriptor(dev, config) if intf >= cfgdesc.bNumInterfaces: raise IndexError('Invalid interface index ' + str(interface)) interface = cfgdesc.interface[intf] if alt >= interface.num_altsetting: raise IndexError('Invalid alternate setting index ' + str(alt)) intf_desc = interface.altsetting[alt] intf_desc.extra_descriptors = intf_desc.extra[:intf_desc.extralen] return intf_desc @methodtrace(_logger) def get_endpoint_descriptor(self, dev, ep, intf, alt, config): interface = self.get_interface_descriptor(dev, intf, alt, config) if ep >= interface.bNumEndpoints: raise IndexError('Invalid endpoint index ' + str(ep)) ep_desc = interface.endpoint[ep] ep_desc.extra_descriptors = ep_desc.extra[:ep_desc.extralen] return ep_desc @methodtrace(_logger) def open_device(self, dev): return _check(_lib.usb_open(dev)) @methodtrace(_logger) def close_device(self, dev_handle): _check(_lib.usb_close(dev_handle)) @methodtrace(_logger) def set_configuration(self, dev_handle, config_value): _check(_lib.usb_set_configuration(dev_handle, config_value)) @methodtrace(_logger) def set_interface_altsetting(self, dev_handle, intf, altsetting): _check(_lib.usb_set_altinterface(dev_handle, altsetting)) @methodtrace(_logger) def get_configuration(self, dev_handle): bmRequestType = usb.util.build_request_type( usb.util.CTRL_IN, usb.util.CTRL_TYPE_STANDARD, usb.util.CTRL_RECIPIENT_DEVICE ) buff = usb.util.create_buffer(1) ret = self.ctrl_transfer( dev_handle, bmRequestType, 0x08, 0, 0, buff, 100) assert ret == 1 return buff[0] @methodtrace(_logger) def claim_interface(self, dev_handle, intf): _check(_lib.usb_claim_interface(dev_handle, intf)) @methodtrace(_logger) def release_interface(self, dev_handle, intf): _check(_lib.usb_release_interface(dev_handle, intf)) @methodtrace(_logger) def bulk_write(self, dev_handle, ep, intf, data, timeout): return self.__write(_lib.usb_bulk_write, dev_handle, ep, intf, data, timeout) @methodtrace(_logger) def bulk_read(self, dev_handle, ep, intf, buff, timeout): return self.__read(_lib.usb_bulk_read, dev_handle, ep, intf, buff, timeout) @methodtrace(_logger) def intr_write(self, dev_handle, ep, intf, data, timeout): return self.__write(_lib.usb_interrupt_write, dev_handle, ep, intf, data, timeout) @methodtrace(_logger) def intr_read(self, dev_handle, ep, intf, buff, timeout): return self.__read(_lib.usb_interrupt_read, dev_handle, ep, intf, buff, timeout) @methodtrace(_logger) def ctrl_transfer(self, dev_handle, bmRequestType, bRequest, wValue, wIndex, data, timeout): address, length = data.buffer_info() length *= data.itemsize return _check(_lib.usb_control_msg( dev_handle, bmRequestType, bRequest, wValue, wIndex, cast(address, c_char_p), length, timeout )) @methodtrace(_logger) def clear_halt(self, dev_handle, ep): _check(_lib.usb_clear_halt(dev_handle, ep)) @methodtrace(_logger) def reset_device(self, dev_handle): _check(_lib.usb_reset(dev_handle)) @methodtrace(_logger) def detach_kernel_driver(self, dev_handle, intf): _check(_lib.usb_detach_kernel_driver_np(dev_handle, intf)) def __write(self, fn, dev_handle, ep, intf, data, timeout): address, length = data.buffer_info() length *= data.itemsize return int(_check(fn( dev_handle, ep, cast(address, c_char_p), length, timeout ))) def __read(self, fn, dev_handle, ep, intf, buff, timeout): address, length = buff.buffer_info() length *= buff.itemsize ret = int(_check(fn( dev_handle, ep, cast(address, c_char_p), length, timeout ))) return ret def get_backend(find_library=None): global _lib try: if _lib is None: _lib = _load_library(find_library) _setup_prototypes(_lib) _lib.usb_init() return _LibUSB() except usb.libloader.LibaryException: # exception already logged (if any) _logger.error('Error loading libusb 0.1 backend', exc_info=False) return None except Exception: _logger.error('Error loading libusb 0.1 backend', exc_info=True) return None