summaryrefslogtreecommitdiffstats
path: root/chrome/test/functional/databases.py
blob: c4f064a1f411295f14f8429133add2c290c5add3 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
#!/usr/bin/python
# Copyright (c) 2011 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import simplejson
import os

import pyauto_functional
import pyauto


class SQLExecutionError(RuntimeError):
  """Represents an error that occurs while executing an SQL statement."""
  pass


class DatabasesTest(pyauto.PyUITest):
  """Test of Web SQL Databases."""

  def __init__(self, methodName='runTest'):
    super(DatabasesTest, self).__init__(methodName)
    # HTML page used for database testing.
    self.TEST_PAGE_URL = self.GetFileURLForDataPath(
        'database', 'database_tester.html')

  def _ParseAndCheckResultFromTestPage(self, json):
    """Helper function that parses the message sent from |TEST_PAGE_URL| and
    checks that it succeeded.

    Args:
      json: the message, encoded in JSON, that the test page sent to us

    Returns:
      dictionary representing the result from the test page, with format:
      {
        'succeeded': boolean
        'errorMsg': optional string
        'returnValue': optional any type
      }

    Raises:
      SQLExecutionError if the message contains an error message
    """
    result_dict = simplejson.loads(json)
    if result_dict['succeeded'] == False:
      raise SQLExecutionError(result_dict['errorMsg'])
    return result_dict

  def _CreateTable(self, tab_index=0, windex=0):
    """Creates a table in the database.

    This should only be called once per database. This should be called before
    attempting to insert, update, delete, or get the records in the database.

    Defaults to first tab in first window.

    Args:
      tab_index: index of the tab that will create the database
      windex: index of the window containing the tab that will create the
              database
    """
    json = self.CallJavascriptFunc('createTable', [], tab_index, windex)
    self._ParseAndCheckResultFromTestPage(json)

  def _InsertRecord(self, record, tab_index=0, windex=0):
    """Inserts a record, i.e., a row, into the database.

    Defaults to first tab in first window.

    Args:
      record: string that will be added as a new row in the database
      tab_index: index of the tab that will insert the record
      windex: index of the window containing the tab that will insert the record
    """
    json = self.CallJavascriptFunc('insertRecord', [record], tab_index, windex)
    self._ParseAndCheckResultFromTestPage(json)

  def _UpdateRecord(self, index, updated_record, tab_index=0, windex=0):
    """Updates a record, i.e., a row, in the database.

    Defaults to first tab in first window.

    Args:
      index: index of the record to update. Index 0 refers to the oldest item in
             the database
      updated_record: string that will be used to update the row in the database
      tab_index: index of the tab that will update the record
      windex: index of the window containing the tab that will update the record
    """
    json = self.CallJavascriptFunc(
        'updateRecord', [index, updated_record], tab_index, windex)
    self._ParseAndCheckResultFromTestPage(json)

  def _DeleteRecord(self, index, tab_index=0, windex=0):
    """Deletes a record, i.e., a row, from the database.

    Defaults to first tab in first window.

    Args:
      index: index of the record to be deleted. Index 0 refers to the oldest
             item in the database
      tab_index: index of the tab that will delete the record
      windex: index of the window containing the tab that will delete the record
    """
    json = self.CallJavascriptFunc('deleteRecord', [index], tab_index, windex)
    self._ParseAndCheckResultFromTestPage(json)

  def _GetRecords(self, tab_index=0, windex=0):
    """Returns all the records, i.e., rows, in the database.

    The records are ordererd from oldest to newest.

    Defaults to first tab in first window.

    Returns:
      array of all the records in the database

    Args:
      tab_index: index of the tab that will query the database
      windex: index of the window containing the tab that will query the
              database
    """
    json = self.CallJavascriptFunc('getRecords', [], tab_index, windex)
    return self._ParseAndCheckResultFromTestPage(json)['returnValue']

  def _HasTable(self, tab_index=0, windex=0):
    """Returns whether the page has a table in its database."""
    try:
      self._GetRecords(tab_index, windex)
    except SQLExecutionError:
      return False
    return True

  def testInsertRecord(self):
    """Insert records to the database."""
    self.NavigateToURL(self.TEST_PAGE_URL)
    self._CreateTable()
    self._InsertRecord('text')
    self.assertEquals(['text'], self._GetRecords())
    self._InsertRecord('text2')
    self.assertEquals(['text', 'text2'], self._GetRecords())

  def testUpdateRecord(self):
    """Update records in the database."""
    self.NavigateToURL(self.TEST_PAGE_URL)
    self._CreateTable()

    # Update the first record.
    self._InsertRecord('text')
    self._UpdateRecord(0, '0')
    records = self._GetRecords()
    self.assertEquals(1, len(records))
    self.assertEquals('0', records[0])

    # Update the middle record.
    self._InsertRecord('1')
    self._InsertRecord('2')
    self._UpdateRecord(1, '1000')
    self.assertEquals(['0', '1000', '2'], self._GetRecords())

  def testDeleteRecord(self):
    """Delete records in the database."""
    self.NavigateToURL(self.TEST_PAGE_URL)
    self._CreateTable()

    # Delete the first and only record.
    self._InsertRecord('text')
    self._DeleteRecord(0)
    self.assertFalse(self._GetRecords())

    # Delete the middle record.
    self._InsertRecord('0')
    self._InsertRecord('1')
    self._InsertRecord('2')
    self._DeleteRecord(1)
    self.assertEquals(['0', '2'], self._GetRecords())

  def testDeleteNonexistentRow(self):
    """Attempts to delete a nonexistent row in the table."""
    self.NavigateToURL(self.TEST_PAGE_URL)
    self._CreateTable()
    self._InsertRecord('text')
    did_throw_exception = False
    try:
      self._DeleteRecord(1)
    except SQLExecutionError:
      did_throw_exception = True
    self.assertTrue(did_throw_exception)
    self.assertEquals(['text'], self._GetRecords())

  def testDatabaseOperations(self):
    """Insert, update, and delete records in the database."""
    self.NavigateToURL(self.TEST_PAGE_URL)
    self._CreateTable()

    for i in range(10):
      self._InsertRecord(str(i))
    records = self._GetRecords()
    self.assertEqual([str(i) for i in range(10)], records)

    for i in range(10):
      self._UpdateRecord(i, str(i * i))
    records = self._GetRecords()
    self.assertEqual([str(i * i) for i in range(10)], records)

    for i in range(10):
      self._DeleteRecord(0)
    self.assertEqual(0, len(self._GetRecords()))

  def testReloadActiveTab(self):
    """Create records in the database and verify they persist after reload."""
    self.NavigateToURL(self.TEST_PAGE_URL)
    self._CreateTable()
    self._InsertRecord('text')
    self.ReloadActiveTab()
    self.assertEquals(['text'], self._GetRecords())

  def testIncognitoCannotReadRegularDatabase(self):
    """Attempt to read a database created in a regular browser from an incognito
    browser.
    """
    self.NavigateToURL(self.TEST_PAGE_URL)
    self._CreateTable()
    self._InsertRecord('text')
    self.RunCommand(pyauto.IDC_NEW_INCOGNITO_WINDOW)
    self.NavigateToURL(self.TEST_PAGE_URL, 1, 0)
    self.assertFalse(self._HasTable(windex=1))
    self._CreateTable(windex=1)
    self.assertFalse(self._GetRecords(windex=1))

  def testRegularCannotReadIncognitoDatabase(self):
    """Attempt to read a database created in an incognito browser from a regular
    browser.
    """
    self.RunCommand(pyauto.IDC_NEW_INCOGNITO_WINDOW)
    self.NavigateToURL(self.TEST_PAGE_URL, 1, 0)
    self._CreateTable(windex=1)
    self._InsertRecord('text', windex=1)

    # Verify a regular browser cannot read the incognito database.
    self.NavigateToURL(self.TEST_PAGE_URL)
    self.assertFalse(self._HasTable())
    self._CreateTable()
    self.assertFalse(self._GetRecords())

  def testDbModificationPersistInSecondTab(self):
    """Verify DB changes within first tab are present in the second tab."""
    self.NavigateToURL(self.TEST_PAGE_URL)
    self._CreateTable()
    self._InsertRecord('text')
    self.AppendTab(pyauto.GURL(self.TEST_PAGE_URL))
    self._UpdateRecord(0, '0', tab_index=0)
    tab1_records = self._GetRecords(tab_index=0)
    tab2_records = self._GetRecords(tab_index=1)
    self.assertEquals(1, len(tab1_records))
    self.assertEquals('0', tab1_records[0])
    self.assertEquals(1, len(tab2_records))
    self.assertEquals(tab1_records[0], tab2_records[0])

  def testIncognitoDatabaseNotPersistent(self):
    """Verify incognito database is removed after incognito window closes."""
    self.RunCommand(pyauto.IDC_NEW_INCOGNITO_WINDOW)
    self.NavigateToURL(self.TEST_PAGE_URL, 1, 0)
    self._CreateTable(windex=1)
    self._InsertRecord('text', windex=1)
    self.CloseBrowserWindow(1)
    self.RunCommand(pyauto.IDC_NEW_INCOGNITO_WINDOW)
    self.NavigateToURL(self.TEST_PAGE_URL, 1, 0)
    self.assertFalse(self._HasTable(windex=1))

  def testModificationsPersistAfterRendererCrash(self):
    """Verify database modifications persist after crashing tab."""
    self.AppendTab(pyauto.GURL('about:blank'))
    self.ActivateTab(0)
    self.NavigateToURL(self.TEST_PAGE_URL)
    self._CreateTable()
    self._InsertRecord('1')
    self.KillRendererProcess(
        self.GetBrowserInfo()['windows'][0]['tabs'][0]['renderer_pid'])
    self.ReloadActiveTab()
    self.assertEqual(['1'], self._GetRecords())

  def testIncognitoDBPersistentAcrossTabs(self):
    """Test to check if database modifications are persistent across tabs
    in incognito window.
    """
    self.RunCommand(pyauto.IDC_NEW_INCOGNITO_WINDOW)
    self.NavigateToURL(self.TEST_PAGE_URL, 1, 0)
    self._CreateTable(windex=1)
    self._InsertRecord('text', windex=1)
    self.AppendTab(pyauto.GURL(self.TEST_PAGE_URL), 1)
    self.assertEquals(['text'], self._GetRecords(1, 1))

  def testDatabasePersistsAfterRelaunch(self):
    """Verify database modifications persist after restarting browser."""
    self.NavigateToURL(self.TEST_PAGE_URL)
    self._CreateTable()
    self._InsertRecord('text')
    self.RestartBrowser(clear_profile=False)
    self.NavigateToURL(self.TEST_PAGE_URL)
    self.assertEquals(['text'], self._GetRecords())

  def testDeleteAndUpdateDatabase(self):
    """Verify can modify database after deleting it."""
    self.NavigateToURL(self.TEST_PAGE_URL)
    self._CreateTable()
    self._InsertRecord('text')
    # ClearBrowsingData doesn't return and times out
    self.ClearBrowsingData(['COOKIES'], 'EVERYTHING')
    self.NavigateToURL(self.TEST_PAGE_URL)
    self._CreateTable()
    self._InsertRecord('text2')
    self.assertEquals(['text2'], self._GetRecords())


if __name__ == '__main__':
  pyauto_functional.Main()