diff --git a/package.json b/package.json index 1bd62e4..7b4cebd 100644 --- a/package.json +++ b/package.json @@ -71,6 +71,7 @@ "mime-types": "^2.1.33", "nanoid": "^3.1.30", "obsidian": "^0.13.26", + "p-queue": "^7.2.0", "path-browserify": "^1.0.1", "process": "^0.11.10", "qrcode": "^1.5.0", diff --git a/src/baseTypes.ts b/src/baseTypes.ts index 6d25d82..e2c3ce4 100644 --- a/src/baseTypes.ts +++ b/src/baseTypes.ts @@ -57,6 +57,7 @@ export interface RemotelySavePluginSettings { vaultRandomID?: string; autoRunEveryMilliseconds?: number; agreeToUploadExtraMetadata?: boolean; + concurrency?: number; } export interface RemoteItem { diff --git a/src/main.ts b/src/main.ts index 0d2dc70..5156a71 100644 --- a/src/main.ts +++ b/src/main.ts @@ -54,6 +54,7 @@ const DEFAULT_SETTINGS: RemotelySavePluginSettings = { vaultRandomID: "", autoRunEveryMilliseconds: -1, agreeToUploadExtraMetadata: false, + concurrency: 5, }; interface OAuth2Info { @@ -230,6 +231,7 @@ export default class RemotelySavePlugin extends Plugin { deletions, (key: string) => self.trash(key), this.settings.password, + this.settings.concurrency, (i: number, totalCount: number, pathName: string, decision: string) => self.setCurrSyncMsg(i, totalCount, pathName, decision) ); diff --git a/src/settings.ts b/src/settings.ts index 23a0fa0..97aa9f3 100644 --- a/src/settings.ts +++ b/src/settings.ts @@ -521,6 +521,27 @@ export class RemotelySaveSettingTab extends PluginSettingTab { }); }); + const concurrencyDiv = generalDiv.createEl("div"); + new Setting(concurrencyDiv) + .setName("Concurrency") + .setDesc( + "How many files do you want to download or upload in parallel at most?" + ) + .addDropdown((dropdown) => { + dropdown.addOption("1", "1"); + dropdown.addOption("2", "2"); + dropdown.addOption("5", "5"); + dropdown.addOption("10", "10"); + + dropdown + .setValue(`${this.plugin.settings.concurrency}`) + .onChange(async (val) => { + const realVal = parseInt(val); + this.plugin.settings.concurrency = realVal; + await this.plugin.saveSettings(); + }); + }); + ////////////////////////////////////////////////// // below for general chooser (part 1/2) ////////////////////////////////////////////////// diff --git a/src/sync.ts b/src/sync.ts index 219f54d..8abbae6 100644 --- a/src/sync.ts +++ b/src/sync.ts @@ -5,6 +5,7 @@ import { Vault, requireApiVersion, } from "obsidian"; +import PQueue from "p-queue"; import { RemoteItem, SUPPORTED_SERVICES_TYPE, @@ -891,10 +892,10 @@ export const doActualSync = async ( deletions: DeletionOnRemote[], localDeleteFunc: any, password: string = "", + concurrency: number = 1, callbackSyncProcess?: any ) => { const mixedStates = syncPlan.mixedStates; - let i = 0; const totalCount = sortedKeys.length || 0; log.debug(`start syncing extra data firstly`); @@ -907,41 +908,139 @@ export const doActualSync = async ( ); log.debug(`finish syncing extra data firstly`); - for (let i = 0; i < sortedKeys.length; ++i) { - const key = sortedKeys[i]; - const val = mixedStates[key]; + log.debug(`concurrency === ${concurrency}`); + if (concurrency === 1) { + // run everything in sequence + // good old way + for (let i = 0; i < sortedKeys.length; ++i) { + const key = sortedKeys[i]; + const val = mixedStates[key]; - log.debug(`start syncing "${key}" with plan ${JSON.stringify(val)}`); + log.debug(`start syncing "${key}" with plan ${JSON.stringify(val)}`); - if (callbackSyncProcess !== undefined) { - await callbackSyncProcess(i, totalCount, key, val.decision); + if (callbackSyncProcess !== undefined) { + await callbackSyncProcess(i, totalCount, key, val.decision); + } + + await dispatchOperationToActual( + key, + vaultRandomID, + val, + client, + db, + vault, + localDeleteFunc, + password + ); + log.debug(`finished ${key}`); + } + } else { + let realCounter = 0; + + log.debug(`1. create all folders from shadowest to deepest`); + for (let i = sortedKeys.length - 1; i >= 0; --i) { + const key = sortedKeys[i]; + const val = mixedStates[key]; + + if (val.decision === "skipFolder" || val.decision === "createFolder") { + log.debug(`start syncing "${key}" with plan ${JSON.stringify(val)}`); + + if (callbackSyncProcess !== undefined) { + await callbackSyncProcess(realCounter, totalCount, key, val.decision); + } + realCounter += 1; + + await dispatchOperationToActual( + key, + vaultRandomID, + val, + client, + db, + vault, + localDeleteFunc, + password + ); + log.debug(`finished ${key}`); + } } - await dispatchOperationToActual( - key, - vaultRandomID, - val, - client, - db, - vault, - localDeleteFunc, - password - ); - log.debug(`finished ${key}`); + log.debug(`2. delete files and folders from deepest to shadowest`); + for (let i = 0; i < sortedKeys.length; ++i) { + const key = sortedKeys[i]; + const val = mixedStates[key]; + if ( + val.decision === "uploadLocalDelHistToRemoteFolder" || + val.decision === "keepRemoteDelHistFolder" + ) { + log.debug(`start syncing "${key}" with plan ${JSON.stringify(val)}`); - // await Promise.all( - // Object.entries(mixedStates).map(async ([k, v]) => - // dispatchOperationToActual( - // k as string, - // vaultRandomID, - // v as FileOrFolderMixedState, - // client, - // db, - // vault, - // localDeleteFunc, - // password - // ) - // ) - // ); + if (callbackSyncProcess !== undefined) { + await callbackSyncProcess(realCounter, totalCount, key, val.decision); + } + realCounter += 1; + + await dispatchOperationToActual( + key, + vaultRandomID, + val, + client, + db, + vault, + localDeleteFunc, + password + ); + log.debug(`finished ${key}`); + } + } + + log.debug( + `3. upload or download files in parallel, with the desired concurrency=${concurrency}` + ); + const queue = new PQueue({ concurrency: concurrency, autoStart: true }); + + // const commands: any[] = []; + + for (let i = 0; i < sortedKeys.length; ++i) { + const key = sortedKeys[i]; + const val = mixedStates[key]; + if ( + val.decision === "skipUploading" || + val.decision === "uploadLocalDelHistToRemote" || + val.decision === "keepRemoteDelHist" || + val.decision === "uploadLocalToRemote" || + val.decision === "downloadRemoteToLocal" + ) { + const fn = async () => { + log.debug(`start syncing "${key}" with plan ${JSON.stringify(val)}`); + + if (callbackSyncProcess !== undefined) { + await callbackSyncProcess( + realCounter, + totalCount, + key, + val.decision + ); + + realCounter += 1; + } + + await dispatchOperationToActual( + key, + vaultRandomID, + val, + client, + db, + vault, + localDeleteFunc, + password + ); + + log.debug(`finished ${key}`); + }; + queue.add(fn); + } + } + + await queue.onIdle(); } };