Skip to content

Commit

Permalink
address feedbacks
Browse files Browse the repository at this point in the history
  • Loading branch information
shaketbaby committed Jan 9, 2024
1 parent 5ad8af7 commit 12c6e99
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 69 deletions.
107 changes: 48 additions & 59 deletions src/explorer/connectionTreeItem.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import * as vscode from 'vscode';
import path from 'path';
import type { StreamProcessor } from 'mongodb-data-service/lib/data-service';
import type {
DataService,
StreamProcessor,
} from 'mongodb-data-service/lib/data-service';

import DatabaseTreeItem from './databaseTreeItem';
import type ConnectionController from '../connectionController';
Expand Down Expand Up @@ -195,70 +198,56 @@ export default class ConnectionTreeItem
return Object.values(this._childrenCache);
}

if (isAtlasStreams) {
const processors = await this.listStreamProcessors();
processors.sort((a: StreamProcessor, b: StreamProcessor) => {
return a.name.localeCompare(b.name);
});

this.cacheIsUpToDate = true;
this._childrenCache = await (isAtlasStreams
? this._buildChildrenCacheForStreams(dataService)
: this._buildChildrenCacheForDatabases(dataService));
this.cacheIsUpToDate = true;
return Object.values(this._childrenCache);
}

const pastChildrenCache = this._childrenCache;
this._childrenCache = {};
processors.forEach((sp) => {
const cachedItem = pastChildrenCache[
sp.name
] as StreamProcessorTreeItem;
// We create a new element here instead of reusing the cached one
// in order to ensure the expanded state is set.
this._childrenCache[sp.name] = new StreamProcessorTreeItem({
dataService,
streamProcessorName: sp.name,
streamProcessorState: sp.state,
isExpanded: cachedItem ? cachedItem.isExpanded : false,
});
private async _buildChildrenCacheForDatabases(dataService: DataService) {
const databases = await this.listDatabases();
databases.sort((a: string, b: string) => a.localeCompare(b));

const newChildrenCache: Record<string, DatabaseTreeItem> = {};

databases.forEach((databaseName: string) => {
const cachedItem = this._childrenCache[databaseName] as DatabaseTreeItem;
// We create a new element here instead of reusing the cached one
// in order to ensure the expanded state is set.
newChildrenCache[databaseName] = new DatabaseTreeItem({
dataService,
databaseName,
isExpanded: cachedItem ? cachedItem.isExpanded : false,
cacheIsUpToDate: cachedItem ? cachedItem.cacheIsUpToDate : false,
childrenCache: cachedItem ? cachedItem.getChildrenCache() : {},
});
} else {
const databases = await this.listDatabases();
databases.sort((a: string, b: string) => {
return a.localeCompare(b);
});

this.cacheIsUpToDate = true;
});

if (!databases) {
this._childrenCache = {};
return [];
}

const pastChildrenCache = this._childrenCache;
this._childrenCache = {};
return newChildrenCache;
}

databases.forEach((name: string) => {
const cachedItem = pastChildrenCache[name] as DatabaseTreeItem;
if (cachedItem) {
// We create a new element here instead of reusing the cached one
// in order to ensure the expanded state is set.
this._childrenCache[name] = new DatabaseTreeItem({
databaseName: name,
dataService,
isExpanded: cachedItem.isExpanded,
cacheIsUpToDate: cachedItem.cacheIsUpToDate,
childrenCache: cachedItem.getChildrenCache(),
});
} else {
this._childrenCache[name] = new DatabaseTreeItem({
databaseName: name,
dataService,
isExpanded: false,
cacheIsUpToDate: false, // Cache is not up to date (no cache).
childrenCache: {}, // No existing cache.
});
}
private async _buildChildrenCacheForStreams(dataService: DataService) {
const processors = await this.listStreamProcessors();
processors.sort((a, b) => a.name.localeCompare(b.name));

const newChildrenCache: Record<string, StreamProcessorTreeItem> = {};

processors.forEach((sp) => {
const cachedItem = this._childrenCache[
sp.name
] as StreamProcessorTreeItem;
// We create a new element here instead of reusing the cached one
// in order to ensure the expanded state is set.
newChildrenCache[sp.name] = new StreamProcessorTreeItem({
dataService,
streamProcessorName: sp.name,
streamProcessorState: sp.state,
isExpanded: cachedItem ? cachedItem.isExpanded : false,
});
}
});

return Object.values(this._childrenCache);
return newChildrenCache;
}

onDidCollapse(): void {
Expand Down
4 changes: 4 additions & 0 deletions src/mdbExtensionController.ts
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,10 @@ export default class MDBExtensionController implements vscode.Disposable {
return true;
}
);
this.registerAtlasStreamsTreeViewCommands();
}

registerAtlasStreamsTreeViewCommands() {
this.registerCommand(
EXTENSION_COMMANDS.MDB_ADD_STREAM_PROCESSOR,
async (element: ConnectionTreeItem): Promise<boolean> => {
Expand Down
10 changes: 5 additions & 5 deletions src/templates/playgroundCreateStreamProcessorTemplate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,21 @@ const template = `/* global sp */
// MongoDB Playground
// Use Ctrl+Space inside a snippet or a string literal to trigger completions.
// create a new stream processor
// Create a new stream processor.
/* sp.createStreamProcessor('newStreamProcessor', [
{
$source: {
"connectionName": "myKafka",
"topic": "source"
connectionName: 'myKafka',
topic: 'source'
}
},
{
$match: { temperature: 46 }
},
{
$emit: {
"connectionName": "mySink",
"topic" : "target",
connectionName: 'mySink',
topic : 'target',
}
}
]); */
Expand Down
9 changes: 5 additions & 4 deletions src/templates/playgroundStreamsTemplate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,17 @@ const template = `/* global sp */
// For more documentation on playgrounds please refer to
// https://www.mongodb.com/docs/mongodb-vscode/playgrounds/
// Connection can be added in Atlas UI or using Atlas CLI. See doc linked below for more info
// A connection can be added in the Atlas UI or using the Atlas CLI.
// See the documentation linked below for more information.
// https://www.mongodb.com/docs/atlas/atlas-sp/manage-processing-instance/#add-a-connection-to-the-connection-registry
// List avialable connections
// List available connections
sp.listConnections();
// Use process to quickly test out the stream processing
sp.process([
{
$source:{
"connectionName": "sample_stream_solar"
$source: {
connectionName: 'sample_stream_solar'
}
}
]);
Expand Down
2 changes: 1 addition & 1 deletion src/test/suite/mdbExtensionController.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1416,7 +1416,7 @@ suite('MDBExtensionController Test Suite', function () {
);

const content = fakeCreatePlaygroundFileWithContent.firstCall.args[0];
assert(content.includes('// create a new stream processor'));
assert(content.includes('// Create a new stream processor.'));
assert(content.includes("sp.createStreamProcessor('newStreamProcessor'"));
});

Expand Down

0 comments on commit 12c6e99

Please sign in to comment.