Using the API#
Authentication / Configuration#
Use Client objects to configure your applications. Client objects hold both a project and an authenticated connection to the BigQuery service.

The authentication credentials can be implicitly determined from the environment or directly via from_service_account_json and from_service_account_p12.

After setting the GOOGLE_APPLICATION_CREDENTIALS and GOOGLE_CLOUD_PROJECT environment variables, create an instance of Client.

>>> from google.cloud import bigquery
>>> client = bigquery.Client()
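To configure credentials explicitly instead of relying on the environment, you can use one of the factories mentioned above. A minimal sketch with from_service_account_json (the key-file path and project ID below are placeholders):

>>> from google.cloud import bigquery
>>> client = bigquery.Client.from_service_account_json(
...     '/path/to/service-account-key.json', project='PROJECT_ID')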
Projects#
A project is the top-level container in the BigQuery API: it is tied closely to billing, and can provide default access control across all its datasets. If no project is passed to the client constructor, the library attempts to infer a project using the environment (including explicit environment variables, GAE, and GCE).
To override the project inferred from the environment, pass an explicit project to the constructor, or to either of the alternative classmethod factories:

>>> from google.cloud import bigquery
>>> client = bigquery.Client(project='PROJECT_ID')
Project ACLs#
Each project has an access control list granting reader / writer / owner permission to one or more entities. This list cannot be queried or set via the API: it must be managed using the Google Developer Console.
Datasets#
A dataset represents a collection of tables, and applies several default policies to tables as they are created:
- An access control list (ACL). When created, a dataset has an ACL which maps to the ACL inherited from its project.
- A default table expiration period. If set, tables created within the dataset will have that value as their expiration period, as sketched below.
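For example, a default table expiration can be applied to a dataset before creating it. This is a minimal sketch, assuming the default_table_expiration_ms property (used in the patch example below) can also be assigned directly before create():

ONE_DAY_MS = 24 * 60 * 60 * 1000

dataset = client.dataset(DATASET_NAME)
dataset.default_table_expiration_ms = ONE_DAY_MS  # assumed writable before create()
dataset.create()  # API request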
Dataset operations#
List datasets for the client’s project:
datasets, token = client.list_datasets()  # API request
while True:
    for dataset in datasets:
        do_something_with(dataset)
    if token is None:
        break
    datasets, token = client.list_datasets(page_token=token)  # API request
Create a new dataset for the client’s project:
dataset = client.dataset(DATASET_NAME)
dataset.create() # API request
Check for the existence of a dataset:
assert not dataset.exists() # API request
dataset.create() # API request
assert dataset.exists() # API request
Refresh metadata for a dataset (to pick up changes made by another client):
assert dataset.description == ORIGINAL_DESCRIPTION
dataset.description = LOCALLY_CHANGED_DESCRIPTION
assert dataset.description == LOCALLY_CHANGED_DESCRIPTION
dataset.reload() # API request
assert dataset.description == ORIGINAL_DESCRIPTION
Patch metadata for a dataset:
ONE_DAY_MS = 24 * 60 * 60 * 1000
assert dataset.description == ORIGINAL_DESCRIPTION
dataset.patch(
    description=PATCHED_DESCRIPTION,
    default_table_expiration_ms=ONE_DAY_MS,
)  # API request
assert dataset.description == PATCHED_DESCRIPTION
assert dataset.default_table_expiration_ms == ONE_DAY_MS
Replace the ACL for a dataset, and update all writable fields:
>>> from google.cloud import bigquery
>>> client = bigquery.Client()
>>> dataset = client.dataset('dataset_name')
>>> dataset.get() # API request
>>> acl = list(dataset.acl)
>>> acl.append(bigquery.Access(role='READER', entity_type='domain', entity='example.com'))
>>> dataset.acl = acl
>>> dataset.update() # API request
Delete a dataset:
assert dataset.exists() # API request
dataset.delete()
assert not dataset.exists() # API request
Tables#
Tables exist within datasets. List tables for the dataset:
tables, token = dataset.list_tables() # API request
assert len(tables) == 0
assert token is None
table = dataset.table(TABLE_NAME)
table.view_query = QUERY
table.create() # API request
tables, token = dataset.list_tables() # API request
assert len(tables) == 1
assert tables[0].name == TABLE_NAME
Create a table:
table = dataset.table(TABLE_NAME, SCHEMA)
table.create() # API request
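Here SCHEMA is a list of SchemaField objects. For illustration only (the field names and types are placeholders), it might look like:

from google.cloud.bigquery import SchemaField

SCHEMA = [
    SchemaField('full_name', 'STRING', mode='required'),
    SchemaField('age', 'INTEGER', mode='required'),
]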
Check for the existence of a table:
table = dataset.table(TABLE_NAME, SCHEMA)
assert not table.exists() # API request
table.create() # API request
assert table.exists() # API request
Refresh metadata for a table (to pick up changes made by another client):
assert table.friendly_name == ORIGINAL_FRIENDLY_NAME
assert table.description == ORIGINAL_DESCRIPTION
table.friendly_name = LOCALLY_CHANGED_FRIENDLY_NAME
table.description = LOCALLY_CHANGED_DESCRIPTION
table.reload() # API request
assert table.friendly_name == ORIGINAL_FRIENDLY_NAME
assert table.description == ORIGINAL_DESCRIPTION
Patch specific properties for a table:
assert table.friendly_name == ORIGINAL_FRIENDLY_NAME
assert table.description == ORIGINAL_DESCRIPTION
table.patch(
    friendly_name=PATCHED_FRIENDLY_NAME,
    description=PATCHED_DESCRIPTION,
)  # API request
assert table.friendly_name == PATCHED_FRIENDLY_NAME
assert table.description == PATCHED_DESCRIPTION
Update all writable metadata for a table:
assert table.friendly_name == ORIGINAL_FRIENDLY_NAME
assert table.description == ORIGINAL_DESCRIPTION
NEW_SCHEMA = table.schema[:]
NEW_SCHEMA.append(SchemaField('phone', 'STRING'))
table.friendly_name = UPDATED_FRIENDLY_NAME
table.description = UPDATED_DESCRIPTION
table.schema = NEW_SCHEMA
table.update() # API request
assert table.friendly_name == UPDATED_FRIENDLY_NAME
assert table.description == UPDATED_DESCRIPTION
assert table.schema == NEW_SCHEMA
Get rows from a table’s data:
rows, _, token = table.fetch_data()  # API request
while True:
    for row in rows:
        do_something(row)
    if token is None:
        break
    rows, _, token = table.fetch_data(page_token=token)  # API request
Insert rows into a table’s data:
ROWS_TO_INSERT = [
    (u'Phred Phlyntstone', 32),
    (u'Wylma Phlyntstone', 29),
]

# Each tuple supplies values in the order of the table's schema fields.
table.insert_data(ROWS_TO_INSERT)  # API request
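In this version of the library, insert_data is expected to return a list of per-row insert errors (empty on success), so it is worth checking the return value. A brief sketch under that assumption:

errors = table.insert_data(ROWS_TO_INSERT)  # API request
if errors:
    do_something_with(errors)  # e.g. log the failed rows and retry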
Upload table data from a file:
import csv

# csv_file is assumed to be an open, writable file (e.g. a NamedTemporaryFile)
# whose columns match the table's schema.
writer = csv.writer(csv_file)
writer.writerow((b'full_name', b'age'))
writer.writerow((b'Phred Phlyntstone', b'32'))
writer.writerow((b'Wylma Phlyntstone', b'29'))
csv_file.flush()

with open(csv_file.name, 'rb') as readable:
    table.upload_from_file(
        readable, source_format='CSV', skip_leading_rows=1)
Delete a table:
assert table.exists() # API request
table.delete() # API request
assert not table.exists() # API request
Jobs#
Jobs describe actions performed on data in BigQuery tables:
- Load data into a table
- Run a query against data in one or more tables
- Extract data from a table
- Copy a table
List jobs for a project:
jobs, token = client.list_jobs()  # API request
while True:
    for job in jobs:
        do_something_with(job)
    if token is None:
        break
    jobs, token = client.list_jobs(page_token=token)  # API request
Querying data (synchronous)#
Run a query which can be expected to complete within bounded time:
# LIMITED is a query string that selects a "name" column and includes a
# "LIMIT <LIMIT>" clause.
query = client.run_sync_query(LIMITED)
query.timeout_ms = TIMEOUT_MS
query.run()  # API request

assert query.complete
assert len(query.rows) == LIMIT
assert [field.name for field in query.schema] == ['name']
If the rows returned by the query do not fit into the initial response, then we need to fetch the remaining rows via fetch_data:
query = client.run_sync_query(LIMITED)
query.timeout_ms = TIMEOUT_MS
query.max_results = PAGE_SIZE
query.run() # API request
assert query.complete
assert query.page_token is not None
assert len(query.rows) == PAGE_SIZE
assert [field.name for field in query.schema] == ['name']
rows = query.rows
token = query.page_token

while True:
    do_something_with(rows)
    if token is None:
        break
    rows, total_count, token = query.fetch_data(
        page_token=token)  # API request
If the query takes longer than the timeout allowed, query.complete will be False. In that case, we need to poll the associated job until it is done, and then fetch the results:
import time

query = client.run_sync_query(QUERY)
query.timeout_ms = TIMEOUT_MS
query.use_query_cache = False
query.run()  # API request

assert not query.complete

job = query.job
job.reload()  # API request
retry_count = 0
while retry_count < 10 and job.state != u'DONE':
    time.sleep(1.5**retry_count)  # exponential backoff
    retry_count += 1
    job.reload()  # API request

assert job.state == u'DONE'

rows, total_count, token = query.fetch_data()  # API request
while True:
    do_something_with(rows)
    if token is None:
        break
    rows, total_count, token = query.fetch_data(
        page_token=token)  # API request
Querying data (asynchronous)#
Background a query, loading the results into a table:
>>> from google.cloud import bigquery
>>> client = bigquery.Client()
>>> query = """\
SELECT firstname + ' ' + last_name AS full_name,
FLOOR(DATEDIFF(CURRENT_DATE(), birth_date) / 365) AS age
FROM dataset_name.persons
"""
>>> dataset = client.dataset('dataset_name')
>>> table = dataset.table(name='person_ages')
>>> job = client.run_async_query('fullname-age-query-job', query)
>>> job.destination = table
>>> job.write_disposition = 'truncate'
>>> job.name
'fullname-age-query-job'
>>> job.job_type
'query'
>>> job.created
None
>>> job.state
None
Note
- google.cloud.bigquery generates a UUID for each job.
- The created and state fields are not set until the job is submitted to the BigQuery back-end.
Then, begin executing the job on the server:
>>> job.begin() # API call
>>> job.created
datetime.datetime(2015, 7, 23, 9, 30, 20, 268260, tzinfo=<UTC>)
>>> job.state
'RUNNING'
Poll until the job is complete:
>>> import time
>>> retry_count = 100
>>> while retry_count > 0 and job.state != 'DONE':
...     retry_count -= 1
...     time.sleep(10)
...     job.reload()  # API call
>>> job.state
'DONE'
>>> job.ended
datetime.datetime(2015, 7, 23, 9, 30, 21, 334792, tzinfo=<UTC>)
Retrieve the results:
>>> results = job.results()
>>> rows, total_count, token = results.fetch_data()  # API request
>>> while True:
...     do_something_with(rows)
...     if token is None:
...         break
...     rows, total_count, token = results.fetch_data(
...         page_token=token)  # API request
Inserting data (asynchronous)#
Start a job loading data asynchronously from a set of CSV files, located on Google Cloud Storage, appending rows into an existing table. First, create the job locally:
>>> from google.cloud import bigquery
>>> from google.cloud.bigquery import SchemaField
>>> client = bigquery.Client()
>>> dataset = client.dataset('dataset_name')
>>> table = dataset.table(name='person_ages')
>>> table.schema = [
...     SchemaField('full_name', 'STRING', mode='required'),
...     SchemaField('age', 'INTEGER', mode='required')]
>>> job = client.load_table_from_storage(
...     'load-from-storage-job', table, 'gs://bucket-name/object-prefix*')
>>> job.source_format = 'CSV'
>>> job.skip_leading_rows = 1  # count of skipped header rows
>>> job.write_disposition = 'truncate'
>>> job.name
'load-from-storage-job'
>>> job.job_type
'load'
>>> job.created
None
>>> job.state
None
Note
- google.cloud.bigquery generates a UUID for each job.
- The created and state fields are not set until the job is submitted to the BigQuery back-end.
Then, begin executing the job on the server:
>>> job.begin() # API call
>>> job.created
datetime.datetime(2015, 7, 23, 9, 30, 20, 268260, tzinfo=<UTC>)
>>> job.state
'RUNNING'
Poll until the job is complete:
>>> import time
>>> retry_count = 100
>>> while retry_count > 0 and job.state != 'DONE':
...     retry_count -= 1
...     time.sleep(10)
...     job.reload()  # API call
>>> job.state
'DONE'
>>> job.ended
datetime.datetime(2015, 7, 23, 9, 30, 21, 334792, tzinfo=<UTC>)
Exporting data (async)#
Start a job exporting a table’s data asynchronously to a set of CSV files, located on Google Cloud Storage. First, create the job locally:
>>> from google.cloud import bigquery
>>> client = bigquery.Client()
>>> dataset = client.dataset('dataset_name')
>>> table = dataset.table(name='person_ages')
>>> job = client.extract_table_to_storage(
...     'extract-person-ages-job', table,
...     'gs://bucket-name/export-prefix*.csv')
>>> job.destination_format = 'CSV'
>>> job.print_header = True
>>> job.name
'extract-person-ages-job'
>>> job.job_type
'extract'
>>> job.created
None
>>> job.state
None
Note
- google.cloud.bigquery generates a UUID for each job.
- The created and state fields are not set until the job is submitted to the BigQuery back-end.
Then, begin executing the job on the server:
>>> job.begin() # API call
>>> job.created
datetime.datetime(2015, 7, 23, 9, 30, 20, 268260, tzinfo=<UTC>)
>>> job.state
'RUNNING'
Poll until the job is complete:
>>> import time
>>> retry_count = 100
>>> while retry_count > 0 and job.state != 'DONE':
...     retry_count -= 1
...     time.sleep(10)
...     job.reload()  # API call
>>> job.state
'DONE'
>>> job.ended
datetime.datetime(2015, 7, 23, 9, 30, 21, 334792, tzinfo=<UTC>)
Copy tables (async)#
First, create the job locally:
>>> from google.cloud import bigquery
>>> client = bigquery.Client()
>>> dataset = client.dataset('dataset_name')
>>> source_table = dataset.table(name='person_ages')
>>> destination_table = dataset.table(name='person_ages_copy')
>>> job = client.copy_table(
...     'copy-table-job', destination_table, source_table)
>>> job.name
'copy-table-job'
>>> job.job_type
'copy'
>>> job.created
None
>>> job.state
None
Note
- google.cloud.bigquery generates a UUID for each job.
- The created and state fields are not set until the job is submitted to the BigQuery back-end.
Then, begin executing the job on the server:
>>> job.begin() # API call
>>> job.created
datetime.datetime(2015, 7, 23, 9, 30, 20, 268260, tzinfo=<UTC>)
>>> job.state
'RUNNING'
Poll until the job is complete:
>>> import time
>>> retry_count = 100
>>> while retry_count > 0 and job.state != 'DONE':
...     retry_count -= 1
...     time.sleep(10)
...     job.reload()  # API call
>>> job.state
'DONE'
>>> job.ended
datetime.datetime(2015, 7, 23, 9, 30, 21, 334792, tzinfo=<UTC>)