1 change: 1 addition & 0 deletions bigframes/core/compile/configs.py
@@ -34,3 +34,4 @@ class CompileResult:
     sql: str
     sql_schema: typing.Sequence[google.cloud.bigquery.SchemaField]
     row_order: typing.Optional[ordering.RowOrdering]
+    encoded_type_refs: str
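The encoder itself lives in bigframes/core/logging/data_types.py and is not shown in this diff. As a rough sketch of the contract the new encoded_type_refs: str field implies, assuming the encoder collects the dtype names used by a plan and packs them into a single BigQuery-label-safe string (label values allow only lowercase letters, digits, hyphens, and underscores, capped at 63 characters):

    import re

    # Hypothetical sketch only; the real encoder in
    # bigframes/core/logging/data_types.py may differ.
    def encode_type_refs_sketch(dtype_names: list[str]) -> str:
        """Pack dtype names into one BigQuery-label-safe string."""
        # Deduplicate and sort so equal plans produce equal labels.
        unique = sorted(set(dtype_names))
        # Label values allow only lowercase letters, digits, "_" and "-",
        # and are capped at 63 characters.
        safe = (re.sub(r"[^a-z0-9_-]", "-", name.lower()) for name in unique)
        return "_".join(safe)[:63]

    print(encode_type_refs_sketch(["Int64", "Float64"]))  # -> float64_int64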
12 changes: 10 additions & 2 deletions bigframes/core/compile/ibis_compiler/ibis_compiler.py
@@ -29,6 +29,7 @@
 import bigframes.core.compile.concat as concat_impl
 import bigframes.core.compile.configs as configs
 import bigframes.core.compile.explode
+from bigframes.core.logging import data_types as data_type_logger
 import bigframes.core.nodes as nodes
 import bigframes.core.ordering as bf_ordering
 import bigframes.core.rewrite as rewrites
@@ -56,23 +57,30 @@ def compile_sql(request: configs.CompileRequest) -> configs.CompileResult:
     )
     if request.sort_rows:
         result_node = cast(nodes.ResultNode, rewrites.column_pruning(result_node))
+        encoded_type_refs = data_type_logger.encode_type_refs(result_node)
         sql = compile_result_node(result_node)
         return configs.CompileResult(
-            sql, result_node.schema.to_bigquery(), result_node.order_by
+            sql,
+            result_node.schema.to_bigquery(),
+            result_node.order_by,
+            encoded_type_refs,
         )

     ordering: Optional[bf_ordering.RowOrdering] = result_node.order_by
     result_node = dataclasses.replace(result_node, order_by=None)
     result_node = cast(nodes.ResultNode, rewrites.column_pruning(result_node))
     result_node = cast(nodes.ResultNode, rewrites.defer_selection(result_node))
+    encoded_type_refs = data_type_logger.encode_type_refs(result_node)
     sql = compile_result_node(result_node)
     # Return the ordering iff no extra columns are needed to define the row order
     if ordering is not None:
         output_order = (
             ordering if ordering.referenced_columns.issubset(result_node.ids) else None
         )
     assert (not request.materialize_all_order_keys) or (output_order is not None)
-    return configs.CompileResult(sql, result_node.schema.to_bigquery(), output_order)
+    return configs.CompileResult(
+        sql, result_node.schema.to_bigquery(), output_order, encoded_type_refs
+    )


 def _replace_unsupported_ops(node: nodes.BigFrameNode):
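The "return the ordering iff no extra columns are needed" comment is doing real work here: after column pruning, an ordering that references dropped columns can no longer be honored, so it is discarded. A toy illustration of the issubset check, with plain string sets standing in for bigframes column ids:

    # Toy stand-ins: strings instead of bigframes column ids.
    referenced_columns = {"sort_key"}
    surviving_ids = {"sort_key", "int64_col"}

    # All order-by columns survived pruning, so the ordering is kept.
    assert referenced_columns.issubset(surviving_ids)

    # Had pruning dropped the sort key, the ordering would be dropped too.
    output_order = "kept" if referenced_columns.issubset({"int64_col"}) else None
    assert output_order is None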
12 changes: 10 additions & 2 deletions bigframes/core/compile/sqlglot/compiler.py
@@ -34,6 +34,7 @@
 from bigframes.core.compile.sqlglot.expressions import typed_expr
 import bigframes.core.compile.sqlglot.scalar_compiler as scalar_compiler
 import bigframes.core.compile.sqlglot.sqlglot_ir as ir
+from bigframes.core.logging import data_types as data_type_logger
 import bigframes.core.ordering as bf_ordering
 from bigframes.core.rewrite import schema_binding
@@ -65,9 +66,13 @@ def compile_sql(request: configs.CompileRequest) -> configs.CompileResult:
         result_node = typing.cast(
             nodes.ResultNode, rewrite.defer_selection(result_node)
         )
+        encoded_type_refs = data_type_logger.encode_type_refs(result_node)
         sql = _compile_result_node(result_node, uid_gen)
         return configs.CompileResult(
-            sql, result_node.schema.to_bigquery(), result_node.order_by
+            sql,
+            result_node.schema.to_bigquery(),
+            result_node.order_by,
+            encoded_type_refs,
         )

     ordering: typing.Optional[bf_ordering.RowOrdering] = result_node.order_by
@@ -76,14 +81,17 @@

     result_node = _remap_variables(result_node, uid_gen)
     result_node = typing.cast(nodes.ResultNode, rewrite.defer_selection(result_node))
+    encoded_type_refs = data_type_logger.encode_type_refs(result_node)
     sql = _compile_result_node(result_node, uid_gen)
     # Return the ordering iff no extra columns are needed to define the row order
     if ordering is not None:
         output_order = (
             ordering if ordering.referenced_columns.issubset(result_node.ids) else None
         )
     assert (not request.materialize_all_order_keys) or (output_order is not None)
-    return configs.CompileResult(sql, result_node.schema.to_bigquery(), output_order)
+    return configs.CompileResult(
+        sql, result_node.schema.to_bigquery(), output_order, encoded_type_refs
+    )


 def _remap_variables(
4 changes: 4 additions & 0 deletions bigframes/session/bq_caching_executor.py
@@ -318,6 +318,8 @@ def _export_gbq(
             clustering_fields=spec.cluster_cols if spec.cluster_cols else None,
         )

+        # Attach data type usage to the job labels
+        job_config.labels["bigframes-dtypes"] = compiled.encoded_type_refs
         # TODO(swast): plumb through the api_name of the user-facing api that
         # caused this query.
         iterator, job = self._run_execute_query(
@@ -661,6 +663,8 @@ def _execute_plan_gbq(
         )
         job_config.destination = destination_table

+        # Attach data type usage to the job labels
+        job_config.labels["bigframes-dtypes"] = compiled.encoded_type_refs
         iterator, query_job = self._run_execute_query(
             sql=compiled.sql,
             job_config=job_config,
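On google.cloud.bigquery.QueryJobConfig, labels is an ordinary dict of strings that BigQuery stores as job metadata, so the encoded dtype string becomes queryable later from the job history. A minimal standalone version of the same pattern (the project comes from default credentials; the label value here is a placeholder):

    from google.cloud import bigquery

    client = bigquery.Client()  # assumes default credentials and project
    job_config = bigquery.QueryJobConfig()
    # Same shape as the diff: one label keyed "bigframes-dtypes".
    job_config.labels = {"bigframes-dtypes": "float64_int64"}
    job = client.query("SELECT 1", job_config=job_config)
    job.result()  # the label is now visible on the job in the console/API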
40 changes: 40 additions & 0 deletions tests/system/small/session/test_session_logging.py
@@ -0,0 +1,40 @@
+# Copyright 2026 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from unittest import mock
+
+from bigframes.core.logging import data_types
+import bigframes.session._io.bigquery as bq_io
+
+
+def test_data_type_logging(scalars_df_index):
+    s = scalars_df_index["int64_col"] + 1.5
+
+    # We want to check the job_config passed to start_query_with_client
+    with mock.patch(
+        "bigframes.session._io.bigquery.start_query_with_client",
+        wraps=bq_io.start_query_with_client,
+    ) as mock_query:
+        s.to_pandas()
+
+    # Fetch the job labels sent to the BQ client and verify their values
+    assert mock_query.called
+    call_args = mock_query.call_args
+    job_config = call_args.kwargs.get("job_config")
+    assert job_config is not None
+    job_labels = job_config.labels
+    assert "bigframes-dtypes" in job_labels
+    assert job_labels["bigframes-dtypes"] == data_types.encode_type_refs(
+        s._block._expr.node
+    )
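The wraps= argument is what makes this a spy rather than a stub: start_query_with_client still executes for real, while the mock records its arguments for the assertions afterwards. The same pattern in isolation, spying on json.dumps as a stand-in target:

    import json
    from unittest import mock

    # wraps= delegates every call to the real json.dumps while recording it.
    with mock.patch("json.dumps", wraps=json.dumps) as spy:
        assert json.dumps({"a": 1}) == '{"a": 1}'

    assert spy.called
    assert spy.call_args.args == ({"a": 1},)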