# Copyright 2014 The Chromium Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. """USB echo gadget module. This gadget has pairs of IN/OUT endpoints that echo packets back to the host. """ import uuid import composite_gadget import usb_constants import usb_descriptors class EchoCompositeFeature(composite_gadget.CompositeFeature): """Composite device feature that echos data back to the host. """ def __init__(self, endpoints): """Create an echo gadget. """ fs_interfaces = [] hs_interfaces = [] if len(endpoints) >= 1: iface_num, iface_string, in_endpoint, out_endpoint = endpoints[0] fs_intr_interface_desc = usb_descriptors.InterfaceDescriptor( bInterfaceNumber=iface_num, bInterfaceClass=usb_constants.DeviceClass.VENDOR, bInterfaceSubClass=0, bInterfaceProtocol=0, iInterface=iface_string, ) fs_intr_interface_desc.AddEndpoint(usb_descriptors.EndpointDescriptor( bEndpointAddress=out_endpoint, bmAttributes=usb_constants.TransferType.INTERRUPT, wMaxPacketSize=64, bInterval=1 # 1ms )) fs_intr_interface_desc.AddEndpoint(usb_descriptors.EndpointDescriptor( bEndpointAddress=in_endpoint, bmAttributes=usb_constants.TransferType.INTERRUPT, wMaxPacketSize=64, bInterval=1 # 1ms )) fs_interfaces.append(fs_intr_interface_desc) hs_intr_interface_desc = usb_descriptors.InterfaceDescriptor( bInterfaceNumber=iface_num, bInterfaceClass=usb_constants.DeviceClass.VENDOR, bInterfaceSubClass=0, bInterfaceProtocol=0, iInterface=iface_string ) hs_intr_interface_desc.AddEndpoint(usb_descriptors.EndpointDescriptor( bEndpointAddress=out_endpoint, bmAttributes=usb_constants.TransferType.INTERRUPT, wMaxPacketSize=64, bInterval=4 # 1ms )) hs_intr_interface_desc.AddEndpoint(usb_descriptors.EndpointDescriptor( bEndpointAddress=in_endpoint, bmAttributes=usb_constants.TransferType.INTERRUPT, wMaxPacketSize=64, bInterval=4 # 1ms )) hs_interfaces.append(hs_intr_interface_desc) if len(endpoints) >= 2: iface_num, iface_string, in_endpoint, out_endpoint = endpoints[1] fs_bulk_interface_desc = usb_descriptors.InterfaceDescriptor( bInterfaceNumber=iface_num, bInterfaceClass=usb_constants.DeviceClass.VENDOR, bInterfaceSubClass=0, bInterfaceProtocol=0, iInterface=iface_string ) fs_bulk_interface_desc.AddEndpoint(usb_descriptors.EndpointDescriptor( bEndpointAddress=out_endpoint, bmAttributes=usb_constants.TransferType.BULK, wMaxPacketSize=64, bInterval=0 )) fs_bulk_interface_desc.AddEndpoint(usb_descriptors.EndpointDescriptor( bEndpointAddress=in_endpoint, bmAttributes=usb_constants.TransferType.BULK, wMaxPacketSize=64, bInterval=0 )) fs_interfaces.append(fs_bulk_interface_desc) hs_bulk_interface_desc = usb_descriptors.InterfaceDescriptor( bInterfaceNumber=iface_num, bInterfaceClass=usb_constants.DeviceClass.VENDOR, bInterfaceSubClass=0, bInterfaceProtocol=0, iInterface=iface_string ) hs_bulk_interface_desc.AddEndpoint(usb_descriptors.EndpointDescriptor( bEndpointAddress=out_endpoint, bmAttributes=usb_constants.TransferType.BULK, wMaxPacketSize=512, bInterval=0 )) hs_bulk_interface_desc.AddEndpoint(usb_descriptors.EndpointDescriptor( bEndpointAddress=in_endpoint, bmAttributes=usb_constants.TransferType.BULK, wMaxPacketSize=512, bInterval=0 )) hs_interfaces.append(hs_bulk_interface_desc) if len(endpoints) >= 3: iface_num, iface_string, in_endpoint, out_endpoint = endpoints[2] fs_interfaces.append(usb_descriptors.InterfaceDescriptor( bInterfaceNumber=iface_num, bInterfaceClass=usb_constants.DeviceClass.VENDOR, bInterfaceSubClass=0, bInterfaceProtocol=0, iInterface=iface_string )) fs_isoc_interface_desc = usb_descriptors.InterfaceDescriptor( bInterfaceNumber=iface_num, bAlternateSetting=1, bInterfaceClass=usb_constants.DeviceClass.VENDOR, bInterfaceSubClass=0, bInterfaceProtocol=0, iInterface=iface_string ) fs_isoc_interface_desc.AddEndpoint(usb_descriptors.EndpointDescriptor( bEndpointAddress=out_endpoint, bmAttributes=usb_constants.TransferType.ISOCHRONOUS, wMaxPacketSize=1023, bInterval=1 # 1ms )) fs_isoc_interface_desc.AddEndpoint(usb_descriptors.EndpointDescriptor( bEndpointAddress=in_endpoint, bmAttributes=usb_constants.TransferType.ISOCHRONOUS, wMaxPacketSize=1023, bInterval=1 # 1ms )) fs_interfaces.append(fs_isoc_interface_desc) hs_interfaces.append(usb_descriptors.InterfaceDescriptor( bInterfaceNumber=iface_num, bInterfaceClass=usb_constants.DeviceClass.VENDOR, bInterfaceSubClass=0, bInterfaceProtocol=0, iInterface=iface_string )) hs_isoc_interface_desc = usb_descriptors.InterfaceDescriptor( bInterfaceNumber=iface_num, bAlternateSetting=1, bInterfaceClass=usb_constants.DeviceClass.VENDOR, bInterfaceSubClass=0, bInterfaceProtocol=0, iInterface=iface_string ) hs_isoc_interface_desc.AddEndpoint(usb_descriptors.EndpointDescriptor( bEndpointAddress=out_endpoint, bmAttributes=usb_constants.TransferType.ISOCHRONOUS, wMaxPacketSize=512, bInterval=4 # 1ms )) hs_isoc_interface_desc.AddEndpoint(usb_descriptors.EndpointDescriptor( bEndpointAddress=in_endpoint, bmAttributes=usb_constants.TransferType.ISOCHRONOUS, wMaxPacketSize=512, bInterval=4 # 1ms )) hs_interfaces.append(hs_isoc_interface_desc) super(EchoCompositeFeature, self).__init__(fs_interfaces, hs_interfaces) def ReceivePacket(self, endpoint, data): """Echo a packet back to the host. Args: endpoint: Incoming endpoint (must be an OUT pipe). data: Packet data. """ assert endpoint & usb_constants.Dir.IN == 0 self.SendPacket(endpoint | usb_constants.Dir.IN, data) class EchoGadget(composite_gadget.CompositeGadget): """Echo gadget. """ def __init__(self): """Create an echo gadget. """ device_desc = usb_descriptors.DeviceDescriptor( idVendor=usb_constants.VendorID.GOOGLE, idProduct=usb_constants.ProductID.GOOGLE_ECHO_GADGET, bcdUSB=0x0200, iManufacturer=1, iProduct=2, iSerialNumber=3, bcdDevice=0x0100) feature = EchoCompositeFeature( endpoints=[(0, 4, 0x81, 0x01), (1, 5, 0x82, 0x02), (2, 6, 0x83, 0x03)]) super(EchoGadget, self).__init__(device_desc, [feature]) self.AddStringDescriptor(1, 'Google Inc.') self.AddStringDescriptor(2, 'Echo Gadget') self.AddStringDescriptor(3, '{:06X}'.format(uuid.getnode())) self.AddStringDescriptor(4, 'Interrupt Echo') self.AddStringDescriptor(5, 'Bulk Echo') self.AddStringDescriptor(6, 'Isochronous Echo') # Enable Microsoft OS Descriptors for Windows 8 and above. self.EnableMicrosoftOSDescriptorsV1(vendor_code=0x01) # These are used to force Windows to load WINUSB.SYS for the echo functions. self.SetMicrosoftCompatId(0, 'WINUSB') self.SetMicrosoftCompatId(1, 'WINUSB') self.SetMicrosoftCompatId(2, 'WINUSB') self.AddDeviceCapabilityDescriptor(usb_descriptors.ContainerIdDescriptor( ContainerID=uuid.uuid4().bytes_le)) def RegisterHandlers(): """Registers web request handlers with the application server.""" import server from tornado import web class WebConfigureHandler(web.RequestHandler): def post(self): server.SwitchGadget(EchoGadget()) server.app.add_handlers('.*$', [ (r'/echo/configure', WebConfigureHandler), ])