-
Notifications
You must be signed in to change notification settings - Fork 154
Open
Description
Dear all,
thanks for providing this great repo. However, with some notebooks it is not clear which kfp version it should be run with. I am running into the following issue in the notebook https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/official/pipelines/control_flow_kfp.ipynb
ValueError: Cannot iterate over a single parameter using
dsl.ParallelFor. Expected a list of parameters as argument to
items.
This error occurs in dsl.ParallelFor
in
@dsl.pipeline(
name="control",
pipeline_root=PIPELINE_ROOT,
)
def pipeline(
json_string: str = json.dumps(
[
{
"snakes": "anaconda",
"lizards": "anole",
"bunnies": [{"cottontail": "bugs"}, {"cottontail": "thumper"}],
},
{
"snakes": "cobra",
"lizards": "gecko",
"bunnies": [{"cottontail": "roger"}],
},
{
"snakes": "boa",
"lizards": "iguana",
"bunnies": [
{"cottontail": "fluffy"},
{"fuzzy_lop": "petunia", "cottontail": "peter"},
],
},
],
sort_keys=True,
)
):
flip1 = flip_coin_op()
with dsl.Condition(
flip1.output != "no-such-result", name="alwaystrue"
): # always true
args_generator = args_generator_op()
with dsl.ParallelFor(args_generator.output) as item:
print_op(msg=json_string)
with dsl.Condition(flip1.output == "heads", name="heads"):
print_op(msg=item.cats)
with dsl.Condition(flip1.output == "tails", name="tails"):
print_op(msg=item.dogs)
with dsl.ParallelFor(json_string) as item:
with dsl.Condition(item.snakes == "boa", name="snakes"):
print_op(msg=item.snakes)
print_op(msg=item.lizards)
print_op(msg=item.bunnies)
# it is possible to access sub-items
with dsl.ParallelFor(json_string) as item:
with dsl.ParallelFor(item.bunnies) as item_bunnies:
print_op(msg=item_bunnies.cottontail)
kfp Version 2.10.1 has been used.
The following changes resolve the issue:
from typing import List
@component
def args_generator_op() -> List:
import json
return [
json.dumps({"cats": "1", "dogs": "2"}),
json.dumps({"cats": "10", "dogs": "20"})
]
and
@dsl.pipeline(
name="control",
pipeline_root=PIPELINE_ROOT,
)
def pipeline(
json_string: List =
[
json.dumps({
"snakes": "anaconda",
"lizards": "anole",
"bunnies": [{"cottontail": "bugs"}, {"cottontail": "thumper"}],
}),
json.dumps({
"snakes": "cobra",
"lizards": "gecko",
"bunnies": [{"cottontail": "roger"}],
}),
json.dumps({
"snakes": "boa",
"lizards": "iguana",
"bunnies": [
{"cottontail": "fluffy"},
{"fuzzy_lop": "petunia", "cottontail": "peter"},
],
}),
],
):
flip1 = flip_coin_op()
with dsl.Condition(
flip1.output != "no-such-result", name="alwaystrue"
): # always true
args_generator = args_generator_op()
with dsl.ParallelFor(args_generator.output) as item:
print_op(msg=str(json_string))
with dsl.Condition(flip1.output == "heads", name="heads"):
print_op(msg=item.cats)
with dsl.Condition(flip1.output == "tails", name="tails"):
print_op(msg=item.dogs)
with dsl.ParallelFor(json_string) as item:
with dsl.Condition(item.snakes == "boa", name="snakes"):
print_op(msg=item.snakes)
print_op(msg=item.lizards)
print_op(msg=item.bunnies)
# it is possible to access sub-items
with dsl.ParallelFor(json_string) as item:
with dsl.ParallelFor(item.bunnies) as item_bunnies:
print_op(msg=item_bunnies.cottontail)
However, I have been wondering if these quick 'fixes' are actually a fix.
Metadata
Metadata
Assignees
Labels
No labels