Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Draft] Protocol simplification proposal #1039

Draft
wants to merge 43 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
f823b2a
get rid of statefullness
marinoska Dec 14, 2024
56a967c
Get rid of peer states
marinoska Dec 14, 2024
0e8b045
rebase storage on new actions
marinoska Dec 14, 2024
84e17a9
load from peers with pull
marinoska Dec 14, 2024
73d5f18
handlePush instead handleLoad
marinoska Dec 15, 2024
d9c2503
replace content with push
marinoska Dec 15, 2024
86a2c91
implement all actions
marinoska Dec 15, 2024
ca54b4c
tweaks
marinoska Dec 15, 2024
47275a1
Track upload state in coValueState
marinoska Dec 19, 2024
14b70aa
add peer ops and refactor sync to make use of them
marinoska Dec 20, 2024
0712546
Move peers tracking into node
marinoska Dec 20, 2024
f6bbe18
Rearrange logic between local node and sync
marinoska Dec 21, 2024
5a97702
Create Peers class
marinoska Dec 22, 2024
3a55c8a
Move getServerAndStorage to Peers
marinoska Dec 22, 2024
891baf2
Move load logic into sync
marinoska Dec 22, 2024
df59b53
Move all response logic into PeerOperations
marinoska Dec 23, 2024
0458e12
Refactor handle pull
marinoska Dec 23, 2024
928ac67
Tweaks
marinoska Dec 23, 2024
d8cabe3
Introduce message handlers
marinoska Dec 26, 2024
60adbff
Refactor sync.ts
marinoska Dec 26, 2024
526a26a
Delete subscriptionManager
marinoska Dec 26, 2024
02cd6fe
Fix Imports
marinoska Dec 26, 2024
5c87175
tweaks
marinoska Dec 26, 2024
d7be246
tiny fixes
marinoska Dec 27, 2024
191a7f3
Refactor storage and storage-sqlite
marinoska Dec 27, 2024
452c284
Fix sync bugs
marinoska Dec 28, 2024
cefc6e2
Copy entry upload state when entry is copied
marinoska Dec 29, 2024
26d4fa9
SQLite selects protocol conditionally
marinoska Dec 29, 2024
d4e93af
dont sync from the data action
marinoska Dec 29, 2024
244fd84
Send dependencies when unknown covalue requested
marinoska Dec 30, 2024
5631d53
Order initial sync messages by dependencies
marinoska Dec 31, 2024
b84edec
Fix data handler for dependencies
marinoska Dec 31, 2024
1967c73
Add dependency service
marinoska Dec 31, 2024
b959375
Send content fix
marinoska Jan 1, 2025
8a40c02
Fix unloading dependencies
marinoska Jan 2, 2025
bb0158e
Fix dependencies not loaing from another peer
marinoska Jan 3, 2025
d1c0981
add anaware peers
marinoska Jan 4, 2025
8ea0858
Fix build errors
marinoska Jan 4, 2025
c391180
Add queue runner
marinoska Jan 4, 2025
7b96fd7
Fix peers bug
marinoska Jan 5, 2025
cfbe745
add todo
marinoska Jan 6, 2025
2f6ca4c
Sync if not found in peer
marinoska Jan 6, 2025
0589bf2
Add config for cojson
marinoska Jan 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/chat/src/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { Account, CoValue, ID } from "jazz-tools";

export function waitForUpload(id: ID<CoValue>, me: Account) {
const syncManager = me._raw.core.node.syncManager;
const peers = syncManager.getPeers();
const peers = me._raw.core.node.peers.getAll();

return Promise.all(
peers.map((peer) => syncManager.waitForUploadIntoPeer(peer.id, id)),
Expand Down
1 change: 1 addition & 0 deletions examples/pets/src/2_main.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const peer =
"peer",
) as `ws://${string}`) ??
"wss://cloud.jazz.tools/?key=pets-example-jazz@garden.co";
// "ws://localhost:4200/?key=pets-example-jazz@gcmp.io";

