upload and download in parallel

This commit is contained in:
fyears 2022-03-06 18:12:49 +08:00
parent 3dfe215103
commit 76e61b6ebf
5 changed files with 156 additions and 32 deletions

View File

@ -71,6 +71,7 @@
"mime-types": "^2.1.33", "mime-types": "^2.1.33",
"nanoid": "^3.1.30", "nanoid": "^3.1.30",
"obsidian": "^0.13.26", "obsidian": "^0.13.26",
"p-queue": "^7.2.0",
"path-browserify": "^1.0.1", "path-browserify": "^1.0.1",
"process": "^0.11.10", "process": "^0.11.10",
"qrcode": "^1.5.0", "qrcode": "^1.5.0",

View File

@ -57,6 +57,7 @@ export interface RemotelySavePluginSettings {
vaultRandomID?: string; vaultRandomID?: string;
autoRunEveryMilliseconds?: number; autoRunEveryMilliseconds?: number;
agreeToUploadExtraMetadata?: boolean; agreeToUploadExtraMetadata?: boolean;
concurrency?: number;
} }
export interface RemoteItem { export interface RemoteItem {

View File

@ -54,6 +54,7 @@ const DEFAULT_SETTINGS: RemotelySavePluginSettings = {
vaultRandomID: "", vaultRandomID: "",
autoRunEveryMilliseconds: -1, autoRunEveryMilliseconds: -1,
agreeToUploadExtraMetadata: false, agreeToUploadExtraMetadata: false,
concurrency: 5,
}; };
interface OAuth2Info { interface OAuth2Info {
@ -230,6 +231,7 @@ export default class RemotelySavePlugin extends Plugin {
deletions, deletions,
(key: string) => self.trash(key), (key: string) => self.trash(key),
this.settings.password, this.settings.password,
this.settings.concurrency,
(i: number, totalCount: number, pathName: string, decision: string) => (i: number, totalCount: number, pathName: string, decision: string) =>
self.setCurrSyncMsg(i, totalCount, pathName, decision) self.setCurrSyncMsg(i, totalCount, pathName, decision)
); );

View File

@ -521,6 +521,27 @@ export class RemotelySaveSettingTab extends PluginSettingTab {
}); });
}); });
const concurrencyDiv = generalDiv.createEl("div");
new Setting(concurrencyDiv)
.setName("Concurrency")
.setDesc(
"How many files do you want to download or upload in parallel at most?"
)
.addDropdown((dropdown) => {
dropdown.addOption("1", "1");
dropdown.addOption("2", "2");
dropdown.addOption("5", "5");
dropdown.addOption("10", "10");
dropdown
.setValue(`${this.plugin.settings.concurrency}`)
.onChange(async (val) => {
const realVal = parseInt(val);
this.plugin.settings.concurrency = realVal;
await this.plugin.saveSettings();
});
});
////////////////////////////////////////////////// //////////////////////////////////////////////////
// below for general chooser (part 1/2) // below for general chooser (part 1/2)
////////////////////////////////////////////////// //////////////////////////////////////////////////

View File

