diff --git a/panel/io/jupyter_executor.py b/panel/io/jupyter_executor.py index 6c6e13818d..63a02e4951 100644 --- a/panel/io/jupyter_executor.py +++ b/panel/io/jupyter_executor.py @@ -7,7 +7,7 @@ from dataclasses import dataclass from typing import ( - Any, Dict, List, Union, + TYPE_CHECKING, Any, Dict, List, Union, ) import tornado @@ -31,6 +31,9 @@ from .server import server_html_page_for_session from .state import set_curdoc, state +if TYPE_CHECKING: + from bokeh.document.events import DocumentPatchedEvent + @dataclass class _RequestProxy: @@ -48,6 +51,20 @@ def _repr_mimebundle_(self, include=None, exclude=None): return self._mimebundle, {} +class JupyterServerSession(ServerSession): + + _tasks = set() + + def _document_patched(self, event: DocumentPatchedEvent) -> None: + may_suppress = event.setter is self + for connection in self._subscribed_connections: + if may_suppress and connection is self._current_patch_connection: + continue + task = asyncio.ensure_future(connection.send_patch_document(event)) + self._tasks.add(task) + task.add_done_callback(self._tasks.discard) + + class PanelExecutor(WSHandler): """ The PanelExecutor is intended to be run inside a kernel where it @@ -95,7 +112,6 @@ def _get_payload(self, token: str) -> Dict[str, Any]: return payload def _set_state(self): - state._jupyter_kernel_context = True with edit_readonly(state): state.base_url = self.root_url + '/' state.rel_path = self.root_url @@ -146,6 +162,7 @@ def _create_server_session(self) -> ServerSession: # expose the session context to the document # use the _attribute to set the public property .session_context doc._session_context = weakref.ref(session_context) + state._jupyter_kernel_context = session_context if self.path.endswith('.yaml') or self.path.endswith('.yml'): from lumen.command import ( @@ -159,7 +176,7 @@ def _create_server_session(self) -> ServerSession: runner = app._handlers[0]._runner loop = tornado.ioloop.IOLoop.current() - session = ServerSession(self.session_id, doc, io_loop=loop, token=self.token) + session = JupyterServerSession(self.session_id, doc, io_loop=loop, token=self.token) session_context._set_session(session) return session, runner.error_detail @@ -168,10 +185,17 @@ async def write_message( binary: bool = False, locked: bool = True ) -> None: metadata = {'binary': binary} - if binary: - self.comm.send({}, metadata=metadata, buffers=[message]) + if locked: + with await self.write_lock.acquire(): + if binary: + self.comm.send({}, metadata=metadata, buffers=[message]) + else: + self.comm.send(message, metadata=metadata) else: - self.comm.send(message, metadata=metadata) + if binary: + self.comm.send({}, metadata=metadata, buffers=[message]) + else: + self.comm.send(message, metadata=metadata) def render(self) -> Mimebundle: """ diff --git a/panel/io/state.py b/panel/io/state.py index d6dc14f66a..d9b8268135 100644 --- a/panel/io/state.py +++ b/panel/io/state.py @@ -135,7 +135,7 @@ class _state(param.Parameterized): # Jupyter communication _comm_manager: ClassVar[Type[_CommManager]] = _CommManager - _jupyter_kernel_context: ClassVar[bool] = False + _jupyter_kernel_context: ClassVar[BokehSessionContext | None] = None _kernels = {} _ipykernels: ClassVar[WeakKeyDictionary[Document, Any]] = WeakKeyDictionary()