Description
I am trying to migrate TFX v1 pipelines running on Kubeflow 1.8 to v2 pipelines and am unable to set the environment variables that tensorflow-io needs to talk to the embedded MinIO instance over the S3 protocol.
System information
- Have I specified the code to reproduce the issue (Yes, No): yes
- Environment in which the code is executed (e.g., Local (Linux/MacOS/Windows), Interactive Notebook, Google Cloud, etc.): local Linux
- TensorFlow version: 2.16.2
- TFX Version: 1.16
- Python version: 3.9.2
- Python dependencies (from `pip freeze` output):
Describe the current behavior
Attempting to set the variables globally at the pipeline level does not work, i.e. the compiled pipeline YAML does not show the desired env entries on any of the components:
```python
# 'accelerator', 'cpu_limit', 'cpu_request', 'memory_limit', 'memory_request'
# pipeline_pb2.PipelineDeploymentConfig.PipelineContainerSpec.ResourceSpec.AcceleratorConfig(type='nvidia', count=1)
pcRes = pipeline_pb2.PipelineDeploymentConfig.PipelineContainerSpec.ResourceSpec(
    cpu_request=2.0, memory_request=8.0)
pcEnv = [
    pipeline_pb2.PipelineDeploymentConfig.PipelineContainerSpec.EnvVar(name=name, value=value)
    for name, value in configs.get_MINIO_S3_ENV()
]
pcSpec = pipeline_pb2.PipelineDeploymentConfig.PipelineContainerSpec(resources=pcRes, env=pcEnv)

tfx.dsl.Pipeline(
    pipeline_name=pipeline_name,
    pipeline_root=pipeline_root,
    components=components,
    ...
    platform_config=pcSpec)  # does not seem to work at pipeline level, trying per component
```
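For context, `configs.get_MINIO_S3_ENV()` is a small local helper that yields the usual name/value pairs read by the TensorFlow/tensorflow-io S3 filesystem; roughly like the sketch below (the endpoint and credential values here are placeholders, not the real implementation):

```python
def get_MINIO_S3_ENV():
    # Placeholder values; the real helper reads these from our cluster config.
    return [
        ('AWS_ACCESS_KEY_ID', 'minio'),
        ('AWS_SECRET_ACCESS_KEY', 'minio123'),
        ('AWS_REGION', 'us-east-1'),
        ('S3_ENDPOINT', 'minio-service.kubeflow:9000'),
        ('S3_USE_HTTPS', '0'),
        ('S3_VERIFY_SSL', '0'),
    ]
```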
Attempting the same at the component level (not ideal because of the repetition, but I was hoping to layer it on and override where necessary) also does not work, i.e. the pipeline YAML does not show the env variables in the respective container:
```python
CsvExampleGen(input_base=data_path, input_config=input).with_platform_config(pcSpec)
```
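By contrast, passing only the ResourceSpec portion does get past the compiler assertion quoted below, so CPU/RAM requests can be expressed; it is only the env part that has no hook (sketch reusing `pcRes` from above):

```python
# Accepted: platform_config must be a bare ResourceSpec, so env cannot ride along with it.
CsvExampleGen(input_base=data_path, input_config=input).with_platform_config(pcRes)
```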
Browsing through the codebase, the Kubeflow V2 compiler appears to restrict platform_config to just the ResourceSpec part, which is a sibling of EnvVar inside PipelineContainerSpec (https://github.com/kubeflow/pipelines/blob/1ba6d5f1c402158966d7fdc552b99c0ffca2dfa8/api/v2alpha1/pipeline_spec.proto#L688):
```python
  def _build_container_spec(self) -> ContainerSpec:
    """Builds the container spec for a component.

    Returns:
      The PipelineContainerSpec represents the container execution of the
      component.

    Raises:
      NotImplementedError: When the executor class is neither ExecutorClassSpec
        nor TemplatedExecutorContainerSpec.
    """
    assert isinstance(self._node, base_component.BaseComponent)
    if self._node.platform_config:
      logging.info(
          'ResourceSpec with container execution parameters has been passed via platform_config'
      )
      assert isinstance(
          self._node.platform_config, pipeline_pb2.PipelineDeploymentConfig
          .PipelineContainerSpec.ResourceSpec
      ), ('platform_config, if set by the user, must be a ResourceSpec proto '
          'specifying vCPU and vRAM requirements')
      cpu_limit = self._node.platform_config.cpu_limit
      memory_limit = self._node.platform_config.memory_limit
      if cpu_limit:
        assert (cpu_limit >= 0), ('vCPU must be non-negative')
      if memory_limit:
        assert (memory_limit >= 0), ('vRAM must be non-negative')
      if self._node.platform_config.accelerator.type:
        assert (self._node.platform_config.accelerator.count >= 0), (
            'GPU type and count must be set')

    if isinstance(self._node.executor_spec,
                  executor_specs.TemplatedExecutorContainerSpec):
      container_spec = self._node.executor_spec
      result = ContainerSpec(
          image=container_spec.image,
          command=_resolve_command_line(
              container_spec=container_spec,
              exec_properties=self._node.exec_properties,
          ))
      if self._node.platform_config:
        result.resources.CopyFrom(self._node.platform_config)
      return result
```