From 845b11009543408a95b233ef1d7f06f9ed0db7a5 Mon Sep 17 00:00:00 2001 From: Timofei Zubov Date: Tue, 5 Aug 2025 14:44:52 +0300 Subject: [PATCH 1/8] add custom grpc interceptor for dataproc wrapper without changing common code --- tests/test_dataproc_retry.py | 144 +++++++++++++++++++++ yandexcloud/_wrappers/dataproc/__init__.py | 73 +++++++++-- 2 files changed, 206 insertions(+), 11 deletions(-) create mode 100644 tests/test_dataproc_retry.py diff --git a/tests/test_dataproc_retry.py b/tests/test_dataproc_retry.py new file mode 100644 index 00000000..4cd7e764 --- /dev/null +++ b/tests/test_dataproc_retry.py @@ -0,0 +1,144 @@ +import pytest +import grpc +from unittest.mock import Mock, patch, MagicMock +import time + +from yandex.cloud.operation.operation_pb2 import Operation +from yandex.cloud.dataproc.v1.cluster_pb2 import Cluster +from yandex.cloud.dataproc.v1.cluster_service_pb2 import CreateClusterMetadata, CreateClusterRequest +from yandex.cloud.dataproc.v1.cluster_service_pb2_grpc import ClusterServiceStub +from yandex.cloud.operation.operation_service_pb2_grpc import OperationServiceStub +from yandexcloud._wrappers.dataproc import Dataproc, DataprocRetryInterceptor +from yandexcloud import SDK +import yandexcloud._operation_waiter as waiter_module + + +class MockOperationService: + def __init__(self, fail_attempts=0, fail_code=grpc.StatusCode.CANCELLED): + self.fail_attempts = fail_attempts + self.fail_code = fail_code + self.call_count = 0 + + def Get(self, request): + self.call_count += 1 + if self.call_count <= self.fail_attempts: + error = grpc.RpcError() + error._state = Mock() + error._state.code = self.fail_code + error.code = lambda: self.fail_code + raise error + return Operation(id="test-op", done=True) + + +class MockClusterService: + def Create(self, request): + return Operation(id="test-cluster-op", done=False) + + +class MockSDK: + def __init__(self): + self.client_calls = [] + + def client(self, service_class, interceptor=None): + 
self.client_calls.append((service_class, interceptor)) + if service_class == ClusterServiceStub: + return MockClusterService() + elif service_class == OperationServiceStub: + return MockOperationService() + + def create_operation_and_get_result(self, request, service, method_name, response_type, meta_type): + operation = Operation(id="test-op", done=False) + waiter = waiter_module.operation_waiter(self, operation.id, None) + for _ in waiter: + time.sleep(0.01) + return MockOperationResult() + + +class MockOperationResult: + def __init__(self): + self.response = Mock() + self.response.id = "test-cluster-id" + + +@pytest.fixture +def mock_sdk(): + return MockSDK() + + +def test_dataproc_custom_interceptor_max_attempts(mock_sdk): + dataproc = Dataproc(sdk=mock_sdk, enable_custom_interceptor=True) + + mock_operation_service = MockOperationService(fail_attempts=51, fail_code=grpc.StatusCode.CANCELLED) + + with patch.object(waiter_module, 'operation_waiter') as mock_waiter_fn: + mock_waiter = Mock() + mock_waiter.__iter__ = Mock(return_value=iter([])) + mock_waiter.operation = Operation(id="test", done=True) + mock_waiter_fn.return_value = mock_waiter + + with patch.object(mock_sdk, 'client') as mock_client: + mock_client.return_value = mock_operation_service + + with pytest.raises(grpc.RpcError) as exc_info: + dataproc.create_cluster( + folder_id="test-folder", + cluster_name="test-cluster", + subnet_id="test-subnet", + service_account_id="test-sa", + ssh_public_keys="test-ssh-key" + ) + + assert exc_info.value.code() == grpc.StatusCode.CANCELLED + assert mock_operation_service.call_count <= 50 + + +def test_dataproc_interceptor_inheritance(): + interceptor = DataprocRetryInterceptor( + max_retry_count=10, + retriable_codes=(grpc.StatusCode.CANCELLED, grpc.StatusCode.UNAVAILABLE) + ) + + assert interceptor._RetryInterceptor__is_retriable(grpc.StatusCode.CANCELLED) == True + + assert interceptor._RetryInterceptor__is_retriable(grpc.StatusCode.UNAVAILABLE) == True + + 
assert interceptor._RetryInterceptor__is_retriable(grpc.StatusCode.PERMISSION_DENIED) == False + + + +def test_dataproc_monkey_patch_restoration(): + mock_sdk = Mock() + original_waiter = waiter_module.operation_waiter + + dataproc = Dataproc(sdk=mock_sdk, enable_custom_interceptor=True) + + with patch.object(mock_sdk, 'create_operation_and_get_result') as mock_create: + mock_create.return_value = MockOperationResult() + + result = dataproc.delete_cluster(cluster_id="test-cluster-id") + + assert result is not None + + assert waiter_module.operation_waiter == original_waiter + +def test_dataproc_all_methods_use_wrapper(mock_sdk): + dataproc = Dataproc(sdk=mock_sdk, enable_custom_interceptor=True) + + methods_to_test = [ + ('create_cluster', {'folder_id': 'test', 'cluster_name': 'test', 'subnet_id': 'test', 'service_account_id': 'test', 'ssh_public_keys': 'test-ssh-key'}), + ('delete_cluster', {'cluster_id': 'test'}), + ('stop_cluster', {'cluster_id': 'test'}), + ('start_cluster', {'cluster_id': 'test'}), + ] + + with patch.object(dataproc, '_with_dataproc_waiter') as mock_wrapper: + mock_wrapper.return_value = MockOperationResult() + + for method_name, kwargs in methods_to_test: + method = getattr(dataproc, method_name) + method(**kwargs) + + assert mock_wrapper.called + mock_wrapper.reset_mock() + + diff --git a/yandexcloud/_wrappers/dataproc/__init__.py b/yandexcloud/_wrappers/dataproc/__init__.py index 6b39709e..f3b2dda6 100644 --- a/yandexcloud/_wrappers/dataproc/__init__.py +++ b/yandexcloud/_wrappers/dataproc/__init__.py @@ -3,6 +3,7 @@ import logging import random from typing import Iterable, NamedTuple +import grpc from google.protobuf.field_mask_pb2 import FieldMask @@ -16,6 +17,10 @@ import yandex.cloud.dataproc.v1.subcluster_pb2 as subcluster_pb import yandex.cloud.dataproc.v1.subcluster_service_pb2 as subcluster_service_pb import yandex.cloud.dataproc.v1.subcluster_service_pb2_grpc as subcluster_service_grpc_pb +from 
yandex.cloud.operation.operation_service_pb2_grpc import OperationServiceStub +import yandexcloud._operation_waiter as waiter_module +from yandexcloud._backoff import backoff_exponential_jittered_min_interval +from yandexcloud._retry_interceptor import RetryInterceptor class InitializationAction(NamedTuple): @@ -31,6 +36,29 @@ def to_grpc(self): ) +class DataprocRetryInterceptor(RetryInterceptor): + def _RetryInterceptor__is_retriable(self, error: "grpc.StatusCode") -> bool: + if error in self._RetryInterceptor__retriable_codes: + return True + + return False + + +def create_dataproc_operation_waiter(sdk, operation_id, timeout): + retry_interceptor = DataprocRetryInterceptor( + max_retry_count=50, + retriable_codes=( + grpc.StatusCode.UNAVAILABLE, + grpc.StatusCode.RESOURCE_EXHAUSTED, + grpc.StatusCode.INTERNAL, + grpc.StatusCode.CANCELLED, + ), + back_off_func=backoff_exponential_jittered_min_interval(), + ) + operation_service = sdk.client(OperationServiceStub, interceptor=retry_interceptor) + return waiter_module.OperationWaiter(operation_id, operation_service, timeout) + + class Dataproc: """ A base hook for Yandex.Cloud Data Proc. 
@@ -45,7 +73,7 @@ class Dataproc: :type sdk: yandexcloud.SDK """ - def __init__(self, default_folder_id=None, default_public_ssh_key=None, logger=None, sdk=None): + def __init__(self, default_folder_id=None, default_public_ssh_key=None, logger=None, sdk=None, enable_custom_interceptor=False): self.sdk = sdk or self.sdk self.log = logger if not self.log: @@ -55,6 +83,19 @@ def __init__(self, default_folder_id=None, default_public_ssh_key=None, logger=N self.subnet_id = None self.default_folder_id = default_folder_id self.default_public_ssh_key = default_public_ssh_key + self._enable_custom_interceptor = enable_custom_interceptor + + def _with_dataproc_waiter(self, func, *args, **kwargs): + if not self._enable_custom_interceptor: + return func(*args, **kwargs) + + original_waiter = waiter_module.operation_waiter + waiter_module.operation_waiter = create_dataproc_operation_waiter + + try: + return func(*args, **kwargs) + finally: + waiter_module.operation_waiter = original_waiter def create_cluster( self, @@ -313,7 +354,8 @@ def create_cluster( log_group_id=log_group_id, labels=labels, ) - result = self.sdk.create_operation_and_get_result( + result = self._with_dataproc_waiter( + self.sdk.create_operation_and_get_result, request, service=cluster_service_grpc_pb.ClusterServiceStub, method_name="Create", @@ -427,7 +469,8 @@ def create_subcluster( hosts_count=hosts_count, autoscaling_config=autoscaling_config, ) - return self.sdk.create_operation_and_get_result( + return self._with_dataproc_waiter( + self.sdk.create_operation_and_get_result, request, service=subcluster_service_grpc_pb.SubclusterServiceStub, method_name="Create", @@ -455,7 +498,8 @@ def update_cluster_description(self, description, cluster_id=None): update_mask=mask, description=description, ) - return self.sdk.create_operation_and_get_result( + return self._with_dataproc_waiter( + self.sdk.create_operation_and_get_result, request, service=cluster_service_grpc_pb.ClusterServiceStub, method_name="Update", 
@@ -475,7 +519,8 @@ def delete_cluster(self, cluster_id=None): self.log.info("Deleting cluster %s", cluster_id) request = cluster_service_pb.DeleteClusterRequest(cluster_id=cluster_id) - return self.sdk.create_operation_and_get_result( + return self._with_dataproc_waiter( + self.sdk.create_operation_and_get_result, request, service=cluster_service_grpc_pb.ClusterServiceStub, method_name="Delete", @@ -497,7 +542,8 @@ def stop_cluster(self, cluster_id=None, decommission_timeout=0): request = cluster_service_pb.StopClusterRequest( cluster_id=cluster_id, decommission_timeout=decommission_timeout ) - return self.sdk.create_operation_and_get_result( + return self._with_dataproc_waiter( + self.sdk.create_operation_and_get_result, request, service=cluster_service_grpc_pb.ClusterServiceStub, method_name="Stop", @@ -514,7 +560,8 @@ def start_cluster(self, cluster_id=None): if not cluster_id: raise RuntimeError("Cluster id must be specified.") request = cluster_service_pb.StartClusterRequest(cluster_id=cluster_id) - return self.sdk.create_operation_and_get_result( + return self._with_dataproc_waiter( + self.sdk.create_operation_and_get_result, request, service=cluster_service_grpc_pb.ClusterServiceStub, method_name="Start", @@ -575,7 +622,8 @@ def create_hive_job( name=name, hive_job=hive_job, ) - return self.sdk.create_operation_and_get_result( + return self._with_dataproc_waiter( + self.sdk.create_operation_and_get_result, request, service=job_service_grpc_pb.JobServiceStub, method_name="Create", @@ -637,7 +685,8 @@ def create_mapreduce_job( properties=properties, ), ) - return self.sdk.create_operation_and_get_result( + return self._with_dataproc_waiter( + self.sdk.create_operation_and_get_result, request, service=job_service_grpc_pb.JobServiceStub, method_name="Create", @@ -712,7 +761,8 @@ def create_spark_job( exclude_packages=exclude_packages, ), ) - return self.sdk.create_operation_and_get_result( + return self._with_dataproc_waiter( + 
self.sdk.create_operation_and_get_result, request, service=job_service_grpc_pb.JobServiceStub, method_name="Create", @@ -786,7 +836,8 @@ def create_pyspark_job( exclude_packages=exclude_packages, ), ) - return self.sdk.create_operation_and_get_result( + return self._with_dataproc_waiter( + self.sdk.create_operation_and_get_result, request, service=job_service_grpc_pb.JobServiceStub, method_name="Create", From bc5a99b3d84f68a3d7b93f8a96af7334b659908e Mon Sep 17 00:00:00 2001 From: Timofei Zubov Date: Tue, 5 Aug 2025 14:45:23 +0300 Subject: [PATCH 2/8] style --- tests/test_dataproc_retry.py | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/tests/test_dataproc_retry.py b/tests/test_dataproc_retry.py index 4cd7e764..18af0aa9 100644 --- a/tests/test_dataproc_retry.py +++ b/tests/test_dataproc_retry.py @@ -18,7 +18,7 @@ def __init__(self, fail_attempts=0, fail_code=grpc.StatusCode.CANCELLED): self.fail_attempts = fail_attempts self.fail_code = fail_code self.call_count = 0 - + def Get(self, request): self.call_count += 1 if self.call_count <= self.fail_attempts: @@ -38,14 +38,14 @@ def Create(self, request): class MockSDK: def __init__(self): self.client_calls = [] - + def client(self, service_class, interceptor=None): self.client_calls.append((service_class, interceptor)) if service_class == ClusterServiceStub: return MockClusterService() elif service_class == OperationServiceStub: return MockOperationService() - + def create_operation_and_get_result(self, request, service, method_name, response_type, meta_type): operation = Operation(id="test-op", done=False) waiter = waiter_module.operation_waiter(self, operation.id, None) @@ -67,18 +67,18 @@ def mock_sdk(): def test_dataproc_custom_interceptor_max_attempts(mock_sdk): dataproc = Dataproc(sdk=mock_sdk, enable_custom_interceptor=True) - + mock_operation_service = MockOperationService(fail_attempts=51, fail_code=grpc.StatusCode.CANCELLED) - + with patch.object(waiter_module, 
'operation_waiter') as mock_waiter_fn: mock_waiter = Mock() mock_waiter.__iter__ = Mock(return_value=iter([])) mock_waiter.operation = Operation(id="test", done=True) mock_waiter_fn.return_value = mock_waiter - + with patch.object(mock_sdk, 'client') as mock_client: mock_client.return_value = mock_operation_service - + with pytest.raises(grpc.RpcError) as exc_info: dataproc.create_cluster( folder_id="test-folder", @@ -97,11 +97,11 @@ def test_dataproc_interceptor_inheritance(): max_retry_count=10, retriable_codes=(grpc.StatusCode.CANCELLED, grpc.StatusCode.UNAVAILABLE) ) - + assert interceptor._RetryInterceptor__is_retriable(grpc.StatusCode.CANCELLED) == True - + assert interceptor._RetryInterceptor__is_retriable(grpc.StatusCode.UNAVAILABLE) == True - + assert interceptor._RetryInterceptor__is_retriable(grpc.StatusCode.PERMISSION_DENIED) == False @@ -123,21 +123,21 @@ def test_dataproc_monkey_patch_restoration(): def test_dataproc_all_methods_use_wrapper(mock_sdk): dataproc = Dataproc(sdk=mock_sdk, enable_custom_interceptor=True) - + methods_to_test = [ ('create_cluster', {'folder_id': 'test', 'cluster_name': 'test', 'subnet_id': 'test', 'service_account_id': 'test', 'ssh_public_keys': 'test-ssh-key'}), ('delete_cluster', {'cluster_id': 'test'}), ('stop_cluster', {'cluster_id': 'test'}), ('start_cluster', {'cluster_id': 'test'}), ] - + with patch.object(dataproc, '_with_dataproc_waiter') as mock_wrapper: mock_wrapper.return_value = MockOperationResult() - + for method_name, kwargs in methods_to_test: method = getattr(dataproc, method_name) method(**kwargs) - + assert mock_wrapper.called mock_wrapper.reset_mock() From 6434003707e1f57f3f371813d330ae2433c930c9 Mon Sep 17 00:00:00 2001 From: Timofei Zubov Date: Tue, 5 Aug 2025 14:55:07 +0300 Subject: [PATCH 3/8] lint --- tests/test_dataproc_retry.py | 52 +++++++++++++--------- yandexcloud/_wrappers/dataproc/__init__.py | 13 ++++-- 2 files changed, 41 insertions(+), 24 deletions(-) diff --git 
a/tests/test_dataproc_retry.py b/tests/test_dataproc_retry.py index 18af0aa9..39c4dfbe 100644 --- a/tests/test_dataproc_retry.py +++ b/tests/test_dataproc_retry.py @@ -1,16 +1,20 @@ -import pytest -import grpc -from unittest.mock import Mock, patch, MagicMock import time +from unittest.mock import MagicMock, Mock, patch -from yandex.cloud.operation.operation_pb2 import Operation +import grpc +import pytest + +import yandexcloud._operation_waiter as waiter_module from yandex.cloud.dataproc.v1.cluster_pb2 import Cluster -from yandex.cloud.dataproc.v1.cluster_service_pb2 import CreateClusterMetadata, CreateClusterRequest +from yandex.cloud.dataproc.v1.cluster_service_pb2 import ( + CreateClusterMetadata, + CreateClusterRequest, +) from yandex.cloud.dataproc.v1.cluster_service_pb2_grpc import ClusterServiceStub +from yandex.cloud.operation.operation_pb2 import Operation from yandex.cloud.operation.operation_service_pb2_grpc import OperationServiceStub -from yandexcloud._wrappers.dataproc import Dataproc, DataprocRetryInterceptor from yandexcloud import SDK -import yandexcloud._operation_waiter as waiter_module +from yandexcloud._wrappers.dataproc import Dataproc, DataprocRetryInterceptor class MockOperationService: @@ -70,13 +74,13 @@ def test_dataproc_custom_interceptor_max_attempts(mock_sdk): mock_operation_service = MockOperationService(fail_attempts=51, fail_code=grpc.StatusCode.CANCELLED) - with patch.object(waiter_module, 'operation_waiter') as mock_waiter_fn: + with patch.object(waiter_module, "operation_waiter") as mock_waiter_fn: mock_waiter = Mock() mock_waiter.__iter__ = Mock(return_value=iter([])) mock_waiter.operation = Operation(id="test", done=True) mock_waiter_fn.return_value = mock_waiter - with patch.object(mock_sdk, 'client') as mock_client: + with patch.object(mock_sdk, "client") as mock_client: mock_client.return_value = mock_operation_service with pytest.raises(grpc.RpcError) as exc_info: @@ -85,7 +89,7 @@ def 
test_dataproc_custom_interceptor_max_attempts(mock_sdk): cluster_name="test-cluster", subnet_id="test-subnet", service_account_id="test-sa", - ssh_public_keys="test-ssh-key" + ssh_public_keys="test-ssh-key", ) assert exc_info.value.code() == grpc.StatusCode.CANCELLED @@ -94,8 +98,7 @@ def test_dataproc_custom_interceptor_max_attempts(mock_sdk): def test_dataproc_interceptor_inheritance(): interceptor = DataprocRetryInterceptor( - max_retry_count=10, - retriable_codes=(grpc.StatusCode.CANCELLED, grpc.StatusCode.UNAVAILABLE) + max_retry_count=10, retriable_codes=(grpc.StatusCode.CANCELLED, grpc.StatusCode.UNAVAILABLE) ) assert interceptor._RetryInterceptor__is_retriable(grpc.StatusCode.CANCELLED) == True @@ -105,14 +108,13 @@ def test_dataproc_interceptor_inheritance(): assert interceptor._RetryInterceptor__is_retriable(grpc.StatusCode.PERMISSION_DENIED) == False - def test_dataproc_monkey_patch_restoration(): mock_sdk = Mock() original_waiter = waiter_module.operation_waiter dataproc = Dataproc(sdk=mock_sdk, enable_custom_interceptor=True) - with patch.object(mock_sdk, 'create_operation_and_get_result') as mock_create: + with patch.object(mock_sdk, "create_operation_and_get_result") as mock_create: mock_create.return_value = MockOperationResult() result = dataproc.delete_cluster(cluster_id="test-cluster-id") @@ -121,17 +123,27 @@ def test_dataproc_monkey_patch_restoration(): assert waiter_module.operation_waiter == original_waiter + def test_dataproc_all_methods_use_wrapper(mock_sdk): dataproc = Dataproc(sdk=mock_sdk, enable_custom_interceptor=True) methods_to_test = [ - ('create_cluster', {'folder_id': 'test', 'cluster_name': 'test', 'subnet_id': 'test', 'service_account_id': 'test', 'ssh_public_keys': 'test-ssh-key'}), - ('delete_cluster', {'cluster_id': 'test'}), - ('stop_cluster', {'cluster_id': 'test'}), - ('start_cluster', {'cluster_id': 'test'}), + ( + "create_cluster", + { + "folder_id": "test", + "cluster_name": "test", + "subnet_id": "test", + 
"service_account_id": "test", + "ssh_public_keys": "test-ssh-key", + }, + ), + ("delete_cluster", {"cluster_id": "test"}), + ("stop_cluster", {"cluster_id": "test"}), + ("start_cluster", {"cluster_id": "test"}), ] - with patch.object(dataproc, '_with_dataproc_waiter') as mock_wrapper: + with patch.object(dataproc, "_with_dataproc_waiter") as mock_wrapper: mock_wrapper.return_value = MockOperationResult() for method_name, kwargs in methods_to_test: @@ -140,5 +152,3 @@ def test_dataproc_all_methods_use_wrapper(mock_sdk): assert mock_wrapper.called mock_wrapper.reset_mock() - - diff --git a/yandexcloud/_wrappers/dataproc/__init__.py b/yandexcloud/_wrappers/dataproc/__init__.py index f3b2dda6..19551f87 100644 --- a/yandexcloud/_wrappers/dataproc/__init__.py +++ b/yandexcloud/_wrappers/dataproc/__init__.py @@ -3,8 +3,8 @@ import logging import random from typing import Iterable, NamedTuple -import grpc +import grpc from google.protobuf.field_mask_pb2 import FieldMask import yandex.cloud.dataproc.v1.cluster_pb2 as cluster_pb @@ -17,8 +17,8 @@ import yandex.cloud.dataproc.v1.subcluster_pb2 as subcluster_pb import yandex.cloud.dataproc.v1.subcluster_service_pb2 as subcluster_service_pb import yandex.cloud.dataproc.v1.subcluster_service_pb2_grpc as subcluster_service_grpc_pb -from yandex.cloud.operation.operation_service_pb2_grpc import OperationServiceStub import yandexcloud._operation_waiter as waiter_module +from yandex.cloud.operation.operation_service_pb2_grpc import OperationServiceStub from yandexcloud._backoff import backoff_exponential_jittered_min_interval from yandexcloud._retry_interceptor import RetryInterceptor @@ -73,7 +73,14 @@ class Dataproc: :type sdk: yandexcloud.SDK """ - def __init__(self, default_folder_id=None, default_public_ssh_key=None, logger=None, sdk=None, enable_custom_interceptor=False): + def __init__( + self, + default_folder_id=None, + default_public_ssh_key=None, + logger=None, + sdk=None, + enable_custom_interceptor=False, + ): self.sdk = 
sdk or self.sdk self.log = logger if not self.log: From 54d04abcfba3a37902d363fd953eec747783c193 Mon Sep 17 00:00:00 2001 From: Timofei Zubov Date: Wed, 6 Aug 2025 11:48:03 +0300 Subject: [PATCH 4/8] patch --- yandexcloud/_wrappers/dataproc/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/yandexcloud/_wrappers/dataproc/__init__.py b/yandexcloud/_wrappers/dataproc/__init__.py index 19551f87..ac4cfc5b 100644 --- a/yandexcloud/_wrappers/dataproc/__init__.py +++ b/yandexcloud/_wrappers/dataproc/__init__.py @@ -37,7 +37,7 @@ def to_grpc(self): class DataprocRetryInterceptor(RetryInterceptor): - def _RetryInterceptor__is_retriable(self, error: "grpc.StatusCode") -> bool: + def _RetryInterceptor__is_retriable(self, error: grpc.StatusCode) -> bool: if error in self._RetryInterceptor__retriable_codes: return True From b90e6ab274b6faf518e88e11b9d69314ed3ddf81 Mon Sep 17 00:00:00 2001 From: Timofei Zubov Date: Wed, 6 Aug 2025 11:55:08 +0300 Subject: [PATCH 5/8] pylint ignore --- yandexcloud/_wrappers/dataproc/__init__.py | 1 + 1 file changed, 1 insertion(+) diff --git a/yandexcloud/_wrappers/dataproc/__init__.py b/yandexcloud/_wrappers/dataproc/__init__.py index ac4cfc5b..a0a6ebdf 100644 --- a/yandexcloud/_wrappers/dataproc/__init__.py +++ b/yandexcloud/_wrappers/dataproc/__init__.py @@ -37,6 +37,7 @@ def to_grpc(self): class DataprocRetryInterceptor(RetryInterceptor): + # pylint: disable-next=invalid-name def _RetryInterceptor__is_retriable(self, error: grpc.StatusCode) -> bool: if error in self._RetryInterceptor__retriable_codes: return True From 86eab98450dafb8d2943e3e6e62ec7e427524fd7 Mon Sep 17 00:00:00 2001 From: Timofei Zubov Date: Wed, 6 Aug 2025 12:26:15 +0300 Subject: [PATCH 6/8] add interceptor settings --- tests/test_dataproc_retry.py | 44 ++++++++++++++++------ yandexcloud/_wrappers/dataproc/__init__.py | 44 ++++++++++++---------- 2 files changed, 58 insertions(+), 30 deletions(-) diff --git a/tests/test_dataproc_retry.py 
b/tests/test_dataproc_retry.py index 39c4dfbe..88b66020 100644 --- a/tests/test_dataproc_retry.py +++ b/tests/test_dataproc_retry.py @@ -1,20 +1,15 @@ import time -from unittest.mock import MagicMock, Mock, patch +from unittest.mock import Mock, patch import grpc import pytest +from yandexcloud._backoff import backoff_exponential_jittered_min_interval import yandexcloud._operation_waiter as waiter_module -from yandex.cloud.dataproc.v1.cluster_pb2 import Cluster -from yandex.cloud.dataproc.v1.cluster_service_pb2 import ( - CreateClusterMetadata, - CreateClusterRequest, -) from yandex.cloud.dataproc.v1.cluster_service_pb2_grpc import ClusterServiceStub from yandex.cloud.operation.operation_pb2 import Operation from yandex.cloud.operation.operation_service_pb2_grpc import OperationServiceStub -from yandexcloud import SDK -from yandexcloud._wrappers.dataproc import Dataproc, DataprocRetryInterceptor +from yandexcloud._wrappers.dataproc import Dataproc, DataprocRetryInterceptor, InterceptorSettings class MockOperationService: @@ -70,7 +65,16 @@ def mock_sdk(): def test_dataproc_custom_interceptor_max_attempts(mock_sdk): - dataproc = Dataproc(sdk=mock_sdk, enable_custom_interceptor=True) + dataproc = Dataproc(sdk=mock_sdk, interceptor_settings=InterceptorSettings( + max_retry_count=50, + retriable_codes=( + grpc.StatusCode.UNAVAILABLE, + grpc.StatusCode.RESOURCE_EXHAUSTED, + grpc.StatusCode.INTERNAL, + grpc.StatusCode.CANCELLED, + ), + back_off_func=backoff_exponential_jittered_min_interval() + )) mock_operation_service = MockOperationService(fail_attempts=51, fail_code=grpc.StatusCode.CANCELLED) @@ -112,7 +116,16 @@ def test_dataproc_monkey_patch_restoration(): mock_sdk = Mock() original_waiter = waiter_module.operation_waiter - dataproc = Dataproc(sdk=mock_sdk, enable_custom_interceptor=True) + dataproc = Dataproc(sdk=mock_sdk, interceptor_settings=InterceptorSettings( + max_retry_count=50, + retriable_codes=( + grpc.StatusCode.UNAVAILABLE, + 
grpc.StatusCode.RESOURCE_EXHAUSTED, + grpc.StatusCode.INTERNAL, + grpc.StatusCode.CANCELLED, + ), + back_off_func=backoff_exponential_jittered_min_interval() + )) with patch.object(mock_sdk, "create_operation_and_get_result") as mock_create: mock_create.return_value = MockOperationResult() @@ -125,7 +138,16 @@ def test_dataproc_monkey_patch_restoration(): def test_dataproc_all_methods_use_wrapper(mock_sdk): - dataproc = Dataproc(sdk=mock_sdk, enable_custom_interceptor=True) + dataproc = Dataproc(sdk=mock_sdk, interceptor_settings=InterceptorSettings( + max_retry_count=50, + retriable_codes=( + grpc.StatusCode.UNAVAILABLE, + grpc.StatusCode.RESOURCE_EXHAUSTED, + grpc.StatusCode.INTERNAL, + grpc.StatusCode.CANCELLED, + ), + back_off_func=backoff_exponential_jittered_min_interval() + )) methods_to_test = [ ( diff --git a/yandexcloud/_wrappers/dataproc/__init__.py b/yandexcloud/_wrappers/dataproc/__init__.py index a0a6ebdf..63a7ec09 100644 --- a/yandexcloud/_wrappers/dataproc/__init__.py +++ b/yandexcloud/_wrappers/dataproc/__init__.py @@ -2,7 +2,7 @@ # mypy: ignore-errors import logging import random -from typing import Iterable, NamedTuple +from typing import Callable, Iterable, NamedTuple import grpc from google.protobuf.field_mask_pb2 import FieldMask @@ -19,7 +19,6 @@ import yandex.cloud.dataproc.v1.subcluster_service_pb2_grpc as subcluster_service_grpc_pb import yandexcloud._operation_waiter as waiter_module from yandex.cloud.operation.operation_service_pb2_grpc import OperationServiceStub -from yandexcloud._backoff import backoff_exponential_jittered_min_interval from yandexcloud._retry_interceptor import RetryInterceptor @@ -36,6 +35,12 @@ def to_grpc(self): ) +class InterceptorSettings(NamedTuple): + max_retry_count: int # Maximum number of retries + retriable_codes: Iterable[grpc.StatusCode] # Retriable error codes + back_off_func: Callable[[int], float] # Backoff function + + class DataprocRetryInterceptor(RetryInterceptor): # pylint: 
disable-next=invalid-name def _RetryInterceptor__is_retriable(self, error: grpc.StatusCode) -> bool: @@ -45,19 +50,17 @@ def _RetryInterceptor__is_retriable(self, error: grpc.StatusCode) -> bool: return False -def create_dataproc_operation_waiter(sdk, operation_id, timeout): - retry_interceptor = DataprocRetryInterceptor( - max_retry_count=50, - retriable_codes=( - grpc.StatusCode.UNAVAILABLE, - grpc.StatusCode.RESOURCE_EXHAUSTED, - grpc.StatusCode.INTERNAL, - grpc.StatusCode.CANCELLED, - ), - back_off_func=backoff_exponential_jittered_min_interval(), - ) - operation_service = sdk.client(OperationServiceStub, interceptor=retry_interceptor) - return waiter_module.OperationWaiter(operation_id, operation_service, timeout) +def create_custom_operation_waiter(interceptor_settings: InterceptorSettings): + def custom_operation_waiter(sdk, operation_id, timeout): + retry_interceptor = DataprocRetryInterceptor( + max_retry_count=interceptor_settings.max_retry_count, + retriable_codes=interceptor_settings.retriable_codes, + back_off_func=interceptor_settings.back_off_func, + ) + operation_service = sdk.client(OperationServiceStub, interceptor=retry_interceptor) + return waiter_module.OperationWaiter(operation_id, operation_service, timeout) + + return custom_operation_waiter class Dataproc: @@ -80,7 +83,7 @@ def __init__( default_public_ssh_key=None, logger=None, sdk=None, - enable_custom_interceptor=False, + interceptor_settings=None, ): self.sdk = sdk or self.sdk self.log = logger @@ -91,14 +94,17 @@ def __init__( self.subnet_id = None self.default_folder_id = default_folder_id self.default_public_ssh_key = default_public_ssh_key - self._enable_custom_interceptor = enable_custom_interceptor + self._custom_operation_waiter = None + if interceptor_settings: + self._custom_operation_waiter = create_custom_operation_waiter(interceptor_settings) + def _with_dataproc_waiter(self, func, *args, **kwargs): - if not self._enable_custom_interceptor: + if not 
self._custom_operation_waiter: return func(*args, **kwargs) original_waiter = waiter_module.operation_waiter - waiter_module.operation_waiter = create_dataproc_operation_waiter + waiter_module.operation_waiter = self._custom_operation_waiter try: return func(*args, **kwargs) From 1de35c1f84fa434a2fca7f856c2f30097f250aba Mon Sep 17 00:00:00 2001 From: Timofei Zubov Date: Wed, 6 Aug 2025 12:30:20 +0300 Subject: [PATCH 7/8] format --- tests/test_dataproc_retry.py | 71 +++++++++++++--------- yandexcloud/_wrappers/dataproc/__init__.py | 3 +- 2 files changed, 43 insertions(+), 31 deletions(-) diff --git a/tests/test_dataproc_retry.py b/tests/test_dataproc_retry.py index 88b66020..117af9c0 100644 --- a/tests/test_dataproc_retry.py +++ b/tests/test_dataproc_retry.py @@ -4,12 +4,16 @@ import grpc import pytest -from yandexcloud._backoff import backoff_exponential_jittered_min_interval import yandexcloud._operation_waiter as waiter_module from yandex.cloud.dataproc.v1.cluster_service_pb2_grpc import ClusterServiceStub from yandex.cloud.operation.operation_pb2 import Operation from yandex.cloud.operation.operation_service_pb2_grpc import OperationServiceStub -from yandexcloud._wrappers.dataproc import Dataproc, DataprocRetryInterceptor, InterceptorSettings +from yandexcloud._backoff import backoff_exponential_jittered_min_interval +from yandexcloud._wrappers.dataproc import ( + Dataproc, + DataprocRetryInterceptor, + InterceptorSettings, +) class MockOperationService: @@ -65,16 +69,19 @@ def mock_sdk(): def test_dataproc_custom_interceptor_max_attempts(mock_sdk): - dataproc = Dataproc(sdk=mock_sdk, interceptor_settings=InterceptorSettings( - max_retry_count=50, - retriable_codes=( - grpc.StatusCode.UNAVAILABLE, - grpc.StatusCode.RESOURCE_EXHAUSTED, - grpc.StatusCode.INTERNAL, - grpc.StatusCode.CANCELLED, + dataproc = Dataproc( + sdk=mock_sdk, + interceptor_settings=InterceptorSettings( + max_retry_count=50, + retriable_codes=( + grpc.StatusCode.UNAVAILABLE, + 
grpc.StatusCode.RESOURCE_EXHAUSTED, + grpc.StatusCode.INTERNAL, + grpc.StatusCode.CANCELLED, + ), + back_off_func=backoff_exponential_jittered_min_interval(), ), - back_off_func=backoff_exponential_jittered_min_interval() - )) + ) mock_operation_service = MockOperationService(fail_attempts=51, fail_code=grpc.StatusCode.CANCELLED) @@ -116,16 +123,19 @@ def test_dataproc_monkey_patch_restoration(): mock_sdk = Mock() original_waiter = waiter_module.operation_waiter - dataproc = Dataproc(sdk=mock_sdk, interceptor_settings=InterceptorSettings( - max_retry_count=50, - retriable_codes=( - grpc.StatusCode.UNAVAILABLE, - grpc.StatusCode.RESOURCE_EXHAUSTED, - grpc.StatusCode.INTERNAL, - grpc.StatusCode.CANCELLED, + dataproc = Dataproc( + sdk=mock_sdk, + interceptor_settings=InterceptorSettings( + max_retry_count=50, + retriable_codes=( + grpc.StatusCode.UNAVAILABLE, + grpc.StatusCode.RESOURCE_EXHAUSTED, + grpc.StatusCode.INTERNAL, + grpc.StatusCode.CANCELLED, + ), + back_off_func=backoff_exponential_jittered_min_interval(), ), - back_off_func=backoff_exponential_jittered_min_interval() - )) + ) with patch.object(mock_sdk, "create_operation_and_get_result") as mock_create: mock_create.return_value = MockOperationResult() @@ -138,16 +148,19 @@ def test_dataproc_monkey_patch_restoration(): def test_dataproc_all_methods_use_wrapper(mock_sdk): - dataproc = Dataproc(sdk=mock_sdk, interceptor_settings=InterceptorSettings( - max_retry_count=50, - retriable_codes=( - grpc.StatusCode.UNAVAILABLE, - grpc.StatusCode.RESOURCE_EXHAUSTED, - grpc.StatusCode.INTERNAL, - grpc.StatusCode.CANCELLED, + dataproc = Dataproc( + sdk=mock_sdk, + interceptor_settings=InterceptorSettings( + max_retry_count=50, + retriable_codes=( + grpc.StatusCode.UNAVAILABLE, + grpc.StatusCode.RESOURCE_EXHAUSTED, + grpc.StatusCode.INTERNAL, + grpc.StatusCode.CANCELLED, + ), + back_off_func=backoff_exponential_jittered_min_interval(), ), - back_off_func=backoff_exponential_jittered_min_interval() - )) + ) 
methods_to_test = [ ( diff --git a/yandexcloud/_wrappers/dataproc/__init__.py b/yandexcloud/_wrappers/dataproc/__init__.py index 63a7ec09..5b8bb331 100644 --- a/yandexcloud/_wrappers/dataproc/__init__.py +++ b/yandexcloud/_wrappers/dataproc/__init__.py @@ -59,7 +59,7 @@ def custom_operation_waiter(sdk, operation_id, timeout): ) operation_service = sdk.client(OperationServiceStub, interceptor=retry_interceptor) return waiter_module.OperationWaiter(operation_id, operation_service, timeout) - + return custom_operation_waiter @@ -98,7 +98,6 @@ def __init__( if interceptor_settings: self._custom_operation_waiter = create_custom_operation_waiter(interceptor_settings) - def _with_dataproc_waiter(self, func, *args, **kwargs): if not self._custom_operation_waiter: return func(*args, **kwargs) From 9395e28d6c15ca7f4f771952eea4706fe9f4ca21 Mon Sep 17 00:00:00 2001 From: Timofei Zubov Date: Wed, 6 Aug 2025 13:18:52 +0300 Subject: [PATCH 8/8] add param desc --- yandexcloud/_wrappers/dataproc/__init__.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/yandexcloud/_wrappers/dataproc/__init__.py b/yandexcloud/_wrappers/dataproc/__init__.py index 5b8bb331..6bcfd4cc 100644 --- a/yandexcloud/_wrappers/dataproc/__init__.py +++ b/yandexcloud/_wrappers/dataproc/__init__.py @@ -75,6 +75,8 @@ class Dataproc: :type logger: Optional[logging.Logger] :param sdk: SDK object. Normally is being set by Wrappers constructor :type sdk: yandexcloud.SDK + :param interceptor_settings: Retry settings (max retry count, retriable gRPC codes, backoff function) for polling Dataproc operations; if omitted, the SDK's default operation waiter is used + :type interceptor_settings: Optional[InterceptorSettings] """ def __init__(