Pipelines Control Flow Sample: dsl.ParallelFor issue #3837

@md-dev-lab

Dear all,

Thanks for providing this great repo. However, for some notebooks it is not clear which kfp version they are meant to be run with. I am running into the following error in the notebook https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/official/pipelines/control_flow_kfp.ipynb

ValueError: Cannot iterate over a single parameter using dsl.ParallelFor. Expected a list of parameters as argument to items.

The error is raised by dsl.ParallelFor in the following pipeline definition:

@dsl.pipeline(
    name="control",
    pipeline_root=PIPELINE_ROOT,
)
def pipeline(
    json_string: str = json.dumps(
        [
            {
                "snakes": "anaconda",
                "lizards": "anole",
                "bunnies": [{"cottontail": "bugs"}, {"cottontail": "thumper"}],
            },
            {
                "snakes": "cobra",
                "lizards": "gecko",
                "bunnies": [{"cottontail": "roger"}],
            },
            {
                "snakes": "boa",
                "lizards": "iguana",
                "bunnies": [
                    {"cottontail": "fluffy"},
                    {"fuzzy_lop": "petunia", "cottontail": "peter"},
                ],
            },
        ],
        sort_keys=True,
    )
):

    flip1 = flip_coin_op()

    with dsl.Condition(
        flip1.output != "no-such-result", name="alwaystrue"
    ):  # always true

        args_generator = args_generator_op()
        with dsl.ParallelFor(args_generator.output) as item:
            print_op(msg=json_string)

            with dsl.Condition(flip1.output == "heads", name="heads"):
                print_op(msg=item.cats)

            with dsl.Condition(flip1.output == "tails", name="tails"):
                print_op(msg=item.dogs)

    with dsl.ParallelFor(json_string) as item:
        with dsl.Condition(item.snakes == "boa", name="snakes"):
            print_op(msg=item.snakes)
            print_op(msg=item.lizards)
            print_op(msg=item.bunnies)

    # it is possible to access sub-items
    with dsl.ParallelFor(json_string) as item:
        with dsl.ParallelFor(item.bunnies) as item_bunnies:
            print_op(msg=item_bunnies.cottontail)

I used kfp version 2.10.1.
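The error message is consistent with how `json_string` is declared above: its default is built with `json.dumps`, so the parameter is a single string, not a list. The following plain-Python sketch (no kfp required) illustrates that distinction. The shortened `animals` list is an assumption standing in for the notebook's default value, and the idea that `dsl.ParallelFor` decides based on the declared parameter type is inferred from the error text, not confirmed here.

```python
import json

# Shortened stand-in for the notebook's default value (assumption: same shape).
animals = [
    {"snakes": "anaconda", "lizards": "anole"},
    {"snakes": "cobra", "lizards": "gecko"},
]

# The original pipeline declares `json_string: str = json.dumps([...])`.
json_string = json.dumps(animals, sort_keys=True)

# Seen as a typed value, this is one string, not a list of items.
print(type(json_string).__name__)

# The list structure only becomes visible after decoding the string.
decoded = json.loads(json_string)
print(type(decoded).__name__, len(decoded))
```

Under that reading, a parameter annotated as `str` would count as "a single parameter" even though its content happens to be a JSON array.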

The following changes resolve the issue:

from typing import List

from kfp.dsl import component  # import added here so the snippet stands alone


@component
def args_generator_op() -> List:
    import json

    return [
        json.dumps({"cats": "1", "dogs": "2"}),
        json.dumps({"cats": "10", "dogs": "20"})
    ]

and

@dsl.pipeline(
    name="control",
    pipeline_root=PIPELINE_ROOT,
)
def pipeline(
    json_string: List = [
        json.dumps({
            "snakes": "anaconda",
            "lizards": "anole",
            "bunnies": [{"cottontail": "bugs"}, {"cottontail": "thumper"}],
        }),
        json.dumps({
            "snakes": "cobra",
            "lizards": "gecko",
            "bunnies": [{"cottontail": "roger"}],
        }),
        json.dumps({
            "snakes": "boa",
            "lizards": "iguana",
            "bunnies": [
                {"cottontail": "fluffy"},
                {"fuzzy_lop": "petunia", "cottontail": "peter"},
            ],
        }),
    ],
):

    flip1 = flip_coin_op()

    with dsl.Condition(
        flip1.output != "no-such-result", name="alwaystrue"
    ):  # always true

        args_generator = args_generator_op()
        with dsl.ParallelFor(args_generator.output) as item:
            print_op(msg=str(json_string))

            with dsl.Condition(flip1.output == "heads", name="heads"):
                print_op(msg=item.cats)

            with dsl.Condition(flip1.output == "tails", name="tails"):
                print_op(msg=item.dogs)

    with dsl.ParallelFor(json_string) as item:
        with dsl.Condition(item.snakes == "boa", name="snakes"):
            print_op(msg=item.snakes)
            print_op(msg=item.lizards)
            print_op(msg=item.bunnies)

    # it is possible to access sub-items
    with dsl.ParallelFor(json_string) as item:
        with dsl.ParallelFor(item.bunnies) as item_bunnies:
            print_op(msg=item_bunnies.cottontail)

However, I am not sure whether these quick "fixes" are actually correct or merely silence the error.
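One way to frame that doubt is to look at the data shape the workaround produces. The plain-Python sketch below (no kfp required) compares a list of JSON-encoded strings, as returned by the modified `args_generator_op`, with a list of plain dicts. The `items_as_dicts` variant is hypothetical and does not appear in the notebook. The sketch only describes the values themselves. Whether kfp resolves `item.cats` at runtime when the elements are strings is not something it demonstrates.

```python
import json

# Shape used in the workaround: a list whose elements are JSON strings.
items_as_json_strings = [
    json.dumps({"cats": "1", "dogs": "2"}),
    json.dumps({"cats": "10", "dogs": "20"}),
]

# Hypothetical alternative (not from the issue): a list of plain dicts.
items_as_dicts = [
    {"cats": "1", "dogs": "2"},
    {"cats": "10", "dogs": "20"},
]

first = items_as_json_strings[0]

# In plain Python, a JSON string has no fields until it is decoded.
has_field_access = isinstance(first, dict)
cats_from_string = json.loads(first)["cats"]
cats_from_dict = items_as_dicts[0]["cats"]

print(has_field_access, cats_from_string, cats_from_dict)
```

Both shapes satisfy "a list of items", which may be why the error disappears. They differ in whether each item is structured data or an opaque string, so it seems worth checking the printed output of an actual pipeline run before treating the workaround as a fix.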
