Mirror of https://github.com/elastic/kibana.git, synced 2025-04-24 17:59:23 -04:00
[Ingest Manager] Support both zip & tar archives from Registry (#76197)
* Quick pass at restoring support for both zip & tar.
  Restores the unzip functions from https://github.com/elastic/kibana/pull/43764.
  Persists the `download` value returned by EPR (e.g. `/epr/system/system-0.5.3.zip` or `/epr/system/system-0.5.3.tar.gz`) as the "archive key" for a package name/version combo, so the same name & version always maps to the same archive: the value initially given by the registry. Based on that value, we decide which decompression to use.
* Use a template literal instead of JSON.stringify for key generation
* Factor the unzip/untar logic out into getBufferExtractor
* Add tests for getBufferExtractor
* Replace `[aA]rchiveKey*` with `[aA]rchiveLocation*`
* Include the given name & version in the error message

Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
parent 030d5e1390, commit 9a7c418327
5 changed files with 99 additions and 24 deletions
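The commit's core decision, restated: the registry's `download` path is remembered per package name/version, and its file extension picks the decompression routine. A minimal TypeScript sketch of that dispatch, using names from the diff below (illustrative, not the exact call sites):

    import { untarBuffer, unzipBuffer } from './extract';

    // Example value returned by EPR for a name/version combo (from the commit message).
    const archiveLocation = '/epr/system/system-0.5.3.tar.gz';

    // Anything that does not end in .zip is treated as a gzipped tar.
    const extractor = archiveLocation.endsWith('.zip') ? unzipBuffer : untarBuffer;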
package.json
@@ -144,6 +144,7 @@
     "@kbn/test-subj-selector": "0.2.1",
     "@kbn/ui-framework": "1.0.0",
     "@kbn/ui-shared-deps": "1.0.0",
+    "@types/yauzl": "^2.9.1",
     "JSONStream": "1.3.5",
     "abortcontroller-polyfill": "^1.4.0",
     "accept": "3.0.2",
cache.ts
@@ -3,6 +3,7 @@
  * or more contributor license agreements. Licensed under the Elastic License;
  * you may not use this file except in compliance with the Elastic License.
  */
+import { pkgToPkgKey } from './index';

 const cache: Map<string, Buffer> = new Map();
 export const cacheGet = (key: string) => cache.get(key);
@@ -10,4 +11,10 @@ export const cacheSet = (key: string, value: Buffer) => cache.set(key, value);
 export const cacheHas = (key: string) => cache.has(key);
 export const cacheClear = () => cache.clear();
 export const cacheDelete = (key: string) => cache.delete(key);
-export const getCacheKey = (key: string) => key + '.tar.gz';
+
+const archiveLocationCache: Map<string, string> = new Map();
+export const getArchiveLocation = (name: string, version: string) =>
+  archiveLocationCache.get(pkgToPkgKey({ name, version }));
+
+export const setArchiveLocation = (name: string, version: string, location: string) =>
+  archiveLocationCache.set(pkgToPkgKey({ name, version }), location);
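A hedged usage sketch of the new archive-location helpers above, reusing the example path from the commit message (not code from the PR):

    import { getArchiveLocation, setArchiveLocation } from './cache';

    // Remember which archive the registry served for this name/version...
    setArchiveLocation('system', '0.5.3', '/epr/system/system-0.5.3.zip');

    // ...and look it up later under the same name/version key.
    const location = getArchiveLocation('system', '0.5.3'); // '/epr/system/system-0.5.3.zip'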
extract.ts
@@ -5,6 +5,7 @@
  */

 import tar from 'tar';
+import yauzl from 'yauzl';
 import { bufferToStream, streamToBuffer } from './streams';

 export interface ArchiveEntry {
@@ -30,3 +31,40 @@ export async function untarBuffer(
     deflatedStream.pipe(inflateStream);
   });
 }
+
+export async function unzipBuffer(
+  buffer: Buffer,
+  filter = (entry: ArchiveEntry): boolean => true,
+  onEntry = (entry: ArchiveEntry): void => {}
+): Promise<void> {
+  const zipfile = await yauzlFromBuffer(buffer, { lazyEntries: true });
+  zipfile.readEntry();
+  zipfile.on('entry', async (entry: yauzl.Entry) => {
+    const path = entry.fileName;
+    if (!filter({ path })) return zipfile.readEntry();
+
+    const entryBuffer = await getZipReadStream(zipfile, entry).then(streamToBuffer);
+    onEntry({ buffer: entryBuffer, path });
+    zipfile.readEntry();
+  });
+  return new Promise((resolve, reject) => zipfile.on('end', resolve).on('error', reject));
+}
+
+function yauzlFromBuffer(buffer: Buffer, opts: yauzl.Options): Promise<yauzl.ZipFile> {
+  return new Promise((resolve, reject) =>
+    yauzl.fromBuffer(buffer, opts, (err?: Error, handle?: yauzl.ZipFile) =>
+      err ? reject(err) : resolve(handle)
+    )
+  );
+}
+
+function getZipReadStream(
+  zipfile: yauzl.ZipFile,
+  entry: yauzl.Entry
+): Promise<NodeJS.ReadableStream> {
+  return new Promise((resolve, reject) =>
+    zipfile.openReadStream(entry, (err?: Error, readStream?: NodeJS.ReadableStream) =>
+      err ? reject(err) : resolve(readStream)
+    )
+  );
+}
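A hedged sketch of calling the extractors above; the helper name listManifests is hypothetical, and ArchiveEntry is assumed to carry a path plus an optional buffer, as the call sites in the diff suggest:

    import { unzipBuffer, ArchiveEntry } from './extract';

    // Hypothetical helper: collect the paths of all manifest.yml entries in a zip archive.
    async function listManifests(archiveBuffer: Buffer): Promise<string[]> {
      const manifests: string[] = [];
      await unzipBuffer(
        archiveBuffer,
        // filter: decide per entry whether it should be inflated at all
        (entry: ArchiveEntry) => !!entry.path && entry.path.endsWith('manifest.yml'),
        // onEntry: called with the entry path and its inflated buffer
        (entry: ArchiveEntry) => {
          if (entry.path) manifests.push(entry.path);
        }
      );
      return manifests;
    }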
index.test.ts
@@ -5,7 +5,17 @@
  */

 import { AssetParts } from '../../../types';
-import { pathParts, splitPkgKey } from './index';
+import { getBufferExtractor, pathParts, splitPkgKey } from './index';
+import { getArchiveLocation } from './cache';
+import { untarBuffer, unzipBuffer } from './extract';
+
+jest.mock('./cache', () => {
+  return {
+    getArchiveLocation: jest.fn(),
+  };
+});
+
+const mockedGetArchiveLocation = getArchiveLocation as jest.Mock;

 const testPaths = [
   {
@@ -80,3 +90,21 @@ describe('splitPkgKey tests', () => {
     expect(pkgVersion).toBe('0.13.0-alpha.1+abcd');
   });
 });
+
+describe('getBufferExtractor', () => {
+  it('throws if the archive has not been downloaded/cached yet', () => {
+    expect(() => getBufferExtractor('missing', '1.2.3')).toThrow('no archive location');
+  });
+
+  it('returns unzipBuffer if the archive key ends in .zip', () => {
+    mockedGetArchiveLocation.mockImplementation(() => '.zip');
+    const extractor = getBufferExtractor('will-use-mocked-key', 'a.b.c');
+    expect(extractor).toBe(unzipBuffer);
+  });
+
+  it('returns untarBuffer if the key ends in anything else', () => {
+    mockedGetArchiveLocation.mockImplementation(() => 'xyz');
+    const extractor = getBufferExtractor('will-use-mocked-key', 'a.b.c');
+    expect(extractor).toBe(untarBuffer);
+  });
+});
index.ts
@@ -17,8 +17,8 @@ import {
   RegistrySearchResults,
   RegistrySearchResult,
 } from '../../../types';
-import { cacheGet, cacheSet, getCacheKey, cacheHas } from './cache';
-import { ArchiveEntry, untarBuffer } from './extract';
+import { cacheGet, cacheSet, cacheHas, getArchiveLocation, setArchiveLocation } from './cache';
+import { ArchiveEntry, untarBuffer, unzipBuffer } from './extract';
 import { fetchUrl, getResponse, getResponseStream } from './requests';
 import { streamToBuffer } from './streams';
 import { getRegistryUrl } from './registry_url';
@@ -130,7 +130,9 @@ export async function getArchiveInfo(
   filter = (entry: ArchiveEntry): boolean => true
 ): Promise<string[]> {
   const paths: string[] = [];
-  const onEntry = (entry: ArchiveEntry) => {
+  const archiveBuffer = await getOrFetchArchiveBuffer(pkgName, pkgVersion);
+  const bufferExtractor = getBufferExtractor(pkgName, pkgVersion);
+  await bufferExtractor(archiveBuffer, filter, (entry: ArchiveEntry) => {
     const { path, buffer } = entry;
     const { file } = pathParts(path);
     if (!file) return;
@@ -138,9 +140,7 @@ export async function getArchiveInfo(
       cacheSet(path, buffer);
       paths.push(path);
     }
-  };
-
-  await extract(pkgName, pkgVersion, filter, onEntry);
+  });

   return paths;
 }
@@ -175,24 +175,20 @@ export function pathParts(path: string): AssetParts {
   } as AssetParts;
 }

-async function extract(
-  pkgName: string,
-  pkgVersion: string,
-  filter = (entry: ArchiveEntry): boolean => true,
-  onEntry: (entry: ArchiveEntry) => void
-) {
-  const archiveBuffer = await getOrFetchArchiveBuffer(pkgName, pkgVersion);
+export function getBufferExtractor(pkgName: string, pkgVersion: string) {
+  const archiveLocation = getArchiveLocation(pkgName, pkgVersion);
+  if (!archiveLocation) throw new Error(`no archive location for ${pkgName} ${pkgVersion}`);
+  const isZip = archiveLocation.endsWith('.zip');
+  const bufferExtractor = isZip ? unzipBuffer : untarBuffer;

-  return untarBuffer(archiveBuffer, filter, onEntry);
+  return bufferExtractor;
 }

 async function getOrFetchArchiveBuffer(pkgName: string, pkgVersion: string): Promise<Buffer> {
-  // assume .tar.gz for now. add support for .zip if/when we need it
-  const key = getCacheKey(`${pkgName}-${pkgVersion}`);
-  let buffer = cacheGet(key);
+  const key = getArchiveLocation(pkgName, pkgVersion);
+  let buffer = key && cacheGet(key);
   if (!buffer) {
     buffer = await fetchArchiveBuffer(pkgName, pkgVersion);
-    cacheSet(key, buffer);
   }

   if (buffer) {
@@ -203,16 +199,21 @@ async function getOrFetchArchiveBuffer(pkgName: string, pkgVersion: string): Pro
 }

 export async function ensureCachedArchiveInfo(name: string, version: string) {
-  const pkgkey = getCacheKey(`${name}-${version}`);
-  if (!cacheHas(pkgkey)) {
+  const pkgkey = getArchiveLocation(name, version);
+  if (!pkgkey || !cacheHas(pkgkey)) {
     await getArchiveInfo(name, version);
   }
 }

 async function fetchArchiveBuffer(pkgName: string, pkgVersion: string): Promise<Buffer> {
   const { download: archivePath } = await fetchInfo(pkgName, pkgVersion);
-  const registryUrl = getRegistryUrl();
-  return getResponseStream(`${registryUrl}${archivePath}`).then(streamToBuffer);
+  const archiveUrl = `${getRegistryUrl()}${archivePath}`;
+  const buffer = await getResponseStream(archiveUrl).then(streamToBuffer);
+
+  setArchiveLocation(pkgName, pkgVersion, archivePath);
+  cacheSet(archivePath, buffer);
+
+  return buffer;
 }

 export function getAsset(key: string) {
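Taken together, a hedged sketch of the resulting flow; the package name and version are placeholders:

    // fetchArchiveBuffer caches the buffer (keyed by archive path) and records the
    // archive location (keyed by name/version), so the two stay consistent.
    const paths = await getArchiveInfo('system', '0.5.3'); // fetches if uncached, extracts entries

    // getBufferExtractor consults the recorded location to pick unzipBuffer or untarBuffer.
    const extractor = getBufferExtractor('system', '0.5.3');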