# Copyright 2014 The Chromium Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. """WSGI application to manage a USB gadget. """ import datetime import hashlib import re import subprocess import sys import time import urllib2 from tornado import httpserver from tornado import ioloop from tornado import web import default_gadget VERSION_PATTERN = re.compile(r'.*usb_gadget-([a-z0-9]{32})\.zip') address = None chip = None claimed_by = None default = default_gadget.DefaultGadget() gadget = None hardware = None interface = None port = None def SwitchGadget(new_gadget): if chip.IsConfigured(): chip.Destroy() global gadget gadget = new_gadget gadget.AddStringDescriptor(3, address) chip.Create(gadget) class VersionHandler(web.RequestHandler): def get(self): version = 'unpackaged' for path in sys.path: match = VERSION_PATTERN.match(path) if match: version = match.group(1) break self.write(version) class UpdateHandler(web.RequestHandler): def post(self): fileinfo = self.request.files['file'][0] match = VERSION_PATTERN.match(fileinfo['filename']) if match is None: self.write('Filename must contain MD5 hash.') self.set_status(400) return content = fileinfo['body'] md5sum = hashlib.md5(content).hexdigest() if md5sum != match.group(1): self.write('File hash does not match.') self.set_status(400) return filename = 'usb_gadget-{}.zip'.format(md5sum) with open(filename, 'wb') as f: f.write(content) args = ['/usr/bin/python', filename, '--interface', interface, '--port', str(port), '--hardware', hardware] if claimed_by is not None: args.extend(['--start-claimed', claimed_by]) print 'Reloading with version {}...'.format(md5sum) global http_server if chip.IsConfigured(): chip.Destroy() http_server.stop() child = subprocess.Popen(args, close_fds=True) while True: child.poll() if child.returncode is not None: self.write('New package exited with error {}.' .format(child.returncode)) self.set_status(500) http_server = httpserver.HTTPServer(app) http_server.listen(port) SwitchGadget(gadget) return try: f = urllib2.urlopen('http://{}/version'.format(address)) if f.getcode() == 200: # Update complete, wait 1 second to make sure buffers are flushed. io_loop = ioloop.IOLoop.instance() io_loop.add_timeout(datetime.timedelta(seconds=1), io_loop.stop) return except urllib2.URLError: pass time.sleep(0.1) class ClaimHandler(web.RequestHandler): def post(self): global claimed_by if claimed_by is None: claimed_by = self.get_argument('session_id') else: self.write('Device is already claimed by "{}".'.format(claimed_by)) self.set_status(403) class UnclaimHandler(web.RequestHandler): def post(self): global claimed_by claimed_by = None if gadget != default: SwitchGadget(default) class UnconfigureHandler(web.RequestHandler): def post(self): SwitchGadget(default) class DisconnectHandler(web.RequestHandler): def post(self): if chip.IsConfigured(): chip.Destroy() class ReconnectHandler(web.RequestHandler): def post(self): if not chip.IsConfigured(): chip.Create(gadget) app = web.Application([ (r'/version', VersionHandler), (r'/update', UpdateHandler), (r'/claim', ClaimHandler), (r'/unclaim', UnclaimHandler), (r'/unconfigure', UnconfigureHandler), (r'/disconnect', DisconnectHandler), (r'/reconnect', ReconnectHandler), ]) http_server = httpserver.HTTPServer(app)