#!/usr/bin/env python # Copyright (c) 2012 The Chromium Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. import commands import filecmp import logging import os import shutil import sys import tempfile import urllib import pyauto_functional # Must be imported before pyauto import pyauto import pyauto_utils import test_utils class DownloadsTest(pyauto.PyUITest): """TestCase for Downloads.""" def setUp(self): pyauto.PyUITest.setUp(self) # Record all entries in the download dir download_dir = self.GetDownloadDirectory().value() self._existing_downloads = [] if os.path.isdir(download_dir): self._existing_downloads += os.listdir(download_dir) self._files_to_remove = [] # Files to remove after browser shutdown def tearDown(self): # Cleanup all files we created in the download dir download_dir = self.GetDownloadDirectory().value() if os.path.isdir(download_dir): for name in os.listdir(download_dir): if name not in self._existing_downloads: self._files_to_remove.append(os.path.join(download_dir, name)) pyauto.PyUITest.tearDown(self) # Delete all paths marked for deletion after browser shutdown. for item in self._files_to_remove: pyauto_utils.RemovePath(item) def _DeleteAfterShutdown(self, path): """Delete |path| after browser has been shut down. This is so that all handles to the path would have been gone by then. Silently Ignores errors, when the path does not exist, or if the path could not be deleted. """ self._files_to_remove.append(path) def _ClearLocalDownloadState(self, path): """Prepare for downloading the given path. Clears the given path and the corresponding .crdownload, to prepare it to be downloaded. """ os.path.exists(path) and os.remove(path) crdownload = path + '.crdownload' os.path.exists(crdownload) and os.remove(crdownload) def _GetDangerousDownload(self): """Returns the file path for a dangerous download for this OS.""" sub_path = os.path.join(self.DataDir(), 'downloads', 'dangerous') if self.IsWin(): return os.path.join(sub_path, 'dangerous.com') return os.path.join(sub_path, 'dangerous.jar') def _EqualFileContents(self, file1, file2): """Determine if 2 given files have the same contents.""" if not (os.path.exists(file1) and os.path.exists(file2)): return False return filecmp.cmp(file1, file2, shallow=False) def _GetDownloadId(self, download_index=0): """Return the download id for the download at the given index. Args: download_index: The index of the download in the list of downloads. Default is 0. """ return self.GetDownloadsInfo().Downloads()[download_index]['id'] def _MakeFile(self, size): """Make a file on-the-fly with the given size. Note that it's really a 1byte file even though ls -lh will report it as of file |size| (du reports the correct usage on disk), but it's good enough for downloads tests because chrome will treat it as a file of size |size| when downloading. Returns: the path to the created file. """ fd, file_path = tempfile.mkstemp(suffix='.zip', prefix='file-downloads-') os.lseek(fd, size, 0) os.write(fd, 'a') os.close(fd) # Make it readable by chronos on chromeos os.chmod(file_path, 0755) logging.debug('Created temporary file %s of size %d' % (file_path, size)) self._DeleteAfterShutdown(file_path) return file_path def _GetAllDownloadIDs(self): """Return a list of all download ids.""" return [download['id'] for download in self.GetDownloadsInfo().Downloads()] def testNoDownloadWaitingNeeded(self): """Make sure "wait for downloads" returns quickly if we have none.""" self.WaitForAllDownloadsToComplete() def testZip(self): """Download a zip and verify that it downloaded correctly. Also verify that the download shelf showed up. """ test_dir = os.path.join(os.path.abspath(self.DataDir()), 'downloads') file_path = os.path.join(test_dir, 'a_zip_file.zip') file_url = self.GetFileURLForPath(file_path) downloaded_pkg = os.path.join(self.GetDownloadDirectory().value(), 'a_zip_file.zip') self._ClearLocalDownloadState(downloaded_pkg) self.DownloadAndWaitForStart(file_url) # Wait for the download to finish. self.WaitForAllDownloadsToComplete() # Verify that the download shelf is visible self.assertTrue(self.IsDownloadShelfVisible()) # Verify that the file was correctly downloaded self.assertTrue(os.path.exists(downloaded_pkg)) self.assertTrue(self._EqualFileContents(file_path, downloaded_pkg)) def testZipInIncognito(self): """Download and verify a zip in incognito window.""" test_dir = os.path.join(os.path.abspath(self.DataDir()), 'downloads') file_path = os.path.join(test_dir, 'a_zip_file.zip') file_url = self.GetFileURLForPath(file_path) downloaded_pkg = os.path.join(self.GetDownloadDirectory().value(), 'a_zip_file.zip') self._ClearLocalDownloadState(downloaded_pkg) self.RunCommand(pyauto.IDC_NEW_INCOGNITO_WINDOW) # Trigger download and wait in new incognito window. self.DownloadAndWaitForStart(file_url, windex=1) self.WaitForAllDownloadsToComplete(windex=1) incognito_downloads = self.GetDownloadsInfo(1).Downloads() # Verify that download info exists in the correct profile. self.assertEqual(len(incognito_downloads), 1) self.assertTrue(self._EqualFileContents(file_path, downloaded_pkg), msg='%s (size %d) and %s (size %d) do not match' % ( file_path, os.path.getsize(file_path), downloaded_pkg, os.path.getsize(downloaded_pkg))) self.assertTrue(self.IsDownloadShelfVisible(1)) def testSaveDangerousFile(self): """Verify that we can download and save a dangerous file.""" file_path = self._GetDangerousDownload() downloaded_pkg = os.path.join(self.GetDownloadDirectory().value(), os.path.basename(file_path)) self._ClearLocalDownloadState(downloaded_pkg) self._TriggerUnsafeDownload(os.path.basename(file_path)) self.PerformActionOnDownload(self._GetDownloadId(), 'save_dangerous_download') self.WaitForAllDownloadsToComplete() # Verify that the file was downloaded. self.assertTrue(os.path.exists(downloaded_pkg)) self.assertTrue(self._EqualFileContents(file_path, downloaded_pkg)) self._DeleteAfterShutdown(downloaded_pkg) def testDeclineDangerousDownload(self): """Verify that we can decline dangerous downloads""" file_path = self._GetDangerousDownload() downloaded_pkg = os.path.join(self.GetDownloadDirectory().value(), os.path.basename(file_path)) self._ClearLocalDownloadState(downloaded_pkg) self._TriggerUnsafeDownload(os.path.basename(file_path)) self.PerformActionOnDownload(self._GetDownloadId(), 'decline_dangerous_download') self.assertFalse(os.path.exists(downloaded_pkg)) self.assertFalse(self.GetDownloadsInfo().Downloads()) self.assertFalse(self.IsDownloadShelfVisible()) def testRemoveDownload(self): """Verify that we can remove a download.""" file_url = self.GetFileURLForDataPath('downloads', 'a_zip_file.zip') downloaded_pkg = os.path.join(self.GetDownloadDirectory().value(), 'a_zip_file.zip') self._ClearLocalDownloadState(downloaded_pkg) self.DownloadAndWaitForStart(file_url) self.WaitForAllDownloadsToComplete() self.PerformActionOnDownload(self._GetDownloadId(), 'remove') # The download is removed from downloads, but not from the disk. self.assertFalse(self.GetDownloadsInfo().Downloads()) self.assertTrue(os.path.exists(downloaded_pkg)) self._DeleteAfterShutdown(downloaded_pkg) def testBigZip(self): """Verify that we can download a 1GB file. This test needs 2 GB of free space, 1 GB for the original zip file and another for the downloaded one. Note: This test increases automation timeout to 4 min. Things might seem to hang. """ # Create a 1 GB file on the fly file_path = self._MakeFile(2**30) # Ensure there's sufficient space remaining to download file. free_space = test_utils.GetFreeSpace(self.GetDownloadDirectory().value()) assert free_space >= 2**30, \ 'Not enough disk space to download. Got %d free' % free_space file_url = self.GetFileURLForPath(file_path) downloaded_pkg = os.path.join(self.GetDownloadDirectory().value(), os.path.basename(file_path)) self._ClearLocalDownloadState(downloaded_pkg) self.DownloadAndWaitForStart(file_url) self._DeleteAfterShutdown(downloaded_pkg) self.WaitForAllDownloadsToComplete(timeout=10 * 60 * 1000); # Verify that the file was correctly downloaded self.assertTrue(os.path.exists(downloaded_pkg), 'Downloaded file %s missing.' % downloaded_pkg) self.assertTrue(self._EqualFileContents(file_path, downloaded_pkg), 'Downloaded file %s does not match original' % downloaded_pkg) def testFileRenaming(self): """Test file renaming when downloading a already-existing filename.""" test_dir = os.path.join(os.path.abspath(self.DataDir()), 'downloads') file_url = 'file://%s' % os.path.join(test_dir, 'a_zip_file.zip') download_dir = self.GetDownloadDirectory().value() num_times = 5 assert num_times > 1, 'needs to be > 1 to work' renamed_files = [] for i in range(num_times): expected_filename = os.path.join(download_dir, 'a_zip_file.zip') if i > 0: # Files after first download are renamed. expected_filename = os.path.join(download_dir, 'a_zip_file (%d).zip' % i) renamed_files.append(expected_filename) self._ClearLocalDownloadState(expected_filename) self.DownloadAndWaitForStart(file_url) self.WaitForAllDownloadsToComplete() # Verify that all files exist and have the right name for filename in renamed_files: self.assertTrue(os.path.exists(filename)) self._DeleteAfterShutdown(filename) def testCrazyFilenames(self): """Test downloading with filenames containing special chars. The files are created on the fly and cleaned after use. """ download_dir = self.GetDownloadDirectory().value() filename = os.path.join(self.DataDir(), 'downloads', 'crazy_filenames.txt') crazy_filenames = self.EvalDataFrom(filename) logging.info('Testing with %d crazy filenames' % len(crazy_filenames)) def _CreateFile(name): """Create and fill the given file with some junk.""" fp = open(name, 'w') # name could be unicode print >>fp, 'This is a junk file named %s. ' % repr(name) * 100 fp.close() # Temp dir for hosting crazy filenames. temp_dir = tempfile.mkdtemp(prefix='download') self._DeleteAfterShutdown(unicode(temp_dir)) # Windows has a dual nature dealing with unicode filenames. # While the files are internally saved as unicode, there's a non-unicode # aware API that returns a locale-dependent coding on the true unicode # filenames. This messes up things. # Filesystem-interfacing functions like os.listdir() need to # be given unicode strings to "do the right thing" on win. # Ref: http://boodebr.org/main/python/all-about-python-and-unicode for filename in crazy_filenames: # filename is unicode. utf8_filename = filename.encode('utf-8') file_path = os.path.join(temp_dir, utf8_filename) _CreateFile(os.path.join(temp_dir, filename)) # unicode file. file_url = self.GetFileURLForPath(file_path) downloaded_file = os.path.join(download_dir, filename) self._ClearLocalDownloadState(downloaded_file) self.DownloadAndWaitForStart(file_url) self.WaitForAllDownloadsToComplete() # Verify downloads. downloads = self.GetDownloadsInfo().Downloads() self.assertEqual(len(downloads), len(crazy_filenames)) for filename in crazy_filenames: downloaded_file = os.path.join(download_dir, filename) self.assertTrue(os.path.exists(downloaded_file)) self.assertTrue( # Verify file contents. self._EqualFileContents(downloaded_file, os.path.join(temp_dir, filename))) os.path.exists(downloaded_file) and os.remove(downloaded_file) def _TriggerUnsafeDownload(self, filename, tab_index=0, windex=0): """Trigger download of an unsafe/dangerous filetype. Files explictly requested by the user (like navigating to a package, or clicking on a link) aren't marked unsafe. Only the ones where the user didn't directly initiate a download are marked unsafe. Navigates to download-dangerous.html which triggers the download. Waits until the download starts. Args: filename: the name of the file to trigger the download. This should exist in the 'dangerous' directory. tab_index: tab index. Default 0. windex: window index. Default 0. """ dangerous_dir = os.path.join( self.DataDir(), 'downloads', 'dangerous') assert os.path.isfile(os.path.join(dangerous_dir, filename)) file_url = self.GetFileURLForPath(os.path.join( dangerous_dir, 'download-dangerous.html')) + '?' + filename num_downloads = len(self.GetDownloadsInfo().Downloads()) self.NavigateToURL(file_url, windex, tab_index) # It might take a while for the download to kick in, hold on until then. self.assertTrue(self.WaitUntil( lambda: len(self.GetDownloadsInfo().Downloads()) == num_downloads + 1)) # Wait for Download Shelf to appear to reduce flakiness. self.assertTrue(self.WaitUntil(self.IsDownloadShelfVisible)) def testPauseAndResume(self): """Verify that pause and resume work while downloading a file. Note: This test increases automation timeout to 2 min. Things might seem to hang. """ # Create a 250 MB file on the fly file_path = self._MakeFile(2**28) # Ensure there's sufficient space remaining to download file. free_space = test_utils.GetFreeSpace(self.GetDownloadDirectory().value()) assert free_space >= 2**28, \ 'Not enough disk space to download. Got %d free' % free_space file_url = self.GetFileURLForPath(file_path) downloaded_pkg = os.path.join(self.GetDownloadDirectory().value(), os.path.basename(file_path)) self._ClearLocalDownloadState(downloaded_pkg) self.DownloadAndWaitForStart(file_url) self._DeleteAfterShutdown(downloaded_pkg) self._DeleteAfterShutdown(file_path) # Pause the download and assert that it is paused. pause_dict = self.PerformActionOnDownload(self._GetDownloadId(), 'toggle_pause') if pause_dict['state'] == 'COMPLETE': logging.info('The download completed before pause. Stopping test.') return self.assertTrue(pause_dict['is_paused']) self.assertTrue(pause_dict['state'] == 'IN_PROGRESS') # Resume the download and assert it is not paused. resume_dict = self.PerformActionOnDownload(self._GetDownloadId(), 'toggle_pause') self.assertFalse(resume_dict['is_paused']) self.WaitForAllDownloadsToComplete(timeout=10 * 60 * 1000); # Verify that the file was correctly downloaded after pause and resume. self.assertTrue(os.path.exists(downloaded_pkg), 'Downloaded file %s missing.' % downloaded_pkg) self.assertTrue(self._EqualFileContents(file_path, downloaded_pkg), 'Downloaded file %s does not match original' % downloaded_pkg) def testCancelDownload(self): """Verify that we can cancel a download.""" # Create a big file (250 MB) on the fly, so that the download won't finish # before being cancelled. file_path = self._MakeFile(2**28) # Ensure there's sufficient space remaining to download file. free_space = test_utils.GetFreeSpace(self.GetDownloadDirectory().value()) assert free_space >= 2**28, \ 'Not enough disk space to download. Got %d free' % free_space file_url = self.GetFileURLForPath(file_path) downloaded_pkg = os.path.join(self.GetDownloadDirectory().value(), os.path.basename(file_path)) self._ClearLocalDownloadState(downloaded_pkg) self.DownloadAndWaitForStart(file_url) self.PerformActionOnDownload(self._GetDownloadId(), 'cancel') self._DeleteAfterShutdown(file_path) state = self.GetDownloadsInfo().Downloads()[0]['state'] if state == 'COMPLETE': logging.info('The download completed before cancel. Test stopped.') return # Verify the download has been cancelled. self.assertEqual('CANCELLED', self.GetDownloadsInfo().Downloads()[0]['state']) self.assertFalse(os.path.exists(downloaded_pkg)) def testDownloadsPersistence(self): """Verify that download history persists on session restart.""" test_dir = os.path.join(os.path.abspath(self.DataDir()), 'downloads') file_url = self.GetFileURLForPath(os.path.join(test_dir, 'a_zip_file.zip')) downloaded_pkg = os.path.join(self.GetDownloadDirectory().value(), 'a_zip_file.zip') self._ClearLocalDownloadState(downloaded_pkg) self.DownloadAndWaitForStart(file_url) self.WaitForAllDownloadsToComplete() downloads = self.GetDownloadsInfo().Downloads() self.assertEqual(1, len(downloads)) self.assertEqual('a_zip_file.zip', downloads[0]['file_name']) file_url = downloads[0]['url'] self.RestartBrowser(clear_profile=False) # Trigger the download service to get loaded after restart. self.NavigateToURL('chrome://downloads/') # Verify that there's no download shelf anymore. self.assertFalse(self.IsDownloadShelfVisible(), 'Download shelf persisted browser restart.') # Verify that the download history persists. downloads = self.GetDownloadsInfo().Downloads() self.assertEqual(1, len(downloads)) self.assertEqual('a_zip_file.zip', downloads[0]['file_name']) self.assertEqual(file_url, downloads[0]['url']) self._DeleteAfterShutdown(downloaded_pkg) def testExtendedAttributesOnMac(self): """Verify that Chrome sets the extended attributes on a file. This test is for mac only. """ if not self.IsMac(): logging.info('Skipping testExtendedAttributesOnMac on non-Mac') return downloaded_pkg = os.path.join(self.GetDownloadDirectory().value(), 'a_zip_file.zip') self._ClearLocalDownloadState(downloaded_pkg) file_url = 'http://src.chromium.org/viewvc/chrome/trunk/src/chrome/'\ 'test/data/downloads/a_zip_file.zip' self.DownloadAndWaitForStart(file_url) self.WaitForAllDownloadsToComplete() import xattr self.assertTrue('com.apple.quarantine' in xattr.listxattr(downloaded_pkg)) def testDownloadPercentage(self): """Verify that during downloading, % values increases, and once download is over, % value is 100""" file_path = self._MakeFile(2**24) # Ensure there's sufficient space remaining to download file. free_space = test_utils.GetFreeSpace(self.GetDownloadDirectory().value()) assert free_space >= 2**24, \ 'Not enough disk space to download. Got %d free' % free_space file_url = self.GetFileURLForPath(file_path) downloaded_pkg = os.path.join(self.GetDownloadDirectory().value(), os.path.basename(file_path)) os.path.exists(downloaded_pkg) and os.remove(downloaded_pkg) self.DownloadAndWaitForStart(file_url) downloaded_pkg = os.path.join(self.GetDownloadDirectory().value(), os.path.basename(file_path)) downloads = self.GetDownloadsInfo().Downloads() old_percentage = downloads[0]['PercentComplete'] def _PercentInc(): percent = self.GetDownloadsInfo().Downloads()[0]['PercentComplete'] return old_percentage == 100 or percent > old_percentage, self.assertTrue(self.WaitUntil(_PercentInc), msg='Download percentage value is not increasing') # Once download is completed, percentage is 100. self.WaitForAllDownloadsToComplete() downloads = self.GetDownloadsInfo().Downloads() self.assertEqual(downloads[0]['PercentComplete'], 100, 'Download percentage should be 100 after download completed') os.path.exists(file_path) and os.remove(file_path) os.path.exists(downloaded_pkg) and os.remove(downloaded_pkg) def testDownloadIncognitoAndRegular(self): """Download the same zip file in regular and incognito window and verify that it downloaded correctly with same file name appended with counter for the second download in regular window. """ test_dir = os.path.join(os.path.abspath(self.DataDir()), 'downloads') file_path = os.path.join(test_dir, 'a_zip_file.zip') file_url = self.GetFileURLForPath(file_path) downloaded_pkg_regul = os.path.join(self.GetDownloadDirectory().value(), 'a_zip_file.zip') downloaded_pkg_incog = os.path.join(self.GetDownloadDirectory().value(), 'a_zip_file (1).zip') self._ClearLocalDownloadState(downloaded_pkg_regul) self._ClearLocalDownloadState(downloaded_pkg_incog) self.DownloadAndWaitForStart(file_url, 0) self.WaitForAllDownloadsToComplete(windex=0) self.RunCommand(pyauto.IDC_NEW_INCOGNITO_WINDOW) self.DownloadAndWaitForStart(file_url, 1) self.WaitForAllDownloadsToComplete(windex=1) # Verify download in regular window. self.assertTrue(os.path.exists(downloaded_pkg_regul)) self.assertTrue(self._EqualFileContents(file_path, downloaded_pkg_regul)) # Verify download in incognito window. # bug 69738 WaitForAllDownloadsToComplete is flaky for this test case. # Using extra WaitUntil until this is resolved. self.assertTrue(self.WaitUntil( lambda: os.path.exists(downloaded_pkg_incog))) self.assertTrue(self._EqualFileContents(file_path, downloaded_pkg_incog)) if __name__ == '__main__': pyauto_functional.Main()