-
Notifications
You must be signed in to change notification settings - Fork 216
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Reload a running deployment. #419
Conversation
name: https://github.com/run-llama/llama_deploy.git | ||
path: tests/apiserver/data/workflow:my_workflow | ||
name: https://github.com/run-llama/llama_deploy.git@massi/322 | ||
path: e2e_tests/apiserver/deployments/src:my_workflow |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a catch-22 in here, the tests use this repo but until we merge this PR on main
I need to use my branch. When we'll merge this PR, Github will delete the branch and tests will need an update 😅
consumer_task = asyncio.create_task(consumer_fn()) | ||
self._service_tasks.append(consumer_task) | ||
self._service_startup_complete.set() | ||
await asyncio.gather(*self._service_tasks) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the key change. We assume that until self._running
is true, if the services tasks are being cancelled they'll be restarted. We send the signal _service_startup_complete
when the load process is done so that the reload
function will know it can return.
if config.name in self._deployments: | ||
msg = f"Deployment already exists: {config.name}" | ||
raise ValueError(msg) | ||
if not reload: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This if branch behaves exactly as before
deployment = Deployment(config=config, root_path=self._deployments_path) | ||
self._deployments[config.name] = deployment | ||
self._pool.apply_async(func=asyncio.run, args=(deployment.start(),)) | ||
else: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this branch is new
service.port = self._control_plane_port | ||
self._control_plane_port += 1 | ||
service.port = self._last_control_plane_port | ||
self._last_control_plane_port += 1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unrelated, I changed the name of the variable for clarity
for t in self._service_tasks: | ||
t.cancel() | ||
|
||
await self._service_startup_complete.wait() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The logic here is to cancel the running services and wait for the reload in _run_services
to happen.
Fixes #322
Reload a running deployment by restarting its services. The message queue and the control plane won't be restarted, the thread running the dedicated ioloop for this deployment will stay active.