From 1ad838f11f786f2e0a02697e526657e3dd10cacd Mon Sep 17 00:00:00 2001 From: Philipp Rudiger Date: Tue, 23 Aug 2022 15:33:42 +0200 Subject: [PATCH 1/5] Ensure curdoc lookups work in async context --- panel/io/document.py | 5 ++--- panel/io/model.py | 2 ++ panel/io/state.py | 42 +++++++++++++++++++++++++++----------- panel/tests/test_server.py | 32 ++++++++++++++++++++++++++++- 4 files changed, 65 insertions(+), 16 deletions(-) diff --git a/panel/io/document.py b/panel/io/document.py index e0fc1f276d..3702b6cba0 100644 --- a/panel/io/document.py +++ b/panel/io/document.py @@ -18,7 +18,7 @@ from bokeh.document.events import DocumentChangedEvent, ModelChangedEvent from .model import monkeypatch_events -from .state import curdoc_locked, set_curdoc, state +from .state import curdoc_locked, state logger = logging.getLogger(__name__) @@ -72,8 +72,7 @@ def init_doc(doc: Optional[Document]) -> Document: thread = threading.current_thread() if thread: - with set_curdoc(curdoc): - state._thread_id = thread.ident + state._thread_id_[curdoc] = thread.ident session_id = curdoc.session_context.id sessions = state.session_info['sessions'] diff --git a/panel/io/model.py b/panel/io/model.py index 2d8d6cab69..cc2e20c6dd 100644 --- a/panel/io/model.py +++ b/panel/io/model.py @@ -77,6 +77,8 @@ def diff( return None patch_events = [event for event in events if isinstance(event, DocumentPatchedEvent)] + if not patch_events: + return monkeypatch_events(events) msg_type: Literal["PATCH-DOC"] = "PATCH-DOC" msg = Protocol().create(msg_type, patch_events, use_buffers=binary) diff --git a/panel/io/state.py b/panel/io/state.py index 1932f207b3..3ca7230dc0 100644 --- a/panel/io/state.py +++ b/panel/io/state.py @@ -325,10 +325,12 @@ def _on_load(self, doc: Optional[Document] = None) -> None: from .profile import profile_ctx with set_curdoc(doc): if (doc and doc in self._launching) or not config.profiler: - for cb in callbacks: cb() + for cb in callbacks: + self.execute(cb, schedule=False) return 
with profile_ctx(config.profiler) as sessions: - for cb in callbacks: cb() + for cb in callbacks: + self.execute(cb, schedule=False) path = doc.session_context.request.path self._profiles[(path+':on_load', config.profiler)] += sessions self.param.trigger('_profiles') @@ -565,15 +567,20 @@ def log(self, msg: str, level: str = 'info') -> None: msg = LOG_USER_MSG.format(msg=msg) getattr(_state_logger, level.lower())(msg, *args) - def onload(self, callback): + def onload(self, callback: Callable[[], None] | Coroutine[Any, Any, None]): """ Callback that is triggered when a session has been served. + + Arguments + --------- + callback: Callable[[], None] | Coroutine[Any, Any, None] + Callback that is executed when the application is loaded """ if self.curdoc is None: if self._thread_pool: - self._thread_pool.submit(callback) + self._thread_pool.submit(partial(self.execute, callback, schedule=False)) else: - callback() + self.execute(callback, schedule=False) return if self.curdoc not in self._onload: self._onload[self.curdoc] = [] @@ -802,23 +809,34 @@ def curdoc(self, doc: Document) -> None: def _curdoc(self) -> Document | None: """ Required to make overrides to curdoc (e.g. using the - set_curdoc context manager) thread-safe. Otherwise two threads - may independently override the curdoc and end up in a confused - final state. + set_curdoc context manager) thread-safe and asyncio task + local. Otherwise two threads may independently override the + curdoc and end up in a confused final state. 
""" + try: + task = asyncio.current_task() + task_id = id(task) + except Exception: + task_id = None thread = threading.current_thread() thread_id = thread.ident if thread else None - return self._curdoc_.get(thread_id) + return self._curdoc_.get((thread_id, task_id)) @_curdoc.setter def _curdoc(self, doc: Document | None) -> None: + try: + task = asyncio.current_task() + task_id = id(task) + except Exception: + task_id = None thread = threading.current_thread() thread_id = thread.ident if thread else None + key = (thread_id, task_id) if doc is None: - if thread_id in self._curdoc_: - del self._curdoc_[thread_id] + if key in self._curdoc_: + del self._curdoc_[key] else: - self._curdoc_[thread_id] = doc + self._curdoc_[key] = doc @property def cookies(self) -> Dict[str, str]: diff --git a/panel/tests/test_server.py b/panel/tests/test_server.py index d58e023c94..ef4c780efe 100644 --- a/panel/tests/test_server.py +++ b/panel/tests/test_server.py @@ -1,3 +1,4 @@ +import asyncio import datetime as dt import os import pathlib @@ -149,7 +150,6 @@ def test_server_async_callbacks(port): counts = [] async def cb(event, count=[0]): - import asyncio count[0] += 1 counts.append(count[0]) await asyncio.sleep(1) @@ -176,6 +176,36 @@ async def cb(event, count=[0]): assert max(counts) > 1 +def test_server_async_local_state(port): + docs = {} + + async def task(): + curdoc = state.curdoc + await asyncio.sleep(0.5) + docs[curdoc] = [] + for i in range(10): + await asyncio.sleep(0.1) + docs[curdoc].append(state.curdoc) + + def app(): + state.execute(task) + return 'My app' + + serve(app, port=port, threaded=True, show=False) + + # Wait for server to start + time.sleep(1) + + for i in range(5): + requests.get(f"http://localhost:{port}/") + + # Wait for callbacks to be scheduled + time.sleep(2) + + # Ensure state.curdoc was consistent despite asyncio context switching + assert len(docs) == 5 + assert all([len(set(docs)) == 1 and docs[0] is doc for doc, docs in docs.items()]) + def 
test_serve_config_per_session_state(): CSS1 = 'body { background-color: red }' CSS2 = 'body { background-color: green }' From d936314eb2556cd6918085f9c97f4794a1b235f0 Mon Sep 17 00:00:00 2001 From: Philipp Rudiger Date: Tue, 23 Aug 2022 15:49:47 +0200 Subject: [PATCH 2/5] Add async onload test --- panel/tests/test_server.py | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/panel/tests/test_server.py b/panel/tests/test_server.py index ef4c780efe..b9e5c2a899 100644 --- a/panel/tests/test_server.py +++ b/panel/tests/test_server.py @@ -568,6 +568,40 @@ def loaded(): assert max(counts) >= 2 +def test_server_async_onload(threads, port): + counts = [] + + def app(count=[0]): + button = Button(name='Click') + async def onload(): + count[0] += 1 + counts.append(count[0]) + await asyncio.sleep(2) + count[0] -= 1 + + state.onload(onload) + + # Simulate rendering + def loaded(): + state._schedule_on_load(state.curdoc, None) + state.execute(loaded, schedule=True) + + return button + + serve(app, port=port, threaded=True, show=False) + + # Wait for server to start + time.sleep(1) + + requests.get(f"http://localhost:{port}/") + requests.get(f"http://localhost:{port}/") + + time.sleep(1) + + # Checks whether onload callbacks were executed concurrently + assert max(counts) >= 2 + + class CustomBootstrapTemplate(BootstrapTemplate): _css = './assets/custom.css' From b322b3cda6f8823d392085c5a8789f405663e31a Mon Sep 17 00:00:00 2001 From: Philipp Rudiger Date: Wed, 24 Aug 2022 20:02:39 +0200 Subject: [PATCH 3/5] Handle async child tasks --- panel/io/jupyter_server_extension.py | 6 +++- panel/io/server.py | 24 ++++++++++++++- panel/io/state.py | 43 +++++++++++++++++--------- panel/tests/test_server.py | 45 ++++++++++++++++++++++++++-- 4 files changed, 99 insertions(+), 19 deletions(-) diff --git a/panel/io/jupyter_server_extension.py b/panel/io/jupyter_server_extension.py index beca254137..dc1cbba7f3 100644 --- a/panel/io/jupyter_server_extension.py 
+++ b/panel/io/jupyter_server_extension.py @@ -69,7 +69,7 @@ from .resources import ( DIST_DIR, ERROR_TEMPLATE, Resources, _env, ) -from .server import server_html_page_for_session +from .server import _add_task_factory, server_html_page_for_session from .state import set_curdoc, state logger = logging.getLogger(__name__) @@ -218,6 +218,10 @@ def _create_server_session(self) -> ServerSession: app.initialize_document(doc) loop = tornado.ioloop.IOLoop.current() + try: + _add_task_factory(loop.asyncio_loop) # type: ignore + except Exception: + pass session = ServerSession(self.session_id, doc, io_loop=loop, token=self.token) session_context._set_session(session) return session diff --git a/panel/io/server.py b/panel/io/server.py index d36fe1a363..baf45330c9 100644 --- a/panel/io/server.py +++ b/panel/io/server.py @@ -204,7 +204,6 @@ def autoload_js_script(doc, resources, token, element_id, app_path, absolute_url return AUTOLOAD_JS.render(bundle=bundle, elementid=element_id) - def destroy_document(self, session): """ Override for Document.destroy() without calling gc.collect directly. @@ -560,6 +559,24 @@ def create_static_handler(prefix, key, app): ): asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy()) +def _add_task_factory(loop): + """ + Adds a Task factory to the asyncio IOLoop that ensures child tasks + have access to their parent. 
+ """ + if getattr(loop, '_has_panel_task_factory', False): + return + existing_factory = loop.get_task_factory() + def task_factory(loop, coro): + if existing_factory: + task = existing_factory(loop, coro) + else: + task = asyncio.Task(coro, loop=loop) + task.parent_task = asyncio.current_task() + return task + loop.set_task_factory(task_factory) + loop._has_panel_task_factory = True + #--------------------------------------------------------------------- # Public API #--------------------------------------------------------------------- @@ -846,6 +863,11 @@ def show_callback(): server.show('/login' if config.oauth_provider else '/') server.io_loop.add_callback(show_callback) + try: + _add_task_factory(server.io_loop.asyncio_loop) # type: ignore + except Exception: + pass + def sig_exit(*args, **kwargs): server.io_loop.add_callback_from_signal(do_stop) diff --git a/panel/io/state.py b/panel/io/state.py index 3ca7230dc0..7733616247 100644 --- a/panel/io/state.py +++ b/panel/io/state.py @@ -113,7 +113,7 @@ class _state(param.Parameterized): A dictionary used by the cache decorator.""") # Holds temporary curdoc overrides per thread - _curdoc_ = {} + _curdoc_ = defaultdict(WeakKeyDictionary) # Whether to hold comm events _hold: ClassVar[bool] = False @@ -813,30 +813,45 @@ def _curdoc(self) -> Document | None: local. Otherwise two threads may independently override the curdoc and end up in a confused final state. 
""" + thread = threading.current_thread() + thread_id = thread.ident if thread else None + if thread_id not in self._curdoc_: + return None + curdocs = self._curdoc_[thread_id] try: task = asyncio.current_task() - task_id = id(task) except Exception: - task_id = None - thread = threading.current_thread() - thread_id = thread.ident if thread else None - return self._curdoc_.get((thread_id, task_id)) + task = None + while True: + if task in curdocs: + return curdocs[task] + elif task is None: + return None + try: + task = task.parent_task + except Exception: + task = None @_curdoc.setter def _curdoc(self, doc: Document | None) -> None: + thread = threading.current_thread() + thread_id = thread.ident if thread else None + if thread_id not in self._curdoc_ and doc is None: + return None + curdocs = self._curdoc_[thread_id] try: task = asyncio.current_task() - task_id = id(task) except Exception: - task_id = None - thread = threading.current_thread() - thread_id = thread.ident if thread else None - key = (thread_id, task_id) + task = None if doc is None: - if key in self._curdoc_: - del self._curdoc_[key] + # Do not clean up curdocs for tasks since they may have + # children that are still running + if task in curdocs and task is None: + del curdocs[task] + if not len(curdocs): + del self._curdoc_[thread_id] else: - self._curdoc_[key] = doc + curdocs[task] = doc @property def cookies(self) -> Dict[str, str]: diff --git a/panel/tests/test_server.py b/panel/tests/test_server.py index b9e5c2a899..bfcbf297f6 100644 --- a/panel/tests/test_server.py +++ b/panel/tests/test_server.py @@ -1,5 +1,6 @@ import asyncio import datetime as dt +import gc import os import pathlib import time @@ -183,7 +184,7 @@ async def task(): curdoc = state.curdoc await asyncio.sleep(0.5) docs[curdoc] = [] - for i in range(10): + for i in range(5): await asyncio.sleep(0.1) docs[curdoc].append(state.curdoc) @@ -196,16 +197,54 @@ def app(): # Wait for server to start time.sleep(1) - for i in range(5): 
+ for i in range(3): requests.get(f"http://localhost:{port}/") # Wait for callbacks to be scheduled time.sleep(2) # Ensure state.curdoc was consistent despite asyncio context switching - assert len(docs) == 5 + assert len(docs) == 3 assert all([len(set(docs)) == 1 and docs[0] is doc for doc, docs in docs.items()]) + +def test_server_async_local_state_nested_tasks(port): + docs = {} + + async def task(depth=1): + curdoc = state.curdoc + await asyncio.sleep(0.5) + if depth > 0: + asyncio.ensure_future(task(depth-1)) + docs[curdoc] = [] + for i in range(10): + await asyncio.sleep(0.1) + docs[curdoc].append(state.curdoc) + + def app(): + state.execute(task) + return 'My app' + + serve(app, port=port, threaded=True, show=False) + + # Wait for server to start + time.sleep(1) + + for i in range(3): + requests.get(f"http://localhost:{port}/") + + # Wait for callbacks to be scheduled + time.sleep(2) + + # Ensure state.curdoc was consistent despite asyncio context switching + assert len(docs) == 3 + assert all(len(set(docs)) == 1 and docs[0] is doc for doc, docs in docs.items()) + + # Ensure all curdocs are GCed + gc.collect() + assert all(len(curdocs) == 0 for curdocs in state._curdoc_.values()) + + def test_serve_config_per_session_state(): CSS1 = 'body { background-color: red }' CSS2 = 'body { background-color: green }' From de1fc8b3ca9872ac098ad336e11133c67b87b5a8 Mon Sep 17 00:00:00 2001 From: Philipp Rudiger Date: Wed, 24 Aug 2022 23:52:35 +0200 Subject: [PATCH 4/5] Test fixes --- panel/io/state.py | 14 ++++++++------ panel/tests/conftest.py | 1 + 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/panel/io/state.py b/panel/io/state.py index 7733616247..edee9b927c 100644 --- a/panel/io/state.py +++ b/panel/io/state.py @@ -60,7 +60,7 @@ @contextmanager def set_curdoc(doc: Document): orig_doc = state._curdoc - state.curdoc = doc + state._curdoc = doc yield state._curdoc = orig_doc @@ -824,13 +824,14 @@ def _curdoc(self) -> Document | None: task = None while 
True: if task in curdocs: - return curdocs[task] + return curdocs[task or self] elif task is None: - return None + break try: task = task.parent_task except Exception: task = None + return curdocs[self] if self in curdocs else None @_curdoc.setter def _curdoc(self, doc: Document | None) -> None: @@ -843,15 +844,16 @@ def _curdoc(self, doc: Document | None) -> None: task = asyncio.current_task() except Exception: task = None + key = task or self if doc is None: # Do not clean up curdocs for tasks since they may have # children that are still running - if task in curdocs and task is None: - del curdocs[task] + if key in curdocs and task is None: + del curdocs[key] if not len(curdocs): del self._curdoc_[thread_id] else: - curdocs[task] = doc + curdocs[key] = doc @property def cookies(self) -> Dict[str, str]: diff --git a/panel/tests/conftest.py b/panel/tests/conftest.py index 1d27cd15e7..0d7e92e7fb 100644 --- a/panel/tests/conftest.py +++ b/panel/tests/conftest.py @@ -313,6 +313,7 @@ def server_cleanup(): state._curdoc = None state.cache.clear() state._scheduled.clear() + state._curdoc_.clear() if state._thread_pool is not None: state._thread_pool.shutdown(wait=False) state._thread_pool = None From 1477d2acbc2fcb35c09e76b6fd509cc3a95caf8c Mon Sep 17 00:00:00 2001 From: Philipp Rudiger Date: Thu, 25 Aug 2022 01:07:46 +0200 Subject: [PATCH 5/5] Enable for panel serve --- panel/io/server.py | 43 +++++++++++++++++++++++--------------- panel/tests/test_server.py | 5 ----- 2 files changed, 26 insertions(+), 22 deletions(-) diff --git a/panel/io/server.py b/panel/io/server.py index baf45330c9..98557c70b3 100644 --- a/panel/io/server.py +++ b/panel/io/server.py @@ -47,7 +47,7 @@ ) from bokeh.embed.util import RenderItem from bokeh.io import curdoc -from bokeh.server.server import Server +from bokeh.server.server import Server as BokehServer from bokeh.server.urls import per_app_patterns from bokeh.server.views.autoload_js_handler import ( AutoloadJsHandler as 
BkAutoloadJsHandler, @@ -238,6 +238,18 @@ def destroy_document(self, session): state.schedule_task('gc.collect', gc.collect, at=at) +# Patch Server to attach task factory to asyncio loop +class Server(BokehServer): + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + try: + _add_task_factory(self.io_loop.asyncio_loop) # type: ignore + except Exception: + pass + +bokeh.server.server.Server = Server + # Patch Application to handle session callbacks class Application(BkApplication): @@ -289,17 +301,14 @@ class DocHandler(BkDocHandler, SessionPrefixHandler): async def get(self, *args, **kwargs): with self._session_prefix(): session = await self.get_session() - state.curdoc = session.document logger.info(LOG_SESSION_CREATED, id(session.document)) - try: + with set_curdoc(session.document): resources = Resources.from_bokeh(self.application.resources()) page = server_html_page_for_session( session, resources=resources, title=session.document.title, template=session.document.template, template_variables=session.document.template_variables ) - finally: - state.curdoc = None self.set_header("Content-Type", 'text/html') self.write(page) @@ -327,15 +336,12 @@ async def get(self, *args, **kwargs) -> None: with self._session_prefix(): session = await self.get_session() - state.curdoc = session.document - try: + with set_curdoc(session.document): resources = Resources.from_bokeh(self.application.resources(server_url)) js = autoload_js_script( session.document, resources, session.token, element_id, app_path, absolute_url ) - finally: - state.curdoc = None self.set_header("Content-Type", 'application/javascript') self.write(js) @@ -545,6 +551,10 @@ def create_static_handler(prefix, key, app): bokeh.server.tornado.create_static_handler = create_static_handler +#--------------------------------------------------------------------- +# Async patches +#--------------------------------------------------------------------- + # Bokeh 2.4.x patches the asyncio 
event loop policy but Tornado 6.1 # support the WindowsProactorEventLoopPolicy so we restore it, # unless we detect we are running on jupyter_server. @@ -568,11 +578,15 @@ def _add_task_factory(loop): return existing_factory = loop.get_task_factory() def task_factory(loop, coro): + try: + parent_task = asyncio.current_task() + except RuntimeError: + parent_task = None if existing_factory: task = existing_factory(loop, coro) else: task = asyncio.Task(coro, loop=loop) - task.parent_task = asyncio.current_task() + task.parent_task = parent_task return task loop.set_task_factory(task_factory) loop._has_panel_task_factory = True @@ -587,7 +601,7 @@ def serve( loop: Optional[IOLoop] = None, show: bool = True, start: bool = True, title: Optional[str] = None, verbose: bool = True, location: bool = True, threaded: bool = False, **kwargs -) -> threading.Thread | 'Server': +) -> threading.Thread | Server: """ Allows serving one or more panel objects on a single server. The panels argument should be either a Panel object or a function @@ -762,7 +776,7 @@ def get_server( Returns ------- - server : bokeh.server.server.Server + server : panel.io.server.Server Bokeh Server instance running this panel """ from ..config import config @@ -863,11 +877,6 @@ def show_callback(): server.show('/login' if config.oauth_provider else '/') server.io_loop.add_callback(show_callback) - try: - _add_task_factory(server.io_loop.asyncio_loop) # type: ignore - except Exception: - pass - def sig_exit(*args, **kwargs): server.io_loop.add_callback_from_signal(do_stop) diff --git a/panel/tests/test_server.py b/panel/tests/test_server.py index bfcbf297f6..d4a6fefe2e 100644 --- a/panel/tests/test_server.py +++ b/panel/tests/test_server.py @@ -1,6 +1,5 @@ import asyncio import datetime as dt -import gc import os import pathlib import time @@ -240,10 +239,6 @@ def app(): assert len(docs) == 3 assert all(len(set(docs)) == 1 and docs[0] is doc for doc, docs in docs.items()) - # Ensure all curdocs are GCed - 
gc.collect() - assert all(len(curdocs) == 0 for curdocs in state._curdoc_.values()) - def test_serve_config_per_session_state(): CSS1 = 'body { background-color: red }'