[Code] improve project status polling management (#36841) (#37171)

* [Code] improve project status polling management

* [Code] Use make repository status polling cancellable

* change back to 1000ms polling interval

* deduplicate repository status polling runner before run
This commit is contained in:
Mengwei Ding 2019-05-26 18:04:10 -07:00 committed by GitHub
parent 35050f513b
commit f71384e9f9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 328 additions and 172 deletions

View file

@ -5,16 +5,14 @@
*/
import { createAction } from 'redux-actions';
import { RepositoryUri } from '../../model';
import { RepoStatus } from '../reducers';
export const loadStatus = createAction<string>('LOAD STATUS');
export const loadStatusSuccess = createAction<any>('LOAD STATUS SUCCESS');
export const loadStatusFailed = createAction<string>('LOAD STATUS FAILED');
export const pollRepoCloneStatus = createAction<any>('POLL CLONE STATUS');
export const pollRepoIndexStatus = createAction<any>('POLL INDEX STATUS');
export const pollRepoDeleteStatus = createAction<any>('POLL DELETE STATUS');
export const loadRepo = createAction<string>('LOAD REPO');
export const loadRepoSuccess = createAction<any>('LOAD REPO SUCCESS');
export const loadRepoFailed = createAction<any>('LOAD REPO FAILED');
@ -22,3 +20,11 @@ export const loadRepoFailed = createAction<any>('LOAD REPO FAILED');
export const updateCloneProgress = createAction<RepoStatus>('UPDATE CLONE PROGRESS');
export const updateIndexProgress = createAction<RepoStatus>('UPDATE INDEX PROGRESS');
export const updateDeleteProgress = createAction<RepoStatus>('UPDATE DELETE PROGRESS');
export const pollRepoCloneStatusStart = createAction<any>('POLL CLONE STATUS START');
export const pollRepoIndexStatusStart = createAction<any>('POLL INDEX STATUS START');
export const pollRepoDeleteStatusStart = createAction<any>('POLL DELETE STATUS START');
export const pollRepoCloneStatusStop = createAction<RepositoryUri>('POLL CLONE STATUS STOP');
export const pollRepoIndexStatusStop = createAction<RepositoryUri>('POLL INDEX STATUS STOP');
export const pollRepoDeleteStatusStop = createAction<RepositoryUri>('POLL DELETE STATUS STOP');

View file

@ -5,17 +5,18 @@
*/
import produce from 'immer';
import { handleActions } from 'redux-actions';
import { Action, handleActions } from 'redux-actions';
import { RepositoryUri, WorkerReservedProgress } from '../../model';
import {
deleteRepoFinished,
loadStatus,
loadStatusFailed,
loadStatusSuccess,
updateCloneProgress,
updateDeleteProgress,
updateIndexProgress,
} from '../actions/status';
} from '../actions';
export enum RepoState {
CLONING,
@ -179,6 +180,10 @@ export const status = handleActions(
};
}
}),
[String(deleteRepoFinished)]: (state: StatusState, action: Action<any>) =>
produce<StatusState>(state, (draft: StatusState) => {
delete draft.status[action.payload];
}),
},
initialState
);

View file

