# Copyright 2012 Google Inc. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, # either express or implied. See the License for the specific # language governing permissions and limitations under the License. """File Interface for Google Cloud Storage.""" from __future__ import with_statement __all__ = ['delete', 'listbucket', 'open', 'stat', ] import logging import StringIO import urllib import xml.etree.cElementTree as ET from . import api_utils from . import common from . import errors from . import storage_api def open(filename, mode='r', content_type=None, options=None, read_buffer_size=storage_api.ReadBuffer.DEFAULT_BUFFER_SIZE, retry_params=None, _account_id=None): """Opens a Google Cloud Storage file and returns it as a File-like object. Args: filename: A Google Cloud Storage filename of form '/bucket/filename'. mode: 'r' for reading mode. 'w' for writing mode. In reading mode, the file must exist. In writing mode, a file will be created or be overrode. content_type: The MIME type of the file. str. Only valid in writing mode. options: A str->basestring dict to specify additional headers to pass to GCS e.g. {'x-goog-acl': 'private', 'x-goog-meta-foo': 'foo'}. Supported options are x-goog-acl, x-goog-meta-, cache-control, content-disposition, and content-encoding. Only valid in writing mode. See https://developers.google.com/storage/docs/reference-headers for details. read_buffer_size: The buffer size for read. Read keeps a buffer and prefetches another one. To minimize blocking for large files, always read by buffer size. To minimize number of RPC requests for small files, set a large buffer size. Max is 30MB. retry_params: An instance of api_utils.RetryParams for subsequent calls to GCS from this file handle. If None, the default one is used. _account_id: Internal-use only. Returns: A reading or writing buffer that supports File-like interface. Buffer must be closed after operations are done. Raises: errors.AuthorizationError: if authorization failed. errors.NotFoundError: if an object that's expected to exist doesn't. ValueError: invalid open mode or if content_type or options are specified in reading mode. """ common.validate_file_path(filename) api = storage_api._get_storage_api(retry_params=retry_params, account_id=_account_id) filename = api_utils._quote_filename(filename) if mode == 'w': common.validate_options(options) return storage_api.StreamingBuffer(api, filename, content_type, options) elif mode == 'r': if content_type or options: raise ValueError('Options and content_type can only be specified ' 'for writing mode.') return storage_api.ReadBuffer(api, filename, buffer_size=read_buffer_size) else: raise ValueError('Invalid mode %s.' % mode) def delete(filename, retry_params=None, _account_id=None): """Delete a Google Cloud Storage file. Args: filename: A Google Cloud Storage filename of form '/bucket/filename'. retry_params: An api_utils.RetryParams for this call to GCS. If None, the default one is used. _account_id: Internal-use only. Raises: errors.NotFoundError: if the file doesn't exist prior to deletion. """ api = storage_api._get_storage_api(retry_params=retry_params, account_id=_account_id) common.validate_file_path(filename) filename = api_utils._quote_filename(filename) status, resp_headers, _ = api.delete_object(filename) errors.check_status(status, [204], filename, resp_headers=resp_headers) def stat(filename, retry_params=None, _account_id=None): """Get GCSFileStat of a Google Cloud storage file. Args: filename: A Google Cloud Storage filename of form '/bucket/filename'. retry_params: An api_utils.RetryParams for this call to GCS. If None, the default one is used. _account_id: Internal-use only. Returns: a GCSFileStat object containing info about this file. Raises: errors.AuthorizationError: if authorization failed. errors.NotFoundError: if an object that's expected to exist doesn't. """ common.validate_file_path(filename) api = storage_api._get_storage_api(retry_params=retry_params, account_id=_account_id) status, headers, _ = api.head_object(api_utils._quote_filename(filename)) errors.check_status(status, [200], filename, resp_headers=headers) file_stat = common.GCSFileStat( filename=filename, st_size=headers.get('content-length'), st_ctime=common.http_time_to_posix(headers.get('last-modified')), etag=headers.get('etag'), content_type=headers.get('content-type'), metadata=common.get_metadata(headers)) return file_stat def _copy2(src, dst, metadata=None, retry_params=None): """Copy the file content from src to dst. Internal use only! Args: src: /bucket/filename dst: /bucket/filename metadata: a dict of metadata for this copy. If None, old metadata is copied. For example, {'x-goog-meta-foo': 'bar'}. retry_params: An api_utils.RetryParams for this call to GCS. If None, the default one is used. Raises: errors.AuthorizationError: if authorization failed. errors.NotFoundError: if an object that's expected to exist doesn't. """ common.validate_file_path(src) common.validate_file_path(dst) if metadata is None: metadata = {} copy_meta = 'COPY' else: copy_meta = 'REPLACE' metadata.update({'x-goog-copy-source': src, 'x-goog-metadata-directive': copy_meta}) api = storage_api._get_storage_api(retry_params=retry_params) status, resp_headers, _ = api.put_object( api_utils._quote_filename(dst), headers=metadata) errors.check_status(status, [200], src, metadata, resp_headers) def listbucket(path_prefix, marker=None, prefix=None, max_keys=None, delimiter=None, retry_params=None, _account_id=None): """Returns a GCSFileStat iterator over a bucket. Optional arguments can limit the result to a subset of files under bucket. This function has two modes: 1. List bucket mode: Lists all files in the bucket without any concept of hierarchy. GCS doesn't have real directory hierarchies. 2. Directory emulation mode: If you specify the 'delimiter' argument, it is used as a path separator to emulate a hierarchy of directories. In this mode, the "path_prefix" argument should end in the delimiter specified (thus designates a logical directory). The logical directory's contents, both files and subdirectories, are listed. The names of subdirectories returned will end with the delimiter. So listbucket can be called with the subdirectory name to list the subdirectory's contents. Args: path_prefix: A Google Cloud Storage path of format "/bucket" or "/bucket/prefix". Only objects whose fullpath starts with the path_prefix will be returned. marker: Another path prefix. Only objects whose fullpath starts lexicographically after marker will be returned (exclusive). prefix: Deprecated. Use path_prefix. max_keys: The limit on the number of objects to return. int. For best performance, specify max_keys only if you know how many objects you want. Otherwise, this method requests large batches and handles pagination for you. delimiter: Use to turn on directory mode. str of one or multiple chars that your bucket uses as its directory separator. retry_params: An api_utils.RetryParams for this call to GCS. If None, the default one is used. _account_id: Internal-use only. Examples: For files "/bucket/a", "/bucket/bar/1" "/bucket/foo", "/bucket/foo/1", "/bucket/foo/2/1", "/bucket/foo/3/1", Regular mode: listbucket("/bucket/f", marker="/bucket/foo/1") will match "/bucket/foo/2/1", "/bucket/foo/3/1". Directory mode: listbucket("/bucket/", delimiter="/") will match "/bucket/a, "/bucket/bar/" "/bucket/foo", "/bucket/foo/". listbucket("/bucket/foo/", delimiter="/") will match "/bucket/foo/1", "/bucket/foo/2/", "/bucket/foo/3/" Returns: Regular mode: A GCSFileStat iterator over matched files ordered by filename. The iterator returns GCSFileStat objects. filename, etag, st_size, st_ctime, and is_dir are set. Directory emulation mode: A GCSFileStat iterator over matched files and directories ordered by name. The iterator returns GCSFileStat objects. For directories, only the filename and is_dir fields are set. The last name yielded can be used as next call's marker. """ if prefix: common.validate_bucket_path(path_prefix) bucket = path_prefix else: bucket, prefix = common._process_path_prefix(path_prefix) if marker and marker.startswith(bucket): marker = marker[len(bucket) + 1:] api = storage_api._get_storage_api(retry_params=retry_params, account_id=_account_id) options = {} if marker: options['marker'] = marker if max_keys: options['max-keys'] = max_keys if prefix: options['prefix'] = prefix if delimiter: options['delimiter'] = delimiter return _Bucket(api, bucket, options) class _Bucket(object): """A wrapper for a GCS bucket as the return value of listbucket.""" def __init__(self, api, path, options): """Initialize. Args: api: storage_api instance. path: bucket path of form '/bucket'. options: a dict of listbucket options. Please see listbucket doc. """ self._init(api, path, options) def _init(self, api, path, options): self._api = api self._path = path self._options = options.copy() self._get_bucket_fut = self._api.get_bucket_async( self._path + '?' + urllib.urlencode(self._options)) self._last_yield = None self._new_max_keys = self._options.get('max-keys') def __getstate__(self): options = self._options if self._last_yield: options['marker'] = self._last_yield.filename[len(self._path) + 1:] if self._new_max_keys is not None: options['max-keys'] = self._new_max_keys return {'api': self._api, 'path': self._path, 'options': options} def __setstate__(self, state): self._init(state['api'], state['path'], state['options']) def __iter__(self): """Iter over the bucket. Yields: GCSFileStat: a GCSFileStat for an object in the bucket. They are ordered by GCSFileStat.filename. """ total = 0 max_keys = self._options.get('max-keys') while self._get_bucket_fut: status, resp_headers, content = self._get_bucket_fut.get_result() errors.check_status(status, [200], self._path, resp_headers=resp_headers, extras=self._options) if self._should_get_another_batch(content): self._get_bucket_fut = self._api.get_bucket_async( self._path + '?' + urllib.urlencode(self._options)) else: self._get_bucket_fut = None root = ET.fromstring(content) dirs = self._next_dir_gen(root) files = self._next_file_gen(root) next_file = files.next() next_dir = dirs.next() while ((max_keys is None or total < max_keys) and not (next_file is None and next_dir is None)): total += 1 if next_file is None: self._last_yield = next_dir next_dir = dirs.next() elif next_dir is None: self._last_yield = next_file next_file = files.next() elif next_dir < next_file: self._last_yield = next_dir next_dir = dirs.next() elif next_file < next_dir: self._last_yield = next_file next_file = files.next() else: logging.error( 'Should never reach. next file is %r. next dir is %r.', next_file, next_dir) if self._new_max_keys: self._new_max_keys -= 1 yield self._last_yield def _next_file_gen(self, root): """Generator for next file element in the document. Args: root: root element of the XML tree. Yields: GCSFileStat for the next file. """ for e in root.getiterator(common._T_CONTENTS): st_ctime, size, etag, key = None, None, None, None for child in e.getiterator('*'): if child.tag == common._T_LAST_MODIFIED: st_ctime = common.dt_str_to_posix(child.text) elif child.tag == common._T_ETAG: etag = child.text elif child.tag == common._T_SIZE: size = child.text elif child.tag == common._T_KEY: key = child.text yield common.GCSFileStat(self._path + '/' + key, size, etag, st_ctime) e.clear() yield None def _next_dir_gen(self, root): """Generator for next directory element in the document. Args: root: root element in the XML tree. Yields: GCSFileStat for the next directory. """ for e in root.getiterator(common._T_COMMON_PREFIXES): yield common.GCSFileStat( self._path + '/' + e.find(common._T_PREFIX).text, st_size=None, etag=None, st_ctime=None, is_dir=True) e.clear() yield None def _should_get_another_batch(self, content): """Whether to issue another GET bucket call. Args: content: response XML. Returns: True if should, also update self._options for the next request. False otherwise. """ if ('max-keys' in self._options and self._options['max-keys'] <= common._MAX_GET_BUCKET_RESULT): return False elements = self._find_elements( content, set([common._T_IS_TRUNCATED, common._T_NEXT_MARKER])) if elements.get(common._T_IS_TRUNCATED, 'false').lower() != 'true': return False next_marker = elements.get(common._T_NEXT_MARKER) if next_marker is None: self._options.pop('marker', None) return False self._options['marker'] = next_marker return True def _find_elements(self, result, elements): """Find interesting elements from XML. This function tries to only look for specified elements without parsing the entire XML. The specified elements is better located near the beginning. Args: result: response XML. elements: a set of interesting element tags. Returns: A dict from element tag to element value. """ element_mapping = {} result = StringIO.StringIO(result) for _, e in ET.iterparse(result, events=('end',)): if not elements: break if e.tag in elements: element_mapping[e.tag] = e.text elements.remove(e.tag) return element_mapping