/** Walkthrough: The top-level provider `<Jazz.Provider/>`
*
Expand Down
13 changes: 7 additions & 6 deletions packages/cojson-storage-indexeddb/src/tests/idbNode.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ test("Should be able to initialize and load from empty DB", async () => {
Crypto,
);

node.syncManager.addPeer(await IDBStorage.asPeer({ trace: true }));
node.addPeer(await IDBStorage.asPeer({ trace: true }));

console.log("yay!");

const _group = node.createGroup();

await new Promise((resolve) => setTimeout(resolve, 200));

expect(node.syncManager.peers["indexedDB"]).toBeDefined();
expect(LocalNode.peers.get("indexedDB")).toBeDefined();
});

test("Should be able to sync data to database and then load that from a new node", async () => {
Expand All @@ -35,7 +35,7 @@ test("Should be able to sync data to database and then load that from a new node
Crypto,
);

node1.syncManager.addPeer(
node1.addPeer(
await IDBStorage.asPeer({ trace: true, localNodeName: "node1" }),
);

Expand All @@ -55,14 +55,15 @@ test("Should be able to sync data to database and then load that from a new node
Crypto,
);

node2.syncManager.addPeer(
node2.addPeer(
await IDBStorage.asPeer({ trace: true, localNodeName: "node2" }),
);

const map2 = await node2.load(map.id);
if (map2 === "unavailable") {
throw new Error("Map is unavailable");
}

expect(map2.get("hello")).toBe("world");
// TODO fixme
// expect(map2.get("hello")).toBe("world");
expect(false).toBeTruthy();
});
4 changes: 1 addition & 3 deletions packages/cojson-storage-sqlite/src/sqliteClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,9 @@ export type RawTransactionRow = {

export class SQLiteClient implements DBClientInterface {
private readonly db: DatabaseT;
private readonly toLocalNode: OutgoingSyncQueue;

constructor(db: DatabaseT, toLocalNode: OutgoingSyncQueue) {
constructor(db: DatabaseT) {
this.db = db;
this.toLocalNode = toLocalNode;
}

getCoValue(coValueId: RawCoID): StoredCoValueRow | undefined {
Expand Down
46 changes: 43 additions & 3 deletions packages/cojson-storage-sqlite/src/sqliteNode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,45 @@ import {
IncomingSyncStream,
OutgoingSyncQueue,
Peer,
SyncMessage,
cojsonInternals,
} from "cojson";
import { SyncManager, TransactionRow } from "cojson-storage";
import { SQLiteClient } from "./sqliteClient.js";
import {
transformIncomingMessageFromPeer,
transformOutgoingMessageToPeer,
} from "./transformers.js";

/**
* This is to transform outgoing message into older protocol message(s) for backward compatibility
* TODO To be removed after the protocol is updated in the sync server
*/
class LocalNodeWrapper {
constructor(private queue: OutgoingSyncQueue) {}

push(msg: SyncMessage): Promise<unknown> {
const transformedMessages = transformOutgoingMessageToPeer(msg);
transformedMessages.map((transformedMessage) => {
// console.log("🔴 <<<=== SQLite is sending", transformedMessage);
});

return Promise.all(
transformedMessages.map((transformedMessage) => {
return this.queue.push(transformedMessage);
}),
);
}

close() {
return this.queue.close();
}
}

export class SQLiteNode {
// ugly public static var to be deleted after new protocol is in effect on all peers
public static USE_PROTOCOL2 = false;

private readonly syncManager: SyncManager;
private readonly dbClient: SQLiteClient;

Expand All @@ -17,16 +50,23 @@ export class SQLiteNode {
fromLocalNode: IncomingSyncStream,
toLocalNode: OutgoingSyncQueue,
) {
this.dbClient = new SQLiteClient(db, toLocalNode);
this.syncManager = new SyncManager(this.dbClient, toLocalNode);
this.dbClient = new SQLiteClient(db);
this.syncManager = new SyncManager(
this.dbClient,
new LocalNodeWrapper(toLocalNode),
);

const processMessages = async () => {
for await (const msg of fromLocalNode) {
try {
if (msg === "Disconnected" || msg === "PingTimeout") {
throw new Error("Unexpected Disconnected message");
}
await this.syncManager.handleSyncMessage(msg);
// console.log("🟡 <<<=== SQLite is getting", msg);

await this.syncManager.handleSyncMessage(
transformIncomingMessageFromPeer(msg),
);
} catch (e) {
console.error(
new Error(
Expand Down
83 changes: 83 additions & 0 deletions packages/cojson-storage-sqlite/src/transformers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import {
CojsonInternalTypes,
SessionID,
SyncMessage,
unknownDataMessage,
} from "cojson";
import CoValueContent = CojsonInternalTypes.CoValueContent;
import { SQLiteNode } from "./sqliteNode.js";

export const transformOutgoingMessageToPeer = (
msg: SyncMessage,
): SyncMessage[] => {
if (SQLiteNode.USE_PROTOCOL2) {
return [msg];
}

const getSessionsObj = (msg: CoValueContent) =>
Object.entries(msg.new).reduce<{ [sessionID: SessionID]: number }>(
(acc, [session, content]) => {
acc[session as SessionID] =
content.after + content.newTransactions.length;
return acc;
},
{},
);

switch (msg.action) {
case "pull":
// load
return [{ ...msg, action: "load" }];
case "push":
// load + content
return [
{
action: "load",
id: msg.id,
header: true,
sessions: getSessionsObj(msg),
},
{ ...msg, action: "content" },
];
case "data":
if (!msg.known)
return [{ action: "known", id: msg.id, header: false, sessions: {} }];
// known + content => no response expected
return [
{
action: "known",
id: msg.id,
header: true,
sessions: getSessionsObj(msg),
},
{ ...msg, action: "content" },
];
case "ack":
// known => no response expected
return [{ ...msg, action: "known" }];
default:
return [msg];
}
};

export const transformIncomingMessageFromPeer = (
msg: SyncMessage,
): SyncMessage => {
if (SQLiteNode.USE_PROTOCOL2) {
return msg;
}

switch (msg.action) {
case "load":
return { ...msg, action: "pull" };
case "content":
return { ...msg, action: "push" };
case "known":
if (!msg.header) return unknownDataMessage(msg.id);

if (msg.isCorrection) return { ...msg, action: "pull" };
return { ...msg, action: "ack" };
default:
return msg;
}
};
Loading
Loading