13 changes: 12 additions & 1 deletion bigframes/_config/compute_options.py
@@ -15,7 +15,7 @@
"""Options for displaying objects."""

import dataclasses
from typing import Any, Dict, Optional
from typing import Any, Dict, Literal, Optional


@dataclasses.dataclass
@@ -140,6 +140,17 @@ class ComputeOptions:
int | None: Number of rows, if set.
"""

default_write_engine: Literal["bigquery_load", "bigquery_write"] = "bigquery_write"
"""
Sets the default write engine for uploading local data to BigQuery.

The two options are "bigquery_load" and "bigquery_write". "bigquery_write" is generally
preferred as it is faster, but "bigquery_load" may be used if the BigQuery Storage Write
API is unavailable.

Returns:
str: "bigquery_load" or "bigquery_write"
"""

semantic_ops_confirmation_threshold: Optional[int] = 0
"""
Deprecated.
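For reference, here is how a user would flip the new default — a minimal sketch; the attribute path bigframes.options.compute.default_write_engine and the option_context helper both appear in the diffs and tests below:

import bigframes

# Route large local uploads through load jobs instead of the Storage
# Write API (useful e.g. for BigQuery sandbox users, per the review
# thread below).
bigframes.options.compute.default_write_engine = "bigquery_load"

# Or scope the override to a block, as the new tests do:
with bigframes.option_context("compute.default_write_engine", "bigquery_load"):
    pass  # read_pandas calls here fall back to load jobs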
5 changes: 2 additions & 3 deletions bigframes/session/__init__.py
@@ -981,8 +981,7 @@ def read_pandas(
quota and your data cannot be embedded in SQL due to size or
data type limitations.
* "bigquery_write":
[Preview] Use the BigQuery Storage Write API. This feature
is in public preview.
Use the BigQuery Storage Write API.
Returns:
An equivalent bigframes.pandas.(DataFrame/Series/Index) object

@@ -1031,7 +1030,7 @@ def _read_pandas(
mem_usage = pandas_dataframe.memory_usage(deep=True).sum()
if write_engine == "default":
write_engine = (
"bigquery_load"
Collaborator:
I'm a bit worried about https://docs.cloud.google.com/bigquery/docs/sandbox users. Maybe we could provide a global option for the default so that such users can switch back to load jobs (accepting the limitations for data types).

Contributor Author:
added config option in new version

bigframes.options.compute.default_write_engine
if mem_usage > bigframes.constants.MAX_INLINE_BYTES
else "bigquery_inline"
)
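As the thread above notes, sandbox users also keep a per-call escape hatch rather than touching the global option: read_pandas already accepts a write_engine argument, as its docstring above enumerates. A sketch (session is a bigframes.Session, as in the system tests below):

import pandas as pd

df = pd.DataFrame({"x": [1, 2, 3]})

# Pin the engine for this one upload only; the global default is untouched.
bf_df = session.read_pandas(df, write_engine="bigquery_load")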
13 changes: 10 additions & 3 deletions bigframes/session/bq_caching_executor.py
@@ -598,9 +598,16 @@ def _upload_local_data(self, local_table: local_data.ManagedArrowTable):
# Might be better as a queue and a worker thread
with self._upload_lock:
if local_table not in self.cache._uploaded_local_data:
uploaded = self.loader.load_data(
local_table, bigframes.core.guid.generate_guid()
)
engine = bigframes.options.compute.default_write_engine
if engine == "bigquery_load":
uploaded = self.loader.load_data(
local_table, bigframes.core.guid.generate_guid()
)
else:
assert engine == "bigquery_write"
uploaded = self.loader.write_data(
local_table, bigframes.core.guid.generate_guid()
)
self.cache.cache_remote_replacement(local_table, uploaded)

def _execute_plan_gbq(
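The inline comment in _upload_local_data ("Might be better as a queue and a worker thread") hints at a lock-free alternative. Purely for illustration — a hypothetical sketch, not part of this PR — the same dedupe-and-upload logic could hang off a queue drained by a single daemon thread; loader and cache stand in for the executor attributes used in the diff:

import queue
import threading

import bigframes.core.guid


class QueuedUploader:
    """Hypothetical: serialize uploads through one worker instead of a lock."""

    def __init__(self, loader, cache):
        self._loader = loader
        self._cache = cache
        self._queue: queue.Queue = queue.Queue()
        threading.Thread(target=self._drain, daemon=True).start()

    def enqueue(self, local_table):
        # Callers return immediately; the worker handles dedupe + upload.
        self._queue.put(local_table)

    def _drain(self):
        while True:
            local_table = self._queue.get()
            if local_table not in self._cache._uploaded_local_data:
                uploaded = self._loader.write_data(
                    local_table, bigframes.core.guid.generate_guid()
                )
                self._cache.cache_remote_replacement(local_table, uploaded)
            self._queue.task_done()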
20 changes: 20 additions & 0 deletions tests/system/small/test_large_local_data.py
@@ -23,6 +23,26 @@
large_dataframe.index = large_dataframe.index.astype("Int64")


@pytest.mark.parametrize(
("default_write_engine",),
[
pytest.param("bigquery_load"),
pytest.param("bigquery_write"),
],
)
def test_read_pandas_config_default_engine(
session: bigframes.Session, default_write_engine
):
pytest.importorskip("pandas", minversion="2.0.0")
with bigframes.option_context(
"compute.default_write_engine",
default_write_engine,
):
bf_df = session.read_pandas(large_dataframe)

assert_frame_equal(large_dataframe, bf_df.to_pandas())


def test_read_pandas_defer_noop(session: bigframes.Session):
pytest.importorskip("pandas", minversion="2.0.0")
bf_df = session.read_pandas(large_dataframe, write_engine="_deferred")
7 changes: 6 additions & 1 deletion tests/system/small/test_progress_bar.py
@@ -87,13 +87,18 @@ def test_progress_bar_extract_jobs(
def test_progress_bar_load_jobs(
session: bf.Session, penguins_pandas_df_default_index: pd.DataFrame, capsys
):

# repeat the DF to be big enough to trigger the load job.
df = penguins_pandas_df_default_index
while len(df) < MAX_INLINE_DF_BYTES:
df = pd.DataFrame(np.repeat(df.values, 2, axis=0))

# The default write engine uses the Storage Write API, which doesn't
# create a job, so force load jobs to exercise the progress bar.
with bf.option_context(
"display.progress_bar", "terminal"
"display.progress_bar",
"terminal",
"compute.default_write_engine",
"bigquery_load",
), tempfile.TemporaryDirectory() as dir:
path = dir + "/test_read_csv_progress_bar*.csv"
df.to_csv(path, index=False)