Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid overeager garbage collection #6518

Merged
merged 2 commits into from
Mar 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions panel/io/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@

from bokeh.application import Application as BkApplication
from bokeh.application.handlers.directory import DirectoryHandler
from bokeh.application.handlers.document_lifecycle import (
DocumentLifecycleHandler,
)

from ..config import config
from .document import _destroy_document
Expand All @@ -21,11 +24,26 @@
from .state import set_curdoc, state

if TYPE_CHECKING:
from bokeh.application.application import SessionContext
from bokeh.application.handlers.handler import Handler

log = logging.getLogger('panel.io.application')


def _on_session_destroyed(session_context: SessionContext) -> None:
    """
    Invoke the session-destroyed callbacks registered on the Document
    attached to ``session_context``.

    The Document's callback collection is replaced with an empty set
    before any callback is invoked. An exception raised by a callback
    is logged as a warning and the remaining callbacks still run.
    """
    doc = session_context._document
    pending = doc.session_destroyed_callbacks
    # Reset the registry first, then work through the detached callbacks
    doc.session_destroyed_callbacks = set()
    for cb in pending:
        try:
            cb(session_context)
        except Exception as err:
            log.warning("DocumentLifecycleHandler on_session_destroyed "
                        f"callback {cb} failed with following error: {err}")


class Application(BkApplication):
"""
Extends Bokeh Application with ability to add global session
Expand All @@ -45,6 +63,14 @@ async def on_session_created(self, session_context):
cb(session_context)
await super().on_session_created(session_context)

def add(self, handler: Handler) -> None:
    """
    Register ``handler`` on the application.

    A handler whose type is exactly ``DocumentLifecycleHandler``
    (subclasses are left alone) first has its
    ``_on_session_destroyed`` hook replaced with this module's
    ``_on_session_destroyed`` function.
    """
    is_default_lifecycle_handler = type(handler) is DocumentLifecycleHandler
    if is_default_lifecycle_handler:
        handler._on_session_destroyed = _on_session_destroyed
    super().add(handler)

def initialize_document(self, doc):
log.info(LOG_SESSION_LAUNCHING, id(doc))
super().initialize_document(doc)
Expand Down
39 changes: 31 additions & 8 deletions panel/io/document.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
import inspect
import json
import logging
import sys
import threading
import time
import weakref

from contextlib import contextmanager
Expand Down Expand Up @@ -45,10 +47,12 @@
ColumnDataChangedEvent, ColumnsPatchedEvent, ColumnsStreamedEvent,
ModelChangedEvent
)

# Tasks created by _dispatch_write_task; _cleanup_task is attached as their
# done-callback and removes each task from the list again.
# NOTE(review): this excerpt is a diff rendered without +/- markers, so both
# WRITE_TASKS and _write_tasks show up -- presumably WRITE_TASKS is the old
# name that this change replaces; confirm against the merged file.
WRITE_TASKS = []
# Debounce window in seconds: _destroy_document schedules garbage collection
# this far ahead and _garbage_collect compares the elapsed time against it.
GC_DEBOUNCE = 5
# asyncio lock; the code that acquires it is not part of this excerpt.
WRITE_LOCK = asyncio.Lock()

# time.monotonic() value recorded by _destroy_document when it schedules
# garbage collection; None until a document has been destroyed.
_panel_last_cleanup = None
# Tasks created by _dispatch_write_task (see the NOTE on WRITE_TASKS above).
_write_tasks = []

@dataclasses.dataclass
class Request:
headers : dict
Expand All @@ -74,8 +78,8 @@ def request(self):
return Request(headers={}, cookies={}, arguments={})

# Done-callback that _dispatch_write_task attaches to every write task:
# removes the task from the module-level task list if it is still in it.
# NOTE(review): the WRITE_TASKS and the _write_tasks variants both appear
# because this diff is rendered without +/- markers -- presumably only the
# _write_tasks lines remain after the change; confirm against the merged file.
def _cleanup_task(task):
if task in WRITE_TASKS:
WRITE_TASKS.remove(task)
if task in _write_tasks:
_write_tasks.remove(task)

def _dispatch_events(doc: Document, events: List[DocumentChangedEvent]) -> None:
"""
Expand Down Expand Up @@ -148,7 +152,7 @@ def _dispatch_write_task(doc, func, *args, **kwargs):
"""
try:
task = asyncio.ensure_future(func(*args, **kwargs))
WRITE_TASKS.append(task)
_write_tasks.append(task)
task.add_done_callback(_cleanup_task)
except RuntimeError:
doc.add_next_tick_callback(partial(func, *args, **kwargs))
Expand All @@ -175,6 +179,13 @@ async def _dispatch_msgs(doc, msgs):
await asyncio.sleep(0.01)
_dispatch_write_task(doc, _dispatch_msgs, doc, remaining)

def _garbage_collect():
    """
    Run ``gc.collect()`` unless a cleanup was recorded less than
    ``GC_DEBOUNCE`` seconds ago.

    ``_panel_last_cleanup`` holds the ``time.monotonic()`` value of
    the most recent cleanup. While fewer than ``GC_DEBOUNCE`` seconds
    have passed since then, the collection is not run; instead this
    function reschedules itself (task name ``'gc.collect'``) for the
    moment the debounce window ends.
    """
    if _panel_last_cleanup is not None:
        elapsed = time.monotonic() - _panel_last_cleanup
        if elapsed < GC_DEBOUNCE:
            # Wait only for what is left of the debounce window. Using
            # the elapsed time as the delay would push the collection
            # further and further past the intended GC_DEBOUNCE seconds.
            remaining = GC_DEBOUNCE - elapsed
            at = dt.datetime.now() + dt.timedelta(seconds=remaining)
            state.schedule_task('gc.collect', _garbage_collect, at=at)
            return
    # No cleanup recorded yet, or the debounce window has fully elapsed
    gc.collect()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could set _panel_last_cleanup here so that, if a session is destroyed, say, every 4 seconds (e.g. by an automated polling script), we don't end up never running the cleanup!

That would also mean that the first destroyed session triggers GC very quickly, while any session destructions that follow immediately afterwards only trigger GC at 5-second intervals.


def _destroy_document(self, session):
"""
Override for Document.destroy() without calling gc.collect directly.
Expand All @@ -192,7 +203,17 @@ def _destroy_document(self, session):

self.callbacks.destroy()
self.models.destroy()
self.modules.destroy()

# Module cleanup without trawling through referrers (as self.modules.destroy() does)
for module in self.modules._modules:
# remove the reference from sys.modules
if module.__name__ in sys.modules:
del sys.modules[module.__name__]

# explicitly clear the module contents and the module here itself
module.__dict__.clear()
del module
self.modules._modules = []

# Clear periodic callbacks
for cb in state._periodic.get(self, []):
Expand All @@ -208,8 +229,10 @@ def _destroy_document(self, session):
del state_obj[self]

# Schedule GC
at = dt.datetime.now() + dt.timedelta(seconds=5)
state.schedule_task('gc.collect', gc.collect, at=at)
global _panel_last_cleanup
_panel_last_cleanup = time.monotonic()
at = dt.datetime.now() + dt.timedelta(seconds=GC_DEBOUNCE)
state.schedule_task('gc.collect', _garbage_collect, at=at)

del self.destroy

Expand Down
Loading