
Initialize and shutdown ray session in each executor #844


Open
wants to merge 7 commits into base: ray-api

Conversation

@praateekmahajan (Contributor) commented Jul 22, 2025

Description

This PR's goal is to ensure we are able to run different executor pipelines in the same Python process, e.g. a Xenna pipeline followed by a Ray Data pipeline (and vice versa). This is currently not possible without shutting Ray down, because Ray preserves environment variables from previous sessions.

Even if https://github.com/nvidia-cosmos/cosmos-xenna/pull/6/files does not merge, this PR works as expected. Xenna sets os.environ["RAY_EXPERIMENTAL_NOSET_CUDA_VISIBLE_DEVICES"], which means that if Ray Data runs after Xenna, Ray Data will not set CUDA_VISIBLE_DEVICES, i.e. every worker would use gpu_id=0. We solve that by forcefully setting RAY_EXPERIMENTAL_NOSET_CUDA_VISIBLE_DEVICES="" in the environment variables when doing ray.init inside Ray Data.
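A minimal sketch of that reset (an assumption of where the override happens; the PR may wire it differently):

```python
import os

import ray

# Clear the flag that a previous Xenna session may have left behind, so that
# Ray Data goes back to managing CUDA_VISIBLE_DEVICES per worker instead of
# every worker defaulting to gpu_id=0.
os.environ["RAY_EXPERIMENTAL_NOSET_CUDA_VISIBLE_DEVICES"] = ""
ray.init(ignore_reinit_error=True)
```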

Workarounds needed

However, because we now introduce ray.shutdown():

  1. Running two subsequent Ray Data pipelines will fail due to [data] Failed to submit task to actor after ray.shutdown() and re-ray.init() in data pipeline for an existing cluster ray-project/ray#54841.
  2. Whenever we create a Ray cluster inside the process, it'll be killed by the first pipeline's shutdown. To avoid that, we can use subprocess.run to create our cluster (see scenario 4 below).

See the following scenarios for a clearer understanding:

  1. You start a Ray cluster outside the process and run pipelines in the same process:

```bash
ray start --head --port 1234
```

```python
os.environ["RAY_ADDRESS"] = "localhost:1234"
ray.init(ignore_reinit_error=True)  # this will connect to 1234
run_pipeline("xenna")
ray.shutdown()

ray.init(ignore_reinit_error=True)  # this will connect to 1234
run_pipeline("ray data")
ray.shutdown()
```
  2. You start a Ray cluster inside the process and run pipelines in the same process:

```python
ray.init(ignore_reinit_error=True)  # this will start a new cluster
run_pipeline("xenna")
ray.shutdown()  # this will kill the ray cluster

ray.init(ignore_reinit_error=True)  # this will start a new cluster
run_pipeline("xenna")
ray.shutdown()
```
  3. You start a Ray cluster explicitly inside the new process:

```python
ray.init(ignore_reinit_error=True)  # this will start a new cluster A

ray.init(ignore_reinit_error=True)  # this will connect to cluster A
run_pipeline("xenna")
ray.shutdown()  # this will kill cluster A

ray.init(ignore_reinit_error=True)  # this will start cluster B
run_pipeline("xenna")
ray.shutdown()
```
  4. You use subprocess to start a Ray cluster:

```python
subprocess.run(["ray", "start", "--head", "--port", "1234"])  # this will start a new cluster A

ray.init(ignore_reinit_error=True)  # this will connect to cluster A @ 1234
run_pipeline("xenna")
ray.shutdown()  # this will only exit the session; cluster A keeps running

ray.init(ignore_reinit_error=True)  # this will connect to cluster A @ 1234
run_pipeline("xenna")
ray.shutdown()
```

Usage

# Add snippet demonstrating usage
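Pending that snippet, a hypothetical sketch of the flow this PR enables (`run_pipeline` is the placeholder used in the scenarios above, not this repo's API):

```python
# Hypothetical sketch: with this PR, each executor calls ray.init() and
# ray.shutdown() itself, so back-to-back executors start from a clean
# environment without any manual session management by the caller.
run_pipeline("xenna")
run_pipeline("ray data")
```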

Checklist

  • I am familiar with the Contributing Guide.
  • New or Existing tests cover these changes.
  • The documentation is up to date with these changes.

@abhinavg4 (Contributor) left a comment:

Left some minor comments. Should add a bunch of TODOs but apart from that looks nice.

```python
output_tasks: list[Task] = []
try:
    # Initialize ray
    ray.init(ignore_reinit_error=True)
```
abhinavg4 (Contributor):
NIT: Can we please add a comment here explaining why we are doing this again, since getting the client might also ray init/ray start?

abhinavg4 (Contributor):
Also, should this have the loguru serializer?

praateekmahajan (Contributor, Author):
This does have a loguru serializer a few lines above. Added a comment too.

```python
    output_tasks = self._dataset_to_tasks(current_dataset)
    logger.info(f"Pipeline completed. Final results: {len(output_tasks)} tasks")
finally:
    ray.shutdown()
```
abhinavg4 (Contributor):
Same as above, a comment here would be useful. If you have made an issue out of our findings, we can just link that here.

```diff
@@ -113,13 +114,14 @@ def execute(self, stages: list[ProcessingStage], initial_tasks: list[Task] | Non
         logger.info(f"Execution mode: {exec_mode.name}")

         try:
-            # Run the pipeline
+            # Run the pipeline (this will initialize ray)
```
abhinavg4 (Contributor):
My recommendation would be to add ray.init here as well, along with the Ray loguru serializer, so that we are not dependent on Xenna and the call is exactly the same across executors.
If we add ray.init here, the ray.init inside Xenna will effectively be useless, and I think that is what we want?

praateekmahajan (Contributor, Author):
Yup, made that change, and in fact that helps us solve this PR without Xenna changes too (along with one more change).

```python
from ray.cluster_utils import Cluster


def find_free_port():
```
abhinavg4 (Contributor):
I think this function might be present in the actual code too. Can we add a TODO to use that and remove this?

praateekmahajan (Contributor, Author):
The reason we find the port ourselves is so that we can subsequently connect to that port using ray.init (instead of address="auto", to avoid connecting to another cluster in case multiple are running).
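For reference, a common sketch of such a helper (binding port 0 so the OS picks an unused one; the PR's actual implementation may differ):

```python
import socket


def find_free_port() -> int:
    # Ask the OS for an unused port by binding to port 0, then release it.
    # The returned port can be passed to `ray start --port` and later to
    # ray.init(), so we connect to exactly the cluster we started.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(("", 0))
        return s.getsockname()[1]
```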

```python
        str(2 * ONE_GB),
        "--block",
    ],
    env={**os.environ, "RAY_MAX_LIMIT_FROM_API_SERVER": "40000", "RAY_MAX_LIMIT_FROM_DATA_SOURCE": "40000"},
```
abhinavg4 (Contributor):
Add a TODO to use get_client here in the future?

praateekmahajan (Contributor, Author):
Added, I hope we can. The only nuance is that we need to know the PID of the process that was started by ray start, so that we can kill it without doing ray stop, which might kill all Ray processes.
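A sketch of that idea (assuming `ray start --head --block` is kept in the foreground so the handle we hold is the process to kill; the PR's test helper may differ):

```python
import subprocess

# `--block` keeps `ray start` in the foreground, so this Popen handle's PID
# is the one process we need to kill. Calling `ray stop` instead would stop
# every Ray process on the machine.
head = subprocess.Popen(["ray", "start", "--head", "--port", "1234", "--block"])
try:
    ...  # run pipelines against the cluster at port 1234
finally:
    head.terminate()  # kill only the cluster we started
    head.wait()
```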