@ -25,9 +25,11 @@ import { watchLoadConfigs, watchSwitchProjectLanguageServer } from './project_co
import {
watchLoadRepoListStatus,
watchLoadRepoStatus,
watchPollingRepoStatus,
watchRepoCloneStatusPolling,
watchRepoDeleteStatusPolling,
watchRepoIndexStatusPolling,
watchResetPollingStatus,
} from './project_status';
import {
watchAdminRouteChange,
@ -83,8 +85,14 @@ export function* rootSaga() {
yield fork(watchLoadConfigs);
yield fork(watchLoadRepoListStatus);
yield fork(watchLoadRepoStatus);
// Repository status polling sagas begin
yield fork(watchPollingRepoStatus);
yield fork(watchResetPollingStatus);
yield fork(watchRepoDeleteStatusPolling);
yield fork(watchRepoIndexStatusPolling);
yield fork(watchRepoCloneStatusPolling);
// Repository status polling sagas end
yield fork(watchRepoScopeSearch);
}

View file

@ -4,6 +4,7 @@
* you may not use this file except in compliance with the Elastic License.
*/
import { Action } from 'redux-actions';
import { Match, routeChange } from '../actions';
import { PathTypes } from '../common/types';
import * as ROUTES from '../components/routes';

View file

@ -8,10 +8,23 @@ import moment from 'moment';
import { Action } from 'redux-actions';
import { delay } from 'redux-saga';
import { kfetch } from 'ui/kfetch';
import { all, call, put, takeEvery, takeLatest } from 'redux-saga/effects';
import {
all,
call,
cancel,
cancelled,
fork,
put,
select,
take,
takeEvery,
takeLatest,
} from 'redux-saga/effects';
import { RepositoryUtils } from '../../common/repository_utils';
import { Repository, RepositoryUri, WorkerReservedProgress } from '../../model';
import * as ROUTES from '../components/routes';
import { allStatusSelector, repoUriSelector, routeSelector } from '../selectors';
import {
deleteRepo,
fetchReposSuccess,
@ -20,14 +33,26 @@ import {
loadRepoSuccess,
loadStatusFailed,
loadStatusSuccess,
pollRepoCloneStatus,
pollRepoDeleteStatus,
pollRepoIndexStatus,
pollRepoCloneStatusStart,
pollRepoDeleteStatusStart,
pollRepoIndexStatusStart,
routeChange,
updateCloneProgress,
updateDeleteProgress,
updateIndexProgress,
pollRepoCloneStatusStop,
pollRepoDeleteStatusStop,
pollRepoIndexStatusStop,
} from '../actions';
import { cloneCompletedPattern } from './status';
import { RepoState } from '../reducers';
import {
cloneCompletedPattern,
cloneRepoStatusPollingStopPattern,
deleteRepoStatusPollingStopPattern,
indexRepoStatusPollingStopPattern,
} from './status';
const REPO_STATUS_POLLING_FREQ_MS = 1000;
function fetchStatus(repoUri: string) {
return kfetch({
@ -79,199 +104,285 @@ function isInProgress(progress: number): boolean {
return progress < WorkerReservedProgress.COMPLETED && progress >= WorkerReservedProgress.INIT;
}
function* handleRepoListStatusLoaded(action: Action<any>) {
const statuses = action.payload;
for (const repoUri of Object.keys(statuses)) {
const status = statuses[repoUri];
if (status.deleteStatus) {
yield put(pollRepoDeleteStatus(repoUri));
} else if (status.indexStatus) {
if (isInProgress(status.indexStatus.progress)) {
yield put(pollRepoIndexStatus(repoUri));
}
} else if (status.gitStatus) {
if (isInProgress(status.gitStatus.progress)) {
yield put(pollRepoCloneStatus(repoUri));
}
}
// Try to trigger the repository status polling based on its current state.
function* triggerPollRepoStatus(state: RepoState, repoUri: RepositoryUri) {
switch (state) {
case RepoState.CLONING:
yield put(pollRepoCloneStatusStart(repoUri));
break;
case RepoState.INDEXING:
yield put(pollRepoIndexStatusStart(repoUri));
break;
case RepoState.DELETING:
yield put(pollRepoDeleteStatusStart(repoUri));
break;
default:
break;
}
}
// `fetchReposSuccess` is issued by the repository admin page.
export function* watchLoadRepoListStatus() {
yield takeEvery(String(fetchReposSuccess), handleRepoListStatus);
// After all the status of all the repositoriesin the list has been loaded,
// start polling status only for those still in progress.
yield takeEvery(String(loadStatusSuccess), handleRepoListStatusLoaded);
function* handleReposStatusLoaded(action: Action<any>) {
const route = yield select(routeSelector);
const allStatuses = yield select(allStatusSelector);
if (route.path === ROUTES.ADMIN) {
// Load all repository status on admin page
for (const repoUri of Object.keys(allStatuses)) {
const status = allStatuses[repoUri];
yield triggerPollRepoStatus(status.state, repoUri);
}
} else if (route.path === ROUTES.MAIN || route.path === ROUTES.MAIN_ROOT) {
// Load current repository status on main page
const currentUri = yield select(repoUriSelector);
const status = allStatuses[currentUri];
yield triggerPollRepoStatus(status.state, currentUri);
}
}
export function* watchLoadRepoListStatus() {
// After all the repositories have been loaded, we should start load
// their status.
yield takeEvery(String(fetchReposSuccess), handleRepoListStatus);
}
// `loadRepoSuccess` is issued by the main source view page.
export function* watchLoadRepoStatus() {
// `loadRepoSuccess` is issued by the main source view page.
yield takeLatest(String(loadRepoSuccess), handleRepoStatus);
}
const REPO_STATUS_POLLING_FREQ_MS = 1000;
function createRepoStatusPollingHandler(
parseRepoUri: (_: Action<any>) => RepositoryUri,
handleStatus: any,
pollingActionFunction: any
) {
return function*(a: Action<any>) {
yield call(delay, REPO_STATUS_POLLING_FREQ_MS);
const repoUri = parseRepoUri(a);
let keepPolling = false;
try {
const repoStatus = yield call(fetchStatus, repoUri);
keepPolling = yield handleStatus(repoStatus, repoUri);
} catch (err) {
// Fetch repository status error. Ignore and keep trying.
keepPolling = true;
}
if (keepPolling) {
yield put(pollingActionFunction(repoUri));
}
};
export function* watchPollingRepoStatus() {
// After the status of the repos or a given repo has been loaded, check
// if we need to start polling the status.
yield takeEvery(String(loadStatusSuccess), handleReposStatusLoaded);
}
const handleRepoCloneStatusPolling = createRepoStatusPollingHandler(
(action: Action<any>) => {
if (action.type === String(importRepo)) {
const repoUrl: string = action.payload;
return RepositoryUtils.buildRepository(repoUrl).uri;
} else if (action.type === String(pollRepoCloneStatus)) {
return action.payload;
}
},
function*(status: any, repoUri: RepositoryUri) {
if (
// Repository has been deleted during the clone
(!status.gitStatus && !status.indexStatus && !status.deleteStatus) ||
// Repository is in delete during the clone
status.deleteStatus
) {
// Stop polling git progress
return false;
}
function* handleResetPollingStatus(action: Action<any>) {
const statuses = yield select(allStatusSelector);
for (const repoUri of Object.keys(statuses)) {
yield put(pollRepoCloneStatusStop(repoUri));
yield put(pollRepoIndexStatusStop(repoUri));
yield put(pollRepoDeleteStatusStop(repoUri));
}
}
if (status.gitStatus) {
const { progress, cloneProgress, errorMessage, timestamp } = status.gitStatus;
yield put(
updateCloneProgress({
progress,
timestamp: moment(timestamp).toDate(),
repoUri,
errorMessage,
cloneProgress,
})
);
// Keep polling if the progress is not 100% yet.
return isInProgress(progress);
} else {
// Keep polling if the indexStatus has not been persisted yet.
return true;
}
},
pollRepoCloneStatus
);
export function* watchResetPollingStatus() {
// Stop all the repository status polling runners when route changes.
yield takeEvery(routeChange, handleResetPollingStatus);
}
const parseCloneStatusPollingRequest = (action: Action<any>) => {
if (action.type === String(importRepo)) {
const repoUrl: string = action.payload;
return RepositoryUtils.buildRepository(repoUrl).uri;
} else if (action.type === String(pollRepoCloneStatusStart)) {
return action.payload;
}
};
const handleRepoCloneStatusProcess = function*(status: any, repoUri: RepositoryUri) {
if (
// Repository has been deleted during the clone
(!status.gitStatus && !status.indexStatus && !status.deleteStatus) ||
// Repository is in delete during the clone
status.deleteStatus
) {
// Stop polling git progress
return false;
}
if (status.gitStatus) {
const { progress, cloneProgress, errorMessage, timestamp } = status.gitStatus;
yield put(
updateCloneProgress({
progress,
timestamp: moment(timestamp).toDate(),
repoUri,
errorMessage,
cloneProgress,
})
);
// Keep polling if the progress is not 100% yet.
return isInProgress(progress);
} else {
// Keep polling if the indexStatus has not been persisted yet.
return true;
}
};
export function* watchRepoCloneStatusPolling() {
// The repository clone status polling will be triggered by:
// * user click import repository
// * repeating pollRepoCloneStatus action by the poller itself.
yield takeEvery([String(importRepo), String(pollRepoCloneStatus)], handleRepoCloneStatusPolling);
// * repository status has been loaded and it's in cloning
yield takeEvery(
[String(importRepo), String(pollRepoCloneStatusStart)],
pollRepoCloneStatusRunner
);
}
const handleRepoIndexStatusPolling = createRepoStatusPollingHandler(
(action: Action<any>) => {
if (action.type === String(indexRepo) || action.type === String(pollRepoIndexStatus)) {
return action.payload;
} else if (action.type === String(updateCloneProgress)) {
return action.payload.repoUri;
}
},
function*(status: any, repoUri: RepositoryUri) {
if (
// Repository has been deleted during the index
(!status.gitStatus && !status.indexStatus && !status.deleteStatus) ||
// Repository is in delete during the index
status.deleteStatus
) {
// Stop polling index progress
return false;
}
const parseIndexStatusPollingRequest = (action: Action<any>) => {
if (action.type === String(indexRepo) || action.type === String(pollRepoIndexStatusStart)) {
return action.payload;
} else if (action.type === String(updateCloneProgress)) {
return action.payload.repoUri;
}
};
if (status.indexStatus) {
yield put(
updateIndexProgress({
progress: status.indexStatus.progress,
timestamp: moment(status.indexStatus.timestamp).toDate(),
repoUri,
})
);
// Keep polling if the progress is not 100% yet.
return isInProgress(status.indexStatus.progress);
} else {
// Keep polling if the indexStatus has not been persisted yet.
return true;
}
},
pollRepoIndexStatus
);
const handleRepoIndexStatusProcess = function*(status: any, repoUri: RepositoryUri) {
if (
// Repository has been deleted during the index
(!status.gitStatus && !status.indexStatus && !status.deleteStatus) ||
// Repository is in delete during the index
status.deleteStatus
) {
// Stop polling index progress
return false;
}
if (status.indexStatus) {
yield put(
updateIndexProgress({
progress: status.indexStatus.progress,
timestamp: moment(status.indexStatus.timestamp).toDate(),
repoUri,
})
);
// Keep polling if the progress is not 100% yet.
return isInProgress(status.indexStatus.progress);
} else {
// Keep polling if the indexStatus has not been persisted yet.
return true;
}
};
export function* watchRepoIndexStatusPolling() {
// The repository index status polling will be triggered by:
// * user click index repository
// * clone is done
// * repeating pollRepoIndexStatus action by the poller itself.
// * repository status has been loaded and it's in indexing
yield takeEvery(
[String(indexRepo), cloneCompletedPattern, String(pollRepoIndexStatus)],
handleRepoIndexStatusPolling
[String(indexRepo), cloneCompletedPattern, String(pollRepoIndexStatusStart)],
pollRepoIndexStatusRunner
);
}
const handleRepoDeleteStatusPolling = createRepoStatusPollingHandler(
(action: Action<any>) => {
return action.payload;
},
function*(status: any, repoUri: RepositoryUri) {
if (!status.gitStatus && !status.indexStatus && !status.deleteStatus) {
// If all the statuses cannot be found, this indicates the the repository has been successfully
// removed.
yield put(
updateDeleteProgress({
progress: WorkerReservedProgress.COMPLETED,
repoUri,
})
);
return false;
}
const parseDeleteStatusPollingRequest = (action: Action<any>) => {
return action.payload;
};
if (status.deleteStatus) {
yield put(
updateDeleteProgress({
progress: status.deleteStatus.progress,
timestamp: moment(status.deleteStatus.timestamp).toDate(),
repoUri,
})
);
return isInProgress(status.deleteStatus.progress);
} else {
// Keep polling if the deleteStatus has not been persisted yet.
return true;
}
},
pollRepoDeleteStatus
);
const handleRepoDeleteStatusProcess = function*(status: any, repoUri: RepositoryUri) {
if (!status.gitStatus && !status.indexStatus && !status.deleteStatus) {
// If all the statuses cannot be found, this indicates the the repository has been successfully
// removed.
yield put(
updateDeleteProgress({
progress: WorkerReservedProgress.COMPLETED,
repoUri,
})
);
return false;
}
if (status.deleteStatus) {
yield put(
updateDeleteProgress({
progress: status.deleteStatus.progress,
timestamp: moment(status.deleteStatus.timestamp).toDate(),
repoUri,
})
);
return isInProgress(status.deleteStatus.progress);
} else {
// Keep polling if the deleteStatus has not been persisted yet.
return true;
}
};
export function* watchRepoDeleteStatusPolling() {
// The repository delete status polling will be triggered by:
// * user click delete repository
// * repeating pollRepoDeleteStatus action by the poller itself.
// * repository status has been loaded and it's in deleting
yield takeEvery(
[String(deleteRepo), String(pollRepoDeleteStatus)],
handleRepoDeleteStatusPolling
[String(deleteRepo), String(pollRepoDeleteStatusStart)],
pollRepoDeleteStatusRunner
);
}
function createRepoStatusPollingRun(handleStatus: any, pollingStopActionFunction: any) {
return function*(repoUri: RepositoryUri) {
try {
while (true) {
// Delay at the beginning to allow some time for the server to consume the
// queue task.
yield call(delay, REPO_STATUS_POLLING_FREQ_MS);
const repoStatus = yield call(fetchStatus, repoUri);
const keepPolling = yield handleStatus(repoStatus, repoUri);
if (!keepPolling) {
yield put(pollingStopActionFunction(repoUri));
}
}
} finally {
if (yield cancelled()) {
// Do nothing here now.
}
}
};
}
function createRepoStatusPollingRunner(
parseRepoUri: (_: Action<any>) => RepositoryUri,
pollStatusRun: any,
pollingStopActionFunction: any,
pollingStopActionFunctionPattern: any
) {
return function*(action: Action<any>) {
const repoUri = parseRepoUri(action);
// Cancel existing runner to deduplicate the polling
yield put(pollingStopActionFunction(repoUri));
// Make a fork to run the repo index status polling
const task = yield fork(pollStatusRun, repoUri);
// Wait for the cancellation task
yield take(pollingStopActionFunctionPattern(repoUri));
// Cancell the task
yield cancel(task);
};
}
const runPollRepoCloneStatus = createRepoStatusPollingRun(
handleRepoCloneStatusProcess,
pollRepoCloneStatusStop
);
const runPollRepoIndexStatus = createRepoStatusPollingRun(
handleRepoIndexStatusProcess,
pollRepoIndexStatusStop
);
const runPollRepoDeleteStatus = createRepoStatusPollingRun(
handleRepoDeleteStatusProcess,
pollRepoDeleteStatusStop
);
const pollRepoCloneStatusRunner = createRepoStatusPollingRunner(
parseCloneStatusPollingRequest,
runPollRepoCloneStatus,
pollRepoCloneStatusStop,
cloneRepoStatusPollingStopPattern
);
const pollRepoIndexStatusRunner = createRepoStatusPollingRunner(
parseIndexStatusPollingRequest,
runPollRepoIndexStatus,
pollRepoIndexStatusStop,
indexRepoStatusPollingStopPattern
);
const pollRepoDeleteStatusRunner = createRepoStatusPollingRunner(
parseDeleteStatusPollingRequest,
runPollRepoDeleteStatus,
pollRepoDeleteStatusStop,
deleteRepoStatusPollingStopPattern
);

View file

@ -6,13 +6,16 @@
import { Action } from 'redux-actions';
import { put, select, takeEvery } from 'redux-saga/effects';
import { WorkerReservedProgress } from '../../model';
import { RepositoryUri, WorkerReservedProgress } from '../../model';
import {
deleteRepoFinished,
Match,
routeChange,
updateCloneProgress,
updateDeleteProgress,
pollRepoCloneStatusStop,
pollRepoDeleteStatusStop,
pollRepoIndexStatusStop,
} from '../actions';
import * as ROUTES from '../components/routes';
import { RootState } from '../reducers';
@ -27,6 +30,24 @@ const deleteCompletedPattern = (action: Action<any>) =>
action.type === String(updateDeleteProgress) &&
action.payload.progress === WorkerReservedProgress.COMPLETED;
export const cloneRepoStatusPollingStopPattern = (repoUri: RepositoryUri) => {
return (action: Action<any>) => {
return action.type === String(pollRepoCloneStatusStop) && action.payload === repoUri;
};
};
export const indexRepoStatusPollingStopPattern = (repoUri: RepositoryUri) => {
return (action: Action<any>) => {
return action.type === String(pollRepoIndexStatusStop) && action.payload === repoUri;
};
};
export const deleteRepoStatusPollingStopPattern = (repoUri: RepositoryUri) => {
return (action: Action<any>) => {
return action.type === String(pollRepoDeleteStatusStop) && action.payload === repoUri;
};
};
function* handleRepoCloneSuccess() {
const match: Match = yield select(matchSelector);
if (match.path === ROUTES.MAIN || match.path === ROUTES.MAIN_ROOT) {

View file

@ -35,10 +35,14 @@ export const repoUriSelector = (state: RootState) => {
return `${resource}/${org}/${repo}`;
};
export const routeSelector = (state: RootState) => state.route.match;
export const statusSelector = (state: RootState, repoUri: RepositoryUri) => {
return state.status.status[repoUri];
};
export const allStatusSelector = (state: RootState) => state.status.status;
export const treeCommitsSelector = (state: RootState) => {
const path = state.file.currentPath;
if (path === '') {