summaryrefslogtreecommitdiffstats
path: root/chrome/test/functional/downloads.py
blob: 715ee57f375377efade29f31bfc31f9851a0ae8a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
#!/usr/bin/python
# Copyright (c) 2010 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import filecmp
import logging
import os
import shutil
import sys
import tempfile
import time
import urllib

import pyauto_functional  # Must be imported before pyauto
import pyauto
import pyauto_utils


class DownloadsTest(pyauto.PyUITest):
  """TestCase for Downloads."""

  def setUp(self):
    """Start the browser and snapshot the download dir's current contents.

    The snapshot lets tearDown() tell files created by the test apart from
    files that were already there.
    """
    pyauto.PyUITest.setUp(self)
    downloads_path = self.GetDownloadDirectory().value()
    # Names present in the download dir before the test body runs.
    self._existing_downloads = []
    if os.path.isdir(downloads_path):
      self._existing_downloads.extend(os.listdir(downloads_path))
    # Paths queued for deletion once the browser has been shut down.
    self._files_to_remove = []

  def tearDown(self):
    """Tear the base test down, then delete everything this test created."""
    # Any entry in the download dir that was not recorded by setUp() is
    # treated as created by this test and queued for removal.
    downloads_path = self.GetDownloadDirectory().value()
    if os.path.isdir(downloads_path):
      self._files_to_remove.extend(
          os.path.join(downloads_path, entry)
          for entry in os.listdir(downloads_path)
          if entry not in self._existing_downloads)
    pyauto.PyUITest.tearDown(self)
    # Deletion is deliberately done after the base-class tearDown (browser
    # shutdown), so that the queued paths are no longer held open.
    for doomed_path in self._files_to_remove:
      pyauto_utils.RemovePath(doomed_path)

  def _DeleteAfterShutdown(self, path):
    """Delete |path| after browser has been shut down.

    This is so that all handles to the path would have been gone by then.
    Silently Ignores errors, when the path does not exist, or if the path
    could not be deleted.
    """
    self._files_to_remove.append(path)

  def _ClearLocalDownloadState(self, path):
    """Prepare for downloading the given path.

    Clears the given path and the corresponding .crdownload, to prepare it to
    be downloaded.
    """
    os.path.exists(path) and os.remove(path)
    crdownload = path + '.crdownload'
    os.path.exists(crdownload) and os.remove(crdownload)

  def _GetDangerousDownload(self):
    """Return the path of the "dangerous" test file to use on this OS.

    Mac uses 'invalid-dummy.dmg'; every other platform uses 'dangerous.exe'.
    Both live under <data dir>/downloads/dangerous.
    """
    dangerous_dir = os.path.join(self.DataDir(), 'downloads', 'dangerous')
    if self.IsMac():
      file_name = 'invalid-dummy.dmg'
    else:
      file_name = 'dangerous.exe'
    return os.path.join(dangerous_dir, file_name)

  def _EqualFileContents(self, file1, file2):
    """Determine if 2 given files have the same contents."""
    if not (os.path.exists(file1) and os.path.exists(file2)):
      return False
    return filecmp.cmp(file1, file2, shallow=False)

  def _GetDownloadId(self, download_index=0):
    """Look up the id of one entry in the browser's download list.

    Args:
      download_index: position of the download in the list returned by
                      GetDownloadsInfo().Downloads().  Defaults to 0.

    Returns:
      The 'id' field of the selected download.
    """
    all_downloads = self.GetDownloadsInfo().Downloads()
    selected = all_downloads[download_index]
    return selected['id']

  def _MakeFile(self, size):
    """Make a file on-the-fly with the given size. Returns the path to the
       file.
    """
    fd, file_path = tempfile.mkstemp(suffix='.zip', prefix='file-downloads-')
    os.lseek(fd, size, 0)
    os.write(fd, 'a')
    os.close(fd)
    logging.debug('Created temporary file %s of size %d' % (file_path, size))
    return file_path

  def _CallFunctionWithNewTimeout(self, new_timeout, function):
    """Sets the timeout to |new_timeout| and calls |function|.

    The timeout override lives only as long as the CmdExecutionTimeoutChanger
    object created here; that object is dropped before this method returns,
    whether |function| returns normally or raises.

    Args:
      new_timeout: automation command timeout to use while |function| runs.
                   Callers in this file pass milliseconds.
      function: zero-argument callable to invoke under the new timeout.
    """
    timeout_changer = pyauto.PyUITest.CmdExecutionTimeoutChanger(
        self, new_timeout)
    try:
      logging.info('Automation execution timeout has been changed to %d. '
                   'If the timeout is large the test might appear to hang.'
                   % new_timeout)
      function()
    finally:
      # Drop the changer even when |function| raises.  Presumably the changer
      # restores the previous timeout when destroyed -- confirm in pyauto.
      # Without the finally, a failing call would leave the enlarged timeout
      # in effect for as long as the traceback keeps this frame alive.
      del timeout_changer

  def _GetAllDownloadIDs(self):
    """Collect the 'id' field of every download the browser reports.

    Returns:
      A list of ids, in the order returned by GetDownloadsInfo().Downloads().
    """
    download_ids = []
    for entry in self.GetDownloadsInfo().Downloads():
      download_ids.append(entry['id'])
    return download_ids

  def testNoDownloadWaitingNeeded(self):
    """Waiting for downloads with none started must come back promptly."""
    # No download has been started in this test, so there is nothing to wait
    # for; the call is expected to simply return.
    self.WaitForAllDownloadsToComplete()

  def testZip(self):
    """Download a zip file and check both the file and the download shelf.

    The downloaded copy must exist, match the source byte for byte, and the
    download shelf must be showing afterwards.
    """
    zip_name = 'a_zip_file.zip'
    source_zip = os.path.join(
        os.path.abspath(self.DataDir()), 'downloads', zip_name)
    source_url = self.GetFileURLForPath(source_zip)
    target_zip = os.path.join(self.GetDownloadDirectory().value(), zip_name)
    # Start from a clean slate so the download is not renamed.
    self._ClearLocalDownloadState(target_zip)

    self.DownloadAndWaitForStart(source_url)
    self.WaitForAllDownloadsToComplete()

    # The shelf should have appeared for the new download.
    self.assertTrue(self.IsDownloadShelfVisible())

    # The file should be on disk and identical to the original.
    self.assertTrue(os.path.exists(target_zip))
    self.assertTrue(self._EqualFileContents(source_zip, target_zip))

  def testZipInIncognito(self):
    """Download a zip from an incognito window and verify the result."""
    zip_name = 'a_zip_file.zip'
    source_zip = os.path.join(
        os.path.abspath(self.DataDir()), 'downloads', zip_name)
    source_url = self.GetFileURLForPath(source_zip)
    target_zip = os.path.join(self.GetDownloadDirectory().value(), zip_name)
    self._ClearLocalDownloadState(target_zip)

    def _TargetExists():
      """Poll condition: has the downloaded file shown up on disk?"""
      return os.path.exists(target_zip)

    # Open the incognito window.
    self.RunCommand(pyauto.IDC_NEW_INCOGNITO_WINDOW)
    # Incognito downloads are not part of the download manager's list, so
    # GetDownloadsInfo() cannot be used to track them.  Polling the
    # filesystem through WaitUntil is the only option.
    self.NavigateToURL(source_url, 1, 0)
    self.assertTrue(self.WaitUntil(_TargetExists))
    self.assertTrue(self._EqualFileContents(source_zip, target_zip))
    # The shelf is checked on window index 1 (the incognito window).
    self.assertTrue(self.IsDownloadShelfVisible(1))

  def testSaveDangerousFile(self):
    """Accepting a dangerous download must save an intact copy of the file."""
    source_file = self._GetDangerousDownload()
    dangerous_name = os.path.basename(source_file)
    saved_file = os.path.join(self.GetDownloadDirectory().value(),
                              dangerous_name)
    self._ClearLocalDownloadState(saved_file)
    self._TriggerUnsafeDownload(dangerous_name)
    # Explicitly accept the download that was flagged as dangerous.
    download_id = self._GetDownloadId()
    self.PerformActionOnDownload(download_id, 'save_dangerous_download')
    self.WaitForDownloadToComplete(saved_file)

    # The saved file must exist and match the source.
    self.assertTrue(os.path.exists(saved_file))
    self.assertTrue(self._EqualFileContents(source_file, saved_file))
    self._DeleteAfterShutdown(saved_file)

  def testDeclineDangerousDownload(self):
    """Declining a dangerous download must leave no trace of it.

    After declining, the file must not be on disk, the downloads list must be
    empty and the download shelf must not be visible.
    """
    source_file = self._GetDangerousDownload()
    dangerous_name = os.path.basename(source_file)
    declined_file = os.path.join(self.GetDownloadDirectory().value(),
                                 dangerous_name)
    self._ClearLocalDownloadState(declined_file)
    self._TriggerUnsafeDownload(dangerous_name)
    download_id = self._GetDownloadId()
    self.PerformActionOnDownload(download_id, 'decline_dangerous_download')
    self.assertFalse(os.path.exists(declined_file))
    self.assertFalse(self.GetDownloadsInfo().Downloads())
    self.assertFalse(self.IsDownloadShelfVisible())

  def testRemoveDownload(self):
    """Removing a download drops its list entry but keeps the file."""
    zip_name = 'a_zip_file.zip'
    source_zip = os.path.join(
        os.path.abspath(self.DataDir()), 'downloads', zip_name)
    source_url = self.GetFileURLForPath(source_zip)
    target_zip = os.path.join(self.GetDownloadDirectory().value(), zip_name)
    self._ClearLocalDownloadState(target_zip)

    self.DownloadAndWaitForStart(source_url)
    download_id = self._GetDownloadId()
    self.PerformActionOnDownload(download_id, 'remove')

    # 'remove' only affects the downloads list; the file stays on disk.
    self.assertFalse(self.GetDownloadsInfo().Downloads())
    self.assertTrue(os.path.exists(target_zip))
    self._DeleteAfterShutdown(target_zip)

  def testBigZip(self):
    """Download a 1 GB file and verify it arrives intact.

    Roughly 2 GB of free disk space is required: 1 GB for the generated
    source file and 1 GB for the downloaded copy.

    Note: The automation timeout is raised to 4 minutes for the wait, so the
          test may look like it is hanging.
    """
    one_gigabyte = 2**30
    four_minutes_ms = 4 * 60 * 1000
    # Generate the source file on the fly and make sure it gets cleaned up.
    source_file = self._MakeFile(one_gigabyte)
    self._DeleteAfterShutdown(source_file)
    source_url = self.GetFileURLForPath(source_file)
    target_file = os.path.join(self.GetDownloadDirectory().value(),
                               os.path.basename(source_file))
    self._ClearLocalDownloadState(target_file)
    self.DownloadAndWaitForStart(source_url)
    self._DeleteAfterShutdown(target_file)
    # A download this large may outlast the default automation timeout, so
    # the wait runs under a temporarily enlarged one.
    self._CallFunctionWithNewTimeout(four_minutes_ms,
                                     self.WaitForAllDownloadsToComplete)
    # The copy must exist and be identical to the source.
    missing_msg = 'Downloaded file %s missing.' % target_file
    self.assertTrue(os.path.exists(target_file), missing_msg)
    mismatch_msg = 'Downloaded file %s does not match original' % target_file
    self.assertTrue(self._EqualFileContents(source_file, target_file),
                    mismatch_msg)

  def testFileRenaming(self):
    """Test file renaming when downloading an already-existing filename.

    The same file is downloaded several times.  The first copy is expected to
    keep its name; each later copy is expected to get a ' (N)' suffix before
    the extension, with N counting up from 1.
    """
    test_dir = os.path.join(os.path.abspath(self.DataDir()), 'downloads')
    # Build the URL with the same helper the other tests use instead of
    # hand-formatting 'file://' + path.
    file_url = self.GetFileURLForPath(os.path.join(test_dir, 'a_zip_file.zip'))
    download_dir = self.GetDownloadDirectory().value()

    num_times = 5
    assert num_times > 1, 'needs to be > 1 to work'
    # First download keeps the plain name; downloads 1..num_times-1 are
    # renamed.
    expected_files = [os.path.join(download_dir, 'a_zip_file.zip')]
    for i in range(1, num_times):
      expected_files.append(
          os.path.join(download_dir, 'a_zip_file (%d).zip' % i))

    for expected_filename in expected_files:
      # Clear any stale copy right before the download that should create it.
      self._ClearLocalDownloadState(expected_filename)
      self.DownloadAndWaitForStart(file_url)

    self.WaitForAllDownloadsToComplete()

    # Verify that all files (the original name included) exist.
    for filename in expected_files:
      self.assertTrue(os.path.exists(filename),
                      'Expected download %s missing.' % filename)
      self._DeleteAfterShutdown(filename)

  def testCrazyFilenames(self):
    """Download files whose names contain special characters.

    The source files are generated on the fly in a temporary directory and
    are cleaned up after use.
    """
    download_dir = self.GetDownloadDirectory().value()
    names_file = os.path.join(self.DataDir(), 'downloads',
                              'crazy_filenames.txt')
    crazy_filenames = self.EvalDataFrom(names_file)
    logging.info('Testing with %d crazy filenames' % len(crazy_filenames))

    def _CreateFile(name):
      """Write 100 repetitions of a junk sentence into the file |name|."""
      fp = open(name, 'w')  # name could be unicode
      junk = 'This is a junk file named %s. ' % repr(name) * 100
      fp.write(junk + '\n')
      fp.close()

    # Temp dir for hosting crazy filenames.
    temp_dir = tempfile.mkdtemp(prefix='download')
    self._DeleteAfterShutdown(unicode(temp_dir))
    # Windows has a dual nature dealing with unicode filenames.
    # While the files are internally saved as unicode, there's a non-unicode
    # aware API that returns a locale-dependent coding on the true unicode
    # filenames.  This messes up things.
    # Filesystem-interfacing functions like os.listdir() need to
    # be given unicode strings to "do the right thing" on win.
    # Ref: http://boodebr.org/main/python/all-about-python-and-unicode
    for crazy_name in crazy_filenames:  # crazy_name is unicode.
      # The URL is built from the utf-8 encoded path, while the file itself
      # is created through the unicode path.
      utf8_path = os.path.join(temp_dir, crazy_name.encode('utf-8'))
      _CreateFile(os.path.join(temp_dir, crazy_name))  # unicode file.
      source_url = self.GetFileURLForPath(utf8_path)
      self._ClearLocalDownloadState(os.path.join(download_dir, crazy_name))
      self.DownloadAndWaitForStart(source_url)
    self.WaitForAllDownloadsToComplete()

    # Every crazy filename must have produced exactly one download entry.
    downloads = self.GetDownloadsInfo().Downloads()
    self.assertEqual(len(downloads), len(crazy_filenames))

    for crazy_name in crazy_filenames:
      downloaded_file = os.path.join(download_dir, crazy_name)
      original_file = os.path.join(temp_dir, crazy_name)
      self.assertTrue(os.path.exists(downloaded_file))
      # Verify file contents.
      self.assertTrue(self._EqualFileContents(downloaded_file, original_file))
      if os.path.exists(downloaded_file):
        os.remove(downloaded_file)

  def _TriggerUnsafeDownload(self, filename, tab_index=0, windex=0):
    """Start a download that the browser treats as unsafe/dangerous.

    Downloads the user asks for directly (navigating to a package, clicking a
    link) are not flagged as unsafe; only downloads the user did not initiate
    directly are.  This helper therefore navigates to
    download-dangerous.html, which triggers the download itself, and then
    blocks until one more download shows up in the downloads list.

    Args:
      filename: name of the file whose download should be triggered.  It must
                exist in the 'dangerous' data directory.
      tab_index: tab index. Default 0.
      windex: window index. Default 0.
    """
    dangerous_dir = os.path.join(self.DataDir(), 'downloads', 'dangerous')
    assert os.path.isfile(os.path.join(dangerous_dir, filename))
    trigger_page = os.path.join(dangerous_dir, 'download-dangerous.html')
    # The file to download is passed to the page as its query string.
    trigger_url = self.GetFileURLForDataPath(trigger_page) + '?' + filename
    baseline_count = len(self.GetDownloadsInfo().Downloads())
    self.NavigateToURL(trigger_url, windex, tab_index)

    def _OneMoreDownloadListed():
      """Poll condition: downloads list grew by exactly one entry."""
      return len(self.GetDownloadsInfo().Downloads()) == baseline_count + 1

    # The download may take a moment to kick in; wait for it to be listed.
    self.assertTrue(self.WaitUntil(_OneMoreDownloadListed))

  def testNoUnsafeDownloadsOnRestart(self):
    """An unresolved unsafe download must be gone after a browser restart."""
    source_file = self._GetDangerousDownload()
    dangerous_name = os.path.basename(source_file)
    target_file = os.path.join(self.GetDownloadDirectory().value(),
                               dangerous_name)
    self._ClearLocalDownloadState(target_file)
    self._TriggerUnsafeDownload(dangerous_name)
    self.assertTrue(self.IsDownloadShelfVisible())
    # Restart with the same profile; the pending unsafe download should not
    # survive: no file, no shelf and no entry in the downloads list.
    self.RestartBrowser(clear_profile=False)
    self.assertFalse(os.path.exists(target_file))
    self.assertFalse(self.IsDownloadShelfVisible())
    self.NavigateToURL("chrome://downloads")
    remaining = self.GetDownloadsInfo().Downloads()
    self.assertFalse(remaining)

  def testPauseAndResume(self):
    """Verify that pause and resume work while downloading a file.

    If the download finishes before it can be paused, the test logs that and
    returns without asserting anything further.

    Note: This test increases automation timeout to 2 min.  Things might seem
          to hang.
    """
    # Create a 2**28 byte (256 MB) file on the fly.  Register it for deletion
    # right away so it is cleaned up even if starting the download fails.
    file_path = self._MakeFile(2**28)
    self._DeleteAfterShutdown(file_path)

    file_url = self.GetFileURLForPath(file_path)
    downloaded_pkg = os.path.join(self.GetDownloadDirectory().value(),
                                  os.path.basename(file_path))
    self._ClearLocalDownloadState(downloaded_pkg)
    self.DownloadAndWaitForStart(file_url)

    self._DeleteAfterShutdown(downloaded_pkg)

    # Pause the download and assert that it is paused.
    pause_dict = self.PerformActionOnDownload(self._GetDownloadId(),
                                              'toggle_pause')
    if pause_dict['state'] == 'COMPLETE':
      logging.info('The download completed before pause. Stopping test.')
      return

    self.assertTrue(pause_dict['is_paused'])
    # assertEqual reports the actual state on failure.
    self.assertEqual('IN_PROGRESS', pause_dict['state'])

    # Resume the download and assert it is not paused.
    resume_dict = self.PerformActionOnDownload(self._GetDownloadId(),
                                               'toggle_pause')
    self.assertFalse(resume_dict['is_paused'])

    # Waiting for big file to download might exceed automation timeout.
    # Temporarily increase the automation timeout.
    self._CallFunctionWithNewTimeout(2 * 60 * 1000,  # 2 min.
                                     self.WaitForAllDownloadsToComplete)

    # Verify that the file was correctly downloaded after pause and resume.
    self.assertTrue(os.path.exists(downloaded_pkg),
                    'Downloaded file %s missing.' % downloaded_pkg)
    self.assertTrue(self._EqualFileContents(file_path, downloaded_pkg),
                    'Downloaded file %s does not match original' %
                    downloaded_pkg)

  def testCancelDownload(self):
    """Verify that we can cancel a download.

    If the download finishes before the cancel takes effect, the test logs
    that and returns without asserting anything further.
    """
    # Create a big file (2**28 bytes, 256 MB) on the fly, so that the download
    # won't finish before being cancelled.  Register it for deletion right
    # away so it is cleaned up even if a later step raises.
    file_path = self._MakeFile(2**28)
    self._DeleteAfterShutdown(file_path)
    file_url = self.GetFileURLForPath(file_path)
    downloaded_pkg = os.path.join(self.GetDownloadDirectory().value(),
                                  os.path.basename(file_path))
    self._ClearLocalDownloadState(downloaded_pkg)
    self.DownloadAndWaitForStart(file_url)
    self.PerformActionOnDownload(self._GetDownloadId(), 'cancel')

    state = self.GetDownloadsInfo().Downloads()[0]['state']
    if state == 'COMPLETE':
      logging.info('The download completed before cancel. Test stopped.')
      return

    # Verify the download has been cancelled.  The state is fetched again
    # rather than reusing |state|, as in the original test.
    self.assertEqual('CANCELLED',
                     self.GetDownloadsInfo().Downloads()[0]['state'])
    self.assertFalse(os.path.exists(downloaded_pkg))

  def testDownloadsPersistence(self):
    """Download history must survive a browser restart; the shelf must not."""
    source_zip = os.path.join(
        os.path.abspath(self.DataDir()), 'downloads', 'a_zip_file.zip')
    source_url = self.GetFileURLForPath(source_zip)
    target_zip = os.path.join(self.GetDownloadDirectory().value(),
                              'a_zip_file.zip')
    self._ClearLocalDownloadState(target_zip)
    self.DownloadAndWaitForStart(source_url)

    # Record what the downloads list reports before the restart.
    before_restart = self.GetDownloadsInfo().Downloads()
    self.assertEqual(1, len(before_restart))
    self.assertEqual('a_zip_file.zip', before_restart[0]['file_name'])
    recorded_url = before_restart[0]['url']

    self.RestartBrowser(clear_profile=False)
    # Trigger the download service to get loaded after restart.
    self.NavigateToURL('chrome://downloads/')
    # The shelf belongs to the previous session and must be gone.
    self.assertFalse(self.IsDownloadShelfVisible(),
                     'Download shelf persisted browser restart.')
    # The history entry must still be there, unchanged.
    after_restart = self.GetDownloadsInfo().Downloads()
    self.assertEqual(1, len(after_restart))
    self.assertEqual('a_zip_file.zip', after_restart[0]['file_name'])
    self.assertEqual(recorded_url, after_restart[0]['url'])
    self._DeleteAfterShutdown(target_zip)

  def testDownloadTheme(self):
    """Downloading and saving a theme .crx must install the theme."""
    theme_source = os.path.join(
        os.path.abspath(self.DataDir()), 'extensions', 'theme.crx')
    theme_url = self.GetFileURLForPath(theme_source)
    theme_target = os.path.join(self.GetDownloadDirectory().value(),
                                'theme.crx')
    self._ClearLocalDownloadState(theme_target)

    def _IsGone(path):
      """Poll condition: |path| no longer exists on disk."""
      return not os.path.exists(path)

    self.DownloadAndWaitForStart(theme_url)
    download_id = self._GetDownloadId()
    self.PerformActionOnDownload(download_id, 'save_dangerous_download')
    # Wait for the theme to be set automatically.
    self.assertTrue(self.WaitUntilDownloadedThemeSet('camo theme'))
    # The downloaded .crx is expected to disappear from the download dir.
    self.assertTrue(self.WaitUntil(_IsGone, args=[theme_target]))


# Script entry point: when this file is executed directly, hand control to
# the pyauto_functional runner.
if __name__ == '__main__':
  pyauto_functional.Main()