Skip to content

Commit

Permalink
Revert PR #170 (#228)
Browse files Browse the repository at this point in the history
  • Loading branch information
davidbrochart authored Dec 26, 2023
1 parent 943597e commit b11cc72
Show file tree
Hide file tree
Showing 7 changed files with 13 additions and 245 deletions.
3 changes: 0 additions & 3 deletions jupyter_collaboration/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,9 +254,6 @@ async def on_message(self, message):
)
return skip

if message_type == MessageType.ROOM:
await self.room.handle_msg(message[1:])

if message_type == MessageType.CHAT:
msg = message[2:].decode("utf-8")

Expand Down
113 changes: 13 additions & 100 deletions jupyter_collaboration/rooms.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,16 @@
from __future__ import annotations

import asyncio
import uuid
from logging import Logger
from typing import Any

from jupyter_events import EventLogger
from jupyter_ydoc import ydocs as YDOCS
from pycrdt_websocket.websocket_server import YRoom
from pycrdt_websocket.ystore import BaseYStore, YDocNotFound
from pycrdt_websocket.yutils import write_var_uint

from .loaders import FileLoader
from .utils import (
JUPYTER_COLLABORATION_EVENTS_URI,
LogLevel,
MessageType,
OutOfBandChanges,
RoomMessages,
)
from .utils import JUPYTER_COLLABORATION_EVENTS_URI, LogLevel, OutOfBandChanges

YFILE = YDOCS["file"]

Expand Down Expand Up @@ -53,7 +45,6 @@ def __init__(
self._save_delay = save_delay

self._update_lock = asyncio.Lock()
self._outofband_lock = asyncio.Lock()
self._initialization_lock = asyncio.Lock()
self._cleaner: asyncio.Task | None = None
self._saving_document: asyncio.Task | None = None
Expand Down Expand Up @@ -159,41 +150,6 @@ async def initialize(self) -> None:
self.ready = True
self._emit(LogLevel.INFO, "initialize", "Room initialized")

async def handle_msg(self, data: bytes) -> None:
msg_type = data[0]
msg_id = data[2:].decode()

# Use a lock to prevent handling responses from multiple clients
# at the same time
async with self._messages[msg_id]:
# Check whether the previous client resolved the conflict
if msg_id not in self._messages:
return

try:
ans = None
if msg_type == RoomMessages.RELOAD:
# Restore the room with the content from disk
await self._load_document()
ans = RoomMessages.DOC_OVERWRITTEN

elif msg_type == RoomMessages.OVERWRITE:
# Overwrite the file with content from the room
await self._save_document()
ans = RoomMessages.FILE_OVERWRITTEN

if ans is not None:
# Remove the lock and broadcast the resolution
self._messages.pop(msg_id)
data = msg_id.encode()
self._outofband_lock.release()
await self._broadcast_msg(
bytes([MessageType.ROOM, ans]) + write_var_uint(len(data)) + data
)

except Exception:
return

def _emit(self, level: LogLevel, action: str | None = None, msg: str | None = None) -> None:
data = {"level": level.value, "room": self._room_id, "path": self._file.path}
if action:
Expand Down Expand Up @@ -232,24 +188,24 @@ async def _on_content_change(self, event: str, args: dict[str, Any]) -> None:
event (str): Type of change.
args (dict): A dictionary with format, type, last_modified.
"""
if self._outofband_lock.locked():
return

if event == "metadata" and (
self._last_modified is None or self._last_modified < args["last_modified"]
):
self.log.info("Out-of-band changes. Overwriting the content in room %s", self._room_id)
self._emit(LogLevel.INFO, "overwrite", "Out-of-band changes. Overwriting the room.")

msg_id = str(uuid.uuid4())
self._messages[msg_id] = asyncio.Lock()
await self._outofband_lock.acquire()
data = msg_id.encode()
await self._broadcast_msg(
bytes([MessageType.ROOM, RoomMessages.FILE_CHANGED])
+ write_var_uint(len(data))
+ data
)
try:
model = await self._file.load_content(self._file_format, self._file_type, True)
except Exception as e:
msg = f"Error loading content from file: {self._file.path}\n{e!r}"
self.log.error(msg, exc_info=e)
self._emit(LogLevel.ERROR, None, msg)
return None

async with self._update_lock:
self._document.source = model["content"]
self._last_modified = model["last_modified"]
self._document.dirty = False

def _on_document_change(self, target: str, event: Any) -> None:
"""
Expand All @@ -276,45 +232,6 @@ def _on_document_change(self, target: str, event: Any) -> None:

self._saving_document = asyncio.create_task(self._maybe_save_document())

async def _load_document(self) -> None:
try:
model = await self._file.load_content(self._file_format, self._file_type, True)
except Exception as e:
msg = f"Error loading content from file: {self._file.path}\n{e!r}"
self.log.error(msg, exc_info=e)
self._emit(LogLevel.ERROR, None, msg)
return None

async with self._update_lock:
self._document.source = model["content"]
self._last_modified = model["last_modified"]
self._document.dirty = False

async def _save_document(self) -> None:
"""
Saves the content of the document to disk.
"""
try:
self.log.info("Saving the content from room %s", self._room_id)
model = await self._file.save_content(
{
"format": self._file_format,
"type": self._file_type,
"last_modified": self._last_modified,
"content": self._document.source,
}
)
self._last_modified = model["last_modified"]
async with self._update_lock:
self._document.dirty = False

self._emit(LogLevel.INFO, "save", "Content saved.")

except Exception as e:
msg = f"Error saving file: {self._file.path}\n{e!r}"
self.log.error(msg, exc_info=e)
self._emit(LogLevel.ERROR, None, msg)

async def _maybe_save_document(self) -> None:
"""
Saves the content of the document to disk.
Expand Down Expand Up @@ -368,10 +285,6 @@ async def _maybe_save_document(self) -> None:
self.log.error(msg, exc_info=e)
self._emit(LogLevel.ERROR, None, msg)

async def _broadcast_msg(self, msg: bytes) -> None:
for client in self.clients:
await client.send(msg)


class TransientRoom(YRoom):
"""A Y room for sharing state (e.g. awareness)."""
Expand Down
9 changes: 0 additions & 9 deletions jupyter_collaboration/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,9 @@
class MessageType(IntEnum):
SYNC = 0
AWARENESS = 1
ROOM = 124
CHAT = 125


class RoomMessages(IntEnum):
RELOAD = 0
OVERWRITE = 1
FILE_CHANGED = 2
FILE_OVERWRITTEN = 3
DOC_OVERWRITTEN = 4


class LogLevel(Enum):
INFO = "INFO"
DEBUG = "DEBUG"
Expand Down
9 changes: 0 additions & 9 deletions packages/docprovider/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,5 @@
|----------------------------------------------------------------------------*/

export enum MessageType {
ROOM = 124,
CHAT = 125
}

export enum RoomMessage {
RELOAD = 0,
OVERWRITE = 1,
FILE_CHANGED = 2,
FILE_OVERWRITTEN = 3,
DOC_OVERWRITTEN = 4
}
68 changes: 0 additions & 68 deletions packages/docprovider/src/yprovider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,10 @@ import { Signal } from '@lumino/signaling';

import { DocumentChange, YDocument } from '@jupyter/ydoc';

import * as decoding from 'lib0/decoding';
import * as encoding from 'lib0/encoding';
import { Awareness } from 'y-protocols/awareness';
import { WebsocketProvider as YWebsocketProvider } from 'y-websocket';

import { requestDocSession } from './requests';
import { MessageType, RoomMessage } from './utils';

/**
* An interface for a document provider.
Expand Down Expand Up @@ -114,18 +111,6 @@ export class WebSocketProvider implements IDocumentProvider {

this._yWebsocketProvider.on('sync', this._onSync);
this._yWebsocketProvider.on('connection-close', this._onConnectionClosed);

this._yWebsocketProvider.messageHandlers[MessageType.ROOM] = (
encoder,
decoder,
provider,
emitSynced,
messageType
) => {
const msgType = decoding.readVarUint(decoder);
const data = decoding.readVarString(decoder);
this._handleRoomMessage(msgType, data);
};
}

private _onUserChanged(user: User.IManager): void {
Expand Down Expand Up @@ -153,59 +138,6 @@ export class WebSocketProvider implements IDocumentProvider {
}
};

private _handleRoomMessage(type: number, data: string): void {
switch (type) {
case RoomMessage.FILE_CHANGED:
this._handleFileChanged(data);
break;

case RoomMessage.DOC_OVERWRITTEN:
case RoomMessage.FILE_OVERWRITTEN:
if (this._dialog) {
this._dialog.close();
this._dialog = null;
}
break;
}
}

private _handleFileChanged(data: string): void {
this._dialog = new Dialog({
title: this._trans.__('File changed'),
body: this._trans.__('Do you want to overwrite the file or reload it?'),
buttons: [
Dialog.okButton({ label: 'Reload' }),
Dialog.warnButton({ label: 'Overwrite' })
],
hasClose: false
});

this._dialog.launch().then(resp => {
if (resp.button.label === 'Reload') {
this._sendReloadMsg(data);
} else if (resp.button.label === 'Overwrite') {
this._sendOverwriteMsg(data);
}
});
}

private _sendReloadMsg(data: string): void {
const encoder = encoding.createEncoder();
encoding.writeVarUint(encoder, MessageType.ROOM);
encoding.writeVarUint(encoder, RoomMessage.RELOAD);
encoding.writeVarString(encoder, data);
this._yWebsocketProvider?.ws!.send(encoding.toUint8Array(encoder));
}

private _sendOverwriteMsg(data: string): void {
const encoder = encoding.createEncoder();
encoding.writeVarUint(encoder, MessageType.ROOM);
encoding.writeVarUint(encoder, RoomMessage.OVERWRITE);
encoding.writeVarString(encoder, data);
this._yWebsocketProvider?.ws!.send(encoding.toUint8Array(encoder));
}

private _dialog: Dialog<any> | null = null;
private _awareness: Awareness;
private _contentType: string;
private _format: string;
Expand Down
45 changes: 0 additions & 45 deletions tests/test_rooms.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,9 @@
from __future__ import annotations

import asyncio
from datetime import datetime

from jupyter_ydoc import YUnicode

from .utils import overite_msg, reload_msg


async def test_should_initialize_document_room_without_store(rtc_create_mock_document_room):
content = "test"
Expand Down Expand Up @@ -78,45 +75,3 @@ async def test_undefined_save_delay_should_not_save_content_after_document_chang
await asyncio.sleep(0.15)

assert "save" not in cm.actions


async def test_should_reload_content_from_disk(rtc_create_mock_document_room):
content = "test"
last_modified = datetime.now()

cm, loader, room = rtc_create_mock_document_room(
"test-id", "test.txt", "whatever", last_modified
)

await room.initialize()

# Make sure the time increases
cm.model["last_modified"] = datetime.fromtimestamp(last_modified.timestamp() + 1)
cm.model["content"] = content

await loader.notify()

msg_id = next(iter(room._messages)).encode("utf8")
await room.handle_msg(reload_msg(msg_id))

assert room._document.source == content


async def test_should_not_reload_content_from_disk(rtc_create_mock_document_room):
content = "test"
last_modified = datetime.now()

cm, loader, room = rtc_create_mock_document_room("test-id", "test.txt", content, last_modified)

await room.initialize()

# Make sure the time increases
cm.model["last_modified"] = datetime.fromtimestamp(last_modified.timestamp() + 1)
cm.model["content"] = "whatever"

await loader.notify()

msg_id = list(room._messages.keys())[0].encode("utf8")
await room.handle_msg(overite_msg(msg_id))

assert room._document.source == content
11 changes: 0 additions & 11 deletions tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,6 @@
from typing import Any

from jupyter_server import _tz as tz
from pycrdt_websocket.yutils import write_var_uint

from jupyter_collaboration.utils import RoomMessages


class FakeFileIDManager:
Expand Down Expand Up @@ -55,11 +52,3 @@ def save_content(self, model: dict[str, Any], path: str) -> dict:
class FakeEventLogger:
def emit(self, schema_id: str, data: dict) -> None:
print(data)


def reload_msg(msg_id: str) -> bytearray:
return bytes([RoomMessages.RELOAD]) + write_var_uint(len(msg_id)) + msg_id


def overite_msg(msg_id: str) -> bytearray:
return bytes([RoomMessages.OVERWRITE]) + write_var_uint(len(msg_id)) + msg_id

0 comments on commit b11cc72

Please sign in to comment.