7 changes: 6 additions & 1 deletion google/cloud/bigquery/_helpers.py
@@ -32,6 +32,8 @@
from google.cloud._helpers import _RFC3339_MICROS
from google.cloud._helpers import _RFC3339_NO_FRACTION
from google.cloud._helpers import _to_bytes
from google.cloud.bigquery import enums

from google.auth import credentials as ga_credentials # type: ignore
from google.api_core import client_options as client_options_lib

@@ -253,7 +255,10 @@ def bytes_to_py(self, value, field):
return base64.standard_b64decode(_to_bytes(value))

def timestamp_to_py(self, value, field):
"""Coerce 'value' to a datetime, if set or not nullable."""
"""Coerce 'value' to a datetime, if set or not nullable. If timestamp
is of picosecond precision, preserve the string format."""
if field.timestamp_precision == enums.TimestampPrecision.PICOSECOND:
return value
if _not_null(value, field):
# value will be an integer in seconds, to microsecond precision, in UTC.
return _datetime_from_microseconds(int(value))
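To make the new dispatch concrete, here is a minimal, self-contained sketch. It is not the library's API: `parse_timestamp` is a hypothetical stand-in for the `timestamp_to_py` method above, with the null/nullable handling elided.

```python
import datetime

from google.cloud._helpers import _datetime_from_microseconds


def parse_timestamp(value, is_picosecond):
    # Mirrors timestamp_to_py above: picosecond values pass through as
    # RFC 3339 strings; everything else is coerced to a UTC datetime.
    if is_picosecond:
        return value
    return _datetime_from_microseconds(int(value))


assert parse_timestamp("1234567", False) == datetime.datetime(
    1970, 1, 1, 0, 0, 1, 234567, tzinfo=datetime.timezone.utc
)
assert parse_timestamp("2025-01-01T00:00:00.123456789012Z", True) == (
    "2025-01-01T00:00:00.123456789012Z"
)
```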
15 changes: 14 additions & 1 deletion google/cloud/bigquery/_job_helpers.py
@@ -49,6 +49,7 @@
import google.api_core.exceptions as core_exceptions
from google.api_core import retry as retries

from google.cloud.bigquery import enums
from google.cloud.bigquery import job
import google.cloud.bigquery.job.query
import google.cloud.bigquery.query
@@ -265,6 +266,7 @@ def _to_query_request(
query: str,
location: Optional[str] = None,
timeout: Optional[float] = None,
timestamp_precision: Optional[enums.TimestampPrecision] = None,
) -> Dict[str, Any]:
"""Transform from Job resource to QueryRequest resource.

@@ -290,6 +292,12 @@
request_body.setdefault("formatOptions", {})
request_body["formatOptions"]["useInt64Timestamp"] = True # type: ignore

if timestamp_precision == enums.TimestampPrecision.PICOSECOND:
# Cannot specify both use_int64_timestamp and timestamp_output_format.
del request_body["formatOptions"]["useInt64Timestamp"]

request_body["formatOptions"]["timestampOutputFormat"] = "ISO8601_STRING"

if timeout is not None:
# Subtract a buffer for context switching, network latency, etc.
request_body["timeoutMs"] = max(0, int(1000 * timeout) - _TIMEOUT_BUFFER_MILLIS)
@@ -370,14 +378,19 @@ def query_jobs_query(
retry: retries.Retry,
timeout: Optional[float],
job_retry: Optional[retries.Retry],
timestamp_precision: Optional[enums.TimestampPrecision] = None,
) -> job.QueryJob:
"""Initiate a query using jobs.query with jobCreationMode=JOB_CREATION_REQUIRED.

See: https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query
"""
path = _to_query_path(project)
request_body = _to_query_request(
query=query, job_config=job_config, location=location, timeout=timeout
query=query,
job_config=job_config,
location=location,
timeout=timeout,
timestamp_precision=timestamp_precision,
)

def do_query():
31 changes: 31 additions & 0 deletions google/cloud/bigquery/client.py
@@ -3469,6 +3469,8 @@ def query(
timeout: TimeoutType = DEFAULT_TIMEOUT,
job_retry: Optional[retries.Retry] = DEFAULT_JOB_RETRY,
api_method: Union[str, enums.QueryApiMethod] = enums.QueryApiMethod.INSERT,
*,
timestamp_precision: Optional[enums.TimestampPrecision] = None,
) -> job.QueryJob:
"""Run a SQL query.

@@ -3524,6 +3526,11 @@

See :class:`google.cloud.bigquery.enums.QueryApiMethod` for
details on the difference between the query start methods.
timestamp_precision (Optional[enums.TimestampPrecision]):
[Private Preview] If set to `enums.TimestampPrecision.PICOSECOND`,
timestamp columns of picosecond precision are returned with full
precision as ISO 8601 strings. Otherwise, values are truncated to
microsecond precision. Only applies when api_method ==
`enums.QueryApiMethod.QUERY`.

Returns:
google.cloud.bigquery.job.QueryJob: A new query job instance.
@@ -3543,6 +3550,15 @@
"`job_id` was provided, but the 'QUERY' `api_method` was requested."
)

if (
timestamp_precision == enums.TimestampPrecision.PICOSECOND
and api_method == enums.QueryApiMethod.INSERT
):
raise ValueError(
"Picosecond Timestamp is only supported when `api_method "
"== enums.QueryApiMethod.QUERY`."
)

if project is None:
project = self.project

@@ -3568,6 +3584,7 @@
retry,
timeout,
job_retry,
timestamp_precision=timestamp_precision,
)
elif api_method == enums.QueryApiMethod.INSERT:
return _job_helpers.query_jobs_insert(
@@ -4062,6 +4079,8 @@ def list_rows(
page_size: Optional[int] = None,
retry: retries.Retry = DEFAULT_RETRY,
timeout: TimeoutType = DEFAULT_TIMEOUT,
*,
timestamp_precision: Optional[enums.TimestampPrecision] = None,
) -> RowIterator:
"""List the rows of the table.

@@ -4110,6 +4129,11 @@
before using ``retry``.
If multiple requests are made under the hood, ``timeout``
applies to each individual request.
timestamp_precision (Optional[enums.TimestampPrecision]):
[Private Preview] If set to `enums.TimestampPrecision.PICOSECOND`,
timestamp columns of picosecond precision are returned with full
precision as ISO 8601 strings. Otherwise, values are truncated to
microsecond precision.

Returns:
google.cloud.bigquery.table.RowIterator:
@@ -4144,6 +4168,13 @@
params["startIndex"] = start_index

params["formatOptions.useInt64Timestamp"] = True

if timestamp_precision == enums.TimestampPrecision.PICOSECOND:
# Cannot specify both use_int64_timestamp and timestamp_output_format.
del params["formatOptions.useInt64Timestamp"]

params["formatOptions.timestampOutputFormat"] = "ISO8601_STRING"

row_iterator = RowIterator(
client=self,
api_request=functools.partial(self._call_api, retry, timeout=timeout),
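Putting the two client-side entry points together, a usage sketch modeled on this PR's system tests (the table id is hypothetical; `TimestampPrecision` is the enum introduced in this change):

```python
from google.cloud import bigquery
from google.cloud.bigquery import enums

client = bigquery.Client()

# jobs.query path: picosecond timestamps come back as RFC 3339 strings.
# Combining timestamp_precision=PICOSECOND with api_method=INSERT raises
# ValueError, per the validation above.
job = client.query(
    "SELECT CAST('2025-10-20' AS TIMESTAMP(12))",
    api_method=enums.QueryApiMethod.QUERY,
    timestamp_precision=enums.TimestampPrecision.PICOSECOND,
)
for row in job.result():
    print(row[0])  # "2025-10-20T00:00:00.000000000000Z"

# tabledata.list path: the same opt-in applies to direct row listing.
rows = client.list_rows(
    "my-project.my_dataset.pico_table",  # hypothetical table id
    timestamp_precision=enums.TimestampPrecision.PICOSECOND,
)
```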
30 changes: 30 additions & 0 deletions google/cloud/bigquery/job/load.py
@@ -759,6 +759,36 @@ def column_name_character_map(self, value: Optional[str]):
value = ColumnNameCharacterMap.COLUMN_NAME_CHARACTER_MAP_UNSPECIFIED
self._set_sub_prop("columnNameCharacterMap", value)

@property
def timestamp_target_precision(self) -> Optional[List[int]]:
"""Optional[list[int]]: [Private Preview] Precisions (maximum number of
total digits in base 10) for seconds of TIMESTAMP types that are
allowed to the destination table for autodetection mode.

Available for the formats: CSV.

For the CSV Format, Possible values include:
None, [], or [6]: timestamp(6) for all auto detected TIMESTAMP
columns.
[6, 12]: timestamp(6) for all auto detected TIMESTAMP columns that
have less than 6 digits of subseconds. timestamp(12) for all auto
detected TIMESTAMP columns that have more than 6 digits of
subseconds.
[12]: timestamp(12) for all auto detected TIMESTAMP columns.

The order of the elements in this array is ignored. Inputs that have
higher precision than the highest target precision in this array will
be truncated.
"""
return self._get_sub_prop("timestampTargetPrecision")

@timestamp_target_precision.setter
def timestamp_target_precision(self, value: Optional[List[int]]):
if value is not None:
self._set_sub_prop("timestampTargetPrecision", value)
else:
self._del_sub_prop("timestampTargetPrecision")


class LoadJob(_AsyncJob):
"""Asynchronous job for loading data into a table.
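A hedged sketch of loading the picosecond CSV fixture added below with the new option (project, dataset, and table ids are illustrative; paths assume the repository layout):

```python
from google.cloud import bigquery

client = bigquery.Client()

job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.CSV,
    schema=client.schema_from_json("tests/data/pico_schema.json"),
)
# Allow TIMESTAMP columns to land as timestamp(12) in the destination.
job_config.timestamp_target_precision = [12]

with open("tests/data/pico.csv", "rb") as data_file:
    load_job = client.load_table_from_file(
        data_file,
        "my-project.my_dataset.pico_table",  # hypothetical table id
        job_config=job_config,
    )
load_job.result()
```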
3 changes: 3 additions & 0 deletions tests/data/pico.csv
@@ -0,0 +1,3 @@
2025-01-01T00:00:00.123456789012Z
2025-01-02T00:00:00.123456789012Z
2025-01-03T00:00:00.123456789012Z
8 changes: 8 additions & 0 deletions tests/data/pico_schema.json
@@ -0,0 +1,8 @@
[
{
"name": "pico_col",
"type": "TIMESTAMP",
"mode": "NULLABLE",
"timestampPrecision": "12"
}
]
19 changes: 19 additions & 0 deletions tests/system/conftest.py
@@ -98,12 +98,14 @@ def load_scalars_table(
data_path: str = "scalars.jsonl",
source_format=enums.SourceFormat.NEWLINE_DELIMITED_JSON,
schema_source="scalars_schema.json",
timestamp_target_precision=None,
) -> str:
schema = bigquery_client.schema_from_json(DATA_DIR / schema_source)
table_id = data_path.replace(".", "_") + hex(random.randrange(1000000))
job_config = bigquery.LoadJobConfig()
job_config.schema = schema
job_config.source_format = source_format
job_config.timestamp_target_precision = timestamp_target_precision
full_table_id = f"{project_id}.{dataset_id}.{table_id}"
with open(DATA_DIR / data_path, "rb") as data_file:
job = bigquery_client.load_table_from_file(
@@ -169,6 +171,23 @@ def scalars_table_csv(
bigquery_client.delete_table(full_table_id, not_found_ok=True)


@pytest.fixture(scope="session")
def scalars_table_pico(
bigquery_client: bigquery.Client, project_id: str, dataset_id: str
):
full_table_id = load_scalars_table(
bigquery_client,
project_id,
dataset_id,
data_path="pico.csv",
source_format=enums.SourceFormat.CSV,
schema_source="pico_schema.json",
timestamp_target_precision=[12],
)
yield full_table_id
bigquery_client.delete_table(full_table_id, not_found_ok=True)


@pytest.fixture
def test_table_name(request, replace_non_anum=re.compile(r"[^a-zA-Z0-9_]").sub):
return replace_non_anum("_", request.node.name)
23 changes: 23 additions & 0 deletions tests/system/test_client.py
@@ -1295,6 +1295,29 @@ def test_load_table_from_json_schema_autodetect_table_exists(self):
self.assertEqual(tuple(table.schema), table_schema)
self.assertEqual(table.num_rows, 2)

def test_load_table_from_csv_w_picosecond_timestamp(self):
dataset_id = _make_dataset_id("bq_system_test")
self.temp_dataset(dataset_id)
table_id = "{}.{}.load_table_from_json_basic_use".format(
Config.CLIENT.project, dataset_id
)

table_schema = Config.CLIENT.schema_from_json(DATA_PATH / "pico_schema.json")
# create the table before loading so that the column order is predictable
table = helpers.retry_403(Config.CLIENT.create_table)(
Table(table_id, schema=table_schema)
)
self.to_delete.insert(0, table)

# no explicit job config; the load uses the schema of the pre-created table
with open(DATA_PATH / "pico.csv", "rb") as f:
load_job = Config.CLIENT.load_table_from_file(f, table_id)
load_job.result()

table = Config.CLIENT.get_table(table)
self.assertEqual(list(table.schema), table_schema)
self.assertEqual(table.num_rows, 3)

def test_load_avro_from_uri_then_dump_table(self):
from google.cloud.bigquery.job import CreateDisposition
from google.cloud.bigquery.job import SourceFormat
20 changes: 20 additions & 0 deletions tests/system/test_list_rows.py
@@ -132,3 +132,23 @@ def test_list_rows_range(bigquery_client: bigquery.Client, scalars_table_csv: str):

row_null = rows[1]
assert row_null["range_date"] is None


def test_list_rows_pico(bigquery_client: bigquery.Client, scalars_table_pico: str):
rows = bigquery_client.list_rows(
scalars_table_pico, timestamp_precision=enums.TimestampPrecision.PICOSECOND
)
rows = list(rows)
row = rows[0]
assert row["pico_col"] == "2025-01-01T00:00:00.123456789012Z"


def test_list_rows_pico_truncate(
bigquery_client: bigquery.Client, scalars_table_pico: str
):
# For a picosecond timestamp column, if the user does not explicitly set
# timestamp_precision, rows come back truncated to microsecond precision
# (as int64 microseconds since the epoch).
rows = bigquery_client.list_rows(scalars_table_pico)
rows = list(rows)
row = rows[0]
assert row["pico_col"] == "1735689600123456"
13 changes: 13 additions & 0 deletions tests/system/test_query.py
@@ -21,6 +21,7 @@
import pytest

from google.cloud import bigquery
from google.cloud.bigquery import enums
from google.cloud.bigquery.query import ArrayQueryParameter
from google.cloud.bigquery.query import ScalarQueryParameter
from google.cloud.bigquery.query import ScalarQueryParameterType
@@ -546,3 +547,15 @@ def test_session(bigquery_client: bigquery.Client, query_api_method: str):

assert len(rows) == 1
assert rows[0][0] == 5


def test_query_picosecond(bigquery_client: bigquery.Client):
job = bigquery_client.query(
"SELECT CAST('2025-10-20' AS TIMESTAMP(12));",
api_method="QUERY",
timestamp_precision=enums.TimestampPrecision.PICOSECOND,
)

result = job.result()
rows = list(result)
assert rows[0][0] == "2025-10-20T00:00:00.000000000000Z"
13 changes: 11 additions & 2 deletions tests/unit/_helpers/test_cell_data_parser.py
@@ -290,17 +290,26 @@ def test_bytes_to_py_w_base64_encoded_text(object_under_test):
def test_timestamp_to_py_w_string_int_value(object_under_test):
from google.cloud._helpers import _EPOCH

coerced = object_under_test.timestamp_to_py("1234567", object())
coerced = object_under_test.timestamp_to_py("1234567", create_field())
assert coerced == _EPOCH + datetime.timedelta(seconds=1, microseconds=234567)


def test_timestamp_to_py_w_int_value(object_under_test):
from google.cloud._helpers import _EPOCH

coerced = object_under_test.timestamp_to_py(1234567, object())
coerced = object_under_test.timestamp_to_py(1234567, create_field())
assert coerced == _EPOCH + datetime.timedelta(seconds=1, microseconds=234567)


def test_timestamp_to_py_w_picosecond_precision(object_under_test):
from google.cloud.bigquery import enums

pico_schema = create_field(timestamp_precision=enums.TimestampPrecision.PICOSECOND)
pico_timestamp = "2025-01-01T00:00:00.123456789012Z"
coerced = object_under_test.timestamp_to_py(pico_timestamp, pico_schema)
assert coerced == pico_timestamp


def test_datetime_to_py_w_string_value(object_under_test):
coerced = object_under_test.datetime_to_py("2016-12-02T18:51:33", object())
assert coerced == datetime.datetime(2016, 12, 2, 18, 51, 33)