Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cloudbuild/zb-system-tests-cloudbuild.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ steps:
# Execute the script on the VM via SSH.
# Capture the exit code to ensure cleanup happens before the build fails.
set +e
gcloud compute ssh ${_VM_NAME} --zone=${_ZONE} --internal-ip --ssh-key-file=/workspace/.ssh/google_compute_engine --command="ulimit -n {_ULIMIT}; COMMIT_SHA=${COMMIT_SHA} _ZONAL_BUCKET=${_ZONAL_BUCKET} _PR_NUMBER=${_PR_NUMBER} bash run_zonal_tests.sh"
gcloud compute ssh ${_VM_NAME} --zone=${_ZONE} --internal-ip --ssh-key-file=/workspace/.ssh/google_compute_engine --command="ulimit -n ${_ULIMIT}; COMMIT_SHA=${COMMIT_SHA} _ZONAL_BUCKET=${_ZONAL_BUCKET} CROSS_REGION_BUCKET=${_CROSS_REGION_BUCKET} _PR_NUMBER=${_PR_NUMBER} bash run_zonal_tests.sh"
EXIT_CODE=$?
set -e

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,8 @@ def __init__(
self._read_id_to_download_ranges_id = {}
self._download_ranges_id_to_pending_read_ids = {}
self.persisted_size: Optional[int] = None # updated after opening the stream
self._open_retries: int = 0


async def __aenter__(self):
"""Opens the underlying bidi-gRPC connection to read from the object."""
Expand Down Expand Up @@ -257,13 +259,18 @@ async def open(
raise ValueError("Underlying bidi-gRPC stream is already open")

if retry_policy is None:
def on_error_wrapper(exc):
self._open_retries += 1
self._on_open_error(exc)

retry_policy = AsyncRetry(
predicate=_is_read_retryable, on_error=self._on_open_error
predicate=_is_read_retryable, on_error=on_error_wrapper
)
else:
original_on_error = retry_policy._on_error

def combined_on_error(exc):
self._open_retries += 1
self._on_open_error(exc)
if original_on_error:
original_on_error(exc)
Expand Down
46 changes: 46 additions & 0 deletions tests/system/test_zonal.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
# TODO: replace this with a fixture once zonal bucket creation / deletion
# is supported in grpc client or json client.
_ZONAL_BUCKET = os.getenv("ZONAL_BUCKET")
_CROSS_REGION_BUCKET = os.getenv("CROSS_REGION_BUCKET")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

This will likely evaluate to None as the CROSS_REGION_BUCKET environment variable is not exported in the test execution environment. The cloudbuild.yaml configuration passes _CROSS_REGION_BUCKET, but the run_zonal_tests.sh script does not export it as CROSS_REGION_BUCKET for the pytest command (unlike ZONAL_BUCKET).

To fix this, you should add export CROSS_REGION_BUCKET=${_CROSS_REGION_BUCKET} to cloudbuild/run_zonal_tests.sh. This change is necessary for the new test to pass.

_BYTES_TO_UPLOAD = b"dummy_bytes_to_write_read_and_delete_appendable_object"


Expand Down Expand Up @@ -82,6 +83,51 @@ def _get_equal_dist(a: int, b: int) -> tuple[int, int]:
return a + step, a + 2 * step


@pytest.mark.parametrize(
"object_size",
[
256, # less than _chunk size
10 * 1024 * 1024, # less than _MAX_BUFFER_SIZE_BYTES
        20 * 1024 * 1024,  # greater than _MAX_BUFFER_SIZE_BYTES
],
)
def test_basic_wrd_x_region(
storage_client,
blobs_to_delete,
object_size,
event_loop,
grpc_client,
):
Comment on lines +94 to +100
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This new test test_basic_wrd_x_region is nearly identical to the existing test_basic_wrd function. This significant code duplication can make the test suite harder to maintain.

To improve maintainability, consider refactoring this to avoid duplication. One approach would be to parameterize a single test function to handle both zonal and cross-region bucket cases, as well as the direct path option. For example, you could parameterize on a tuple containing (bucket_name, attempt_direct_path).

object_name = f"test_basic_wrd-{str(uuid.uuid4())}"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

For easier debugging and to avoid potential conflicts, it's good practice to make test object names specific to the test case. This helps identify which test created an object if cleanup fails.

Suggested change
object_name = f"test_basic_wrd-{str(uuid.uuid4())}"
object_name = f"test_basic_wrd_x_region-{str(uuid.uuid4())}"


async def _run():
object_data = os.urandom(object_size)
object_checksum = google_crc32c.value(object_data)

writer = AsyncAppendableObjectWriter(grpc_client, _CROSS_REGION_BUCKET, object_name)
await writer.open()
await writer.append(object_data)
object_metadata = await writer.close(finalize_on_close=True)
assert object_metadata.size == object_size
assert int(object_metadata.checksums.crc32c) == object_checksum

buffer = BytesIO()
mrd = AsyncMultiRangeDownloader(grpc_client, _CROSS_REGION_BUCKET, object_name)
async with mrd:
assert mrd._open_retries == 1
# (0, 0) means read the whole object
await mrd.download_ranges([(0, 0, buffer)])
assert mrd.persisted_size == object_size

assert buffer.getvalue() == object_data

# Clean up; use json client (i.e. `storage_client` fixture) to delete.
blobs_to_delete.append(storage_client.bucket(_CROSS_REGION_BUCKET).blob(object_name))
del writer
gc.collect()

event_loop.run_until_complete(_run())

@pytest.mark.parametrize(
"object_size",
[
Expand Down