@ -5,6 +5,7 @@ import {
Vault, Vault,
requireApiVersion, requireApiVersion,
} from "obsidian"; } from "obsidian";
import PQueue from "p-queue";
import { import {
RemoteItem, RemoteItem,
SUPPORTED_SERVICES_TYPE, SUPPORTED_SERVICES_TYPE,
@ -891,10 +892,10 @@ export const doActualSync = async (
deletions: DeletionOnRemote[], deletions: DeletionOnRemote[],
localDeleteFunc: any, localDeleteFunc: any,
password: string = "", password: string = "",
concurrency: number = 1,
callbackSyncProcess?: any callbackSyncProcess?: any
) => { ) => {
const mixedStates = syncPlan.mixedStates; const mixedStates = syncPlan.mixedStates;
let i = 0;
const totalCount = sortedKeys.length || 0; const totalCount = sortedKeys.length || 0;
log.debug(`start syncing extra data firstly`); log.debug(`start syncing extra data firstly`);
@ -907,41 +908,139 @@ export const doActualSync = async (
); );
log.debug(`finish syncing extra data firstly`); log.debug(`finish syncing extra data firstly`);
for (let i = 0; i < sortedKeys.length; ++i) { log.debug(`concurrency === ${concurrency}`);
const key = sortedKeys[i]; if (concurrency === 1) {
const val = mixedStates[key]; // run everything in sequence
// good old way
for (let i = 0; i < sortedKeys.length; ++i) {
const key = sortedKeys[i];
const val = mixedStates[key];
log.debug(`start syncing "${key}" with plan ${JSON.stringify(val)}`); log.debug(`start syncing "${key}" with plan ${JSON.stringify(val)}`);
if (callbackSyncProcess !== undefined) { if (callbackSyncProcess !== undefined) {
await callbackSyncProcess(i, totalCount, key, val.decision); await callbackSyncProcess(i, totalCount, key, val.decision);
}
await dispatchOperationToActual(
key,
vaultRandomID,
val,
client,
db,
vault,
localDeleteFunc,
password
);
log.debug(`finished ${key}`);
}
} else {
let realCounter = 0;
log.debug(`1. create all folders from shadowest to deepest`);
for (let i = sortedKeys.length - 1; i >= 0; --i) {
const key = sortedKeys[i];
const val = mixedStates[key];
if (val.decision === "skipFolder" || val.decision === "createFolder") {
log.debug(`start syncing "${key}" with plan ${JSON.stringify(val)}`);
if (callbackSyncProcess !== undefined) {
await callbackSyncProcess(realCounter, totalCount, key, val.decision);
}
realCounter += 1;
await dispatchOperationToActual(
key,
vaultRandomID,
val,
client,
db,
vault,
localDeleteFunc,
password
);
log.debug(`finished ${key}`);
}
} }
await dispatchOperationToActual( log.debug(`2. delete files and folders from deepest to shadowest`);
key, for (let i = 0; i < sortedKeys.length; ++i) {
vaultRandomID, const key = sortedKeys[i];
val, const val = mixedStates[key];
client, if (
db, val.decision === "uploadLocalDelHistToRemoteFolder" ||
vault, val.decision === "keepRemoteDelHistFolder"
localDeleteFunc, ) {
password log.debug(`start syncing "${key}" with plan ${JSON.stringify(val)}`);
);
log.debug(`finished ${key}`);
// await Promise.all( if (callbackSyncProcess !== undefined) {
// Object.entries(mixedStates).map(async ([k, v]) => await callbackSyncProcess(realCounter, totalCount, key, val.decision);
// dispatchOperationToActual( }
// k as string, realCounter += 1;
// vaultRandomID,
// v as FileOrFolderMixedState, await dispatchOperationToActual(
// client, key,
// db, vaultRandomID,
// vault, val,
// localDeleteFunc, client,
// password db,
// ) vault,
// ) localDeleteFunc,
// ); password
);
log.debug(`finished ${key}`);
}
}
log.debug(
`3. upload or download files in parallel, with the desired concurrency=${concurrency}`
);
const queue = new PQueue({ concurrency: concurrency, autoStart: true });
// const commands: any[] = [];
for (let i = 0; i < sortedKeys.length; ++i) {
const key = sortedKeys[i];
const val = mixedStates[key];
if (
val.decision === "skipUploading" ||
val.decision === "uploadLocalDelHistToRemote" ||
val.decision === "keepRemoteDelHist" ||
val.decision === "uploadLocalToRemote" ||
val.decision === "downloadRemoteToLocal"
) {
const fn = async () => {
log.debug(`start syncing "${key}" with plan ${JSON.stringify(val)}`);
if (callbackSyncProcess !== undefined) {
await callbackSyncProcess(
realCounter,
totalCount,
key,
val.decision
);
realCounter += 1;
}
await dispatchOperationToActual(
key,
vaultRandomID,
val,
client,
db,
vault,
localDeleteFunc,
password
);
log.debug(`finished ${key}`);
};
queue.add(fn);
}
}
await queue.onIdle();
} }
}; };