mirror of
https://github.com/elastic/elasticsearch.git
synced 2025-06-28 09:28:55 -04:00
Use random purpose in blob store repository tests (#102789)
Today many blob store repository tests specify that the operations they perform have purpose `OperationPurpose#SNAPSHOT`, but most of these tests do not care about the purpose of these operations. This commit switches them to using a random purpose to highlight that the purpose is unimportant to the test.
This commit is contained in:
parent
7cf32030e5
commit
824d06c8cf
25 changed files with 196 additions and 258 deletions
|
@ -20,7 +20,6 @@ import org.elasticsearch.common.UUIDs;
|
||||||
import org.elasticsearch.common.blobstore.BlobContainer;
|
import org.elasticsearch.common.blobstore.BlobContainer;
|
||||||
import org.elasticsearch.common.blobstore.BlobPath;
|
import org.elasticsearch.common.blobstore.BlobPath;
|
||||||
import org.elasticsearch.common.blobstore.BlobStore;
|
import org.elasticsearch.common.blobstore.BlobStore;
|
||||||
import org.elasticsearch.common.blobstore.OperationPurpose;
|
|
||||||
import org.elasticsearch.common.bytes.BytesArray;
|
import org.elasticsearch.common.bytes.BytesArray;
|
||||||
import org.elasticsearch.common.regex.Regex;
|
import org.elasticsearch.common.regex.Regex;
|
||||||
import org.elasticsearch.common.settings.MockSecureSettings;
|
import org.elasticsearch.common.settings.MockSecureSettings;
|
||||||
|
@ -45,6 +44,7 @@ import java.util.Map;
|
||||||
import java.util.function.Predicate;
|
import java.util.function.Predicate;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
|
import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
|
||||||
import static org.hamcrest.Matchers.anEmptyMap;
|
import static org.hamcrest.Matchers.anEmptyMap;
|
||||||
import static org.hamcrest.Matchers.containsString;
|
import static org.hamcrest.Matchers.containsString;
|
||||||
import static org.hamcrest.Matchers.equalTo;
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
|
@ -235,11 +235,11 @@ public class AzureBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryInteg
|
||||||
for (int i = 0; i < numberOfBlobs; i++) {
|
for (int i = 0; i < numberOfBlobs; i++) {
|
||||||
byte[] bytes = randomBytes(randomInt(100));
|
byte[] bytes = randomBytes(randomInt(100));
|
||||||
String blobName = randomAlphaOfLength(10);
|
String blobName = randomAlphaOfLength(10);
|
||||||
container.writeBlob(OperationPurpose.SNAPSHOT, blobName, new BytesArray(bytes), false);
|
container.writeBlob(randomPurpose(), blobName, new BytesArray(bytes), false);
|
||||||
}
|
}
|
||||||
|
|
||||||
container.delete(OperationPurpose.SNAPSHOT);
|
container.delete(randomPurpose());
|
||||||
assertThat(container.listBlobs(OperationPurpose.SNAPSHOT), is(anEmptyMap()));
|
assertThat(container.listBlobs(randomPurpose()), is(anEmptyMap()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -250,7 +250,7 @@ public class AzureBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryInteg
|
||||||
for (int i = 0; i < 10; i++) {
|
for (int i = 0; i < 10; i++) {
|
||||||
byte[] bytes = randomBytes(randomInt(100));
|
byte[] bytes = randomBytes(randomInt(100));
|
||||||
String blobName = randomAlphaOfLength(10);
|
String blobName = randomAlphaOfLength(10);
|
||||||
container.writeBlob(OperationPurpose.SNAPSHOT, blobName, new BytesArray(bytes), false);
|
container.writeBlob(randomPurpose(), blobName, new BytesArray(bytes), false);
|
||||||
blobsToDelete.add(blobName);
|
blobsToDelete.add(blobName);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -260,18 +260,15 @@ public class AzureBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryInteg
|
||||||
}
|
}
|
||||||
|
|
||||||
Randomness.shuffle(blobsToDelete);
|
Randomness.shuffle(blobsToDelete);
|
||||||
container.deleteBlobsIgnoringIfNotExists(OperationPurpose.SNAPSHOT, blobsToDelete.iterator());
|
container.deleteBlobsIgnoringIfNotExists(randomPurpose(), blobsToDelete.iterator());
|
||||||
assertThat(container.listBlobs(OperationPurpose.SNAPSHOT), is(anEmptyMap()));
|
assertThat(container.listBlobs(randomPurpose()), is(anEmptyMap()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testNotFoundErrorMessageContainsFullKey() throws Exception {
|
public void testNotFoundErrorMessageContainsFullKey() throws Exception {
|
||||||
try (BlobStore store = newBlobStore()) {
|
try (BlobStore store = newBlobStore()) {
|
||||||
BlobContainer container = store.blobContainer(BlobPath.EMPTY.add("nested").add("dir"));
|
BlobContainer container = store.blobContainer(BlobPath.EMPTY.add("nested").add("dir"));
|
||||||
NoSuchFileException exception = expectThrows(
|
NoSuchFileException exception = expectThrows(NoSuchFileException.class, () -> container.readBlob(randomPurpose(), "blob"));
|
||||||
NoSuchFileException.class,
|
|
||||||
() -> container.readBlob(OperationPurpose.SNAPSHOT, "blob")
|
|
||||||
);
|
|
||||||
assertThat(exception.getMessage(), containsString("nested/dir/blob] not found"));
|
assertThat(exception.getMessage(), containsString("nested/dir/blob] not found"));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -281,10 +278,10 @@ public class AzureBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryInteg
|
||||||
BlobContainer container = store.blobContainer(BlobPath.EMPTY.add(UUIDs.randomBase64UUID()));
|
BlobContainer container = store.blobContainer(BlobPath.EMPTY.add(UUIDs.randomBase64UUID()));
|
||||||
var data = randomBytes(randomIntBetween(128, 512));
|
var data = randomBytes(randomIntBetween(128, 512));
|
||||||
String blobName = randomName();
|
String blobName = randomName();
|
||||||
container.writeBlob(OperationPurpose.SNAPSHOT, blobName, new ByteArrayInputStream(data), data.length, true);
|
container.writeBlob(randomPurpose(), blobName, new ByteArrayInputStream(data), data.length, true);
|
||||||
|
|
||||||
var originalDataInputStream = new ByteArrayInputStream(data);
|
var originalDataInputStream = new ByteArrayInputStream(data);
|
||||||
try (var azureInputStream = container.readBlob(OperationPurpose.SNAPSHOT, blobName)) {
|
try (var azureInputStream = container.readBlob(randomPurpose(), blobName)) {
|
||||||
for (int i = 0; i < data.length; i++) {
|
for (int i = 0; i < data.length; i++) {
|
||||||
assertThat(originalDataInputStream.read(), is(equalTo(azureInputStream.read())));
|
assertThat(originalDataInputStream.read(), is(equalTo(azureInputStream.read())));
|
||||||
}
|
}
|
||||||
|
@ -292,7 +289,7 @@ public class AzureBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryInteg
|
||||||
assertThat(azureInputStream.read(), is(equalTo(-1)));
|
assertThat(azureInputStream.read(), is(equalTo(-1)));
|
||||||
assertThat(originalDataInputStream.read(), is(equalTo(-1)));
|
assertThat(originalDataInputStream.read(), is(equalTo(-1)));
|
||||||
}
|
}
|
||||||
container.delete(OperationPurpose.SNAPSHOT);
|
container.delete(randomPurpose());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,7 +20,6 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||||
import org.elasticsearch.common.Strings;
|
import org.elasticsearch.common.Strings;
|
||||||
import org.elasticsearch.common.UUIDs;
|
import org.elasticsearch.common.UUIDs;
|
||||||
import org.elasticsearch.common.blobstore.BlobContainer;
|
import org.elasticsearch.common.blobstore.BlobContainer;
|
||||||
import org.elasticsearch.common.blobstore.OperationPurpose;
|
|
||||||
import org.elasticsearch.common.settings.MockSecureSettings;
|
import org.elasticsearch.common.settings.MockSecureSettings;
|
||||||
import org.elasticsearch.common.settings.SecureSettings;
|
import org.elasticsearch.common.settings.SecureSettings;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
@ -36,6 +35,7 @@ import java.io.ByteArrayInputStream;
|
||||||
import java.net.HttpURLConnection;
|
import java.net.HttpURLConnection;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
|
||||||
|
import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
|
||||||
import static org.hamcrest.Matchers.blankOrNullString;
|
import static org.hamcrest.Matchers.blankOrNullString;
|
||||||
import static org.hamcrest.Matchers.equalTo;
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
import static org.hamcrest.Matchers.not;
|
import static org.hamcrest.Matchers.not;
|
||||||
|
@ -140,13 +140,13 @@ public class AzureStorageCleanupThirdPartyTests extends AbstractThirdPartyReposi
|
||||||
repo.threadPool().generic().execute(ActionRunnable.run(future, () -> {
|
repo.threadPool().generic().execute(ActionRunnable.run(future, () -> {
|
||||||
final BlobContainer blobContainer = repo.blobStore().blobContainer(repo.basePath().add("large_write"));
|
final BlobContainer blobContainer = repo.blobStore().blobContainer(repo.basePath().add("large_write"));
|
||||||
blobContainer.writeBlob(
|
blobContainer.writeBlob(
|
||||||
OperationPurpose.SNAPSHOT,
|
randomPurpose(),
|
||||||
UUIDs.base64UUID(),
|
UUIDs.base64UUID(),
|
||||||
new ByteArrayInputStream(randomByteArrayOfLength(blobSize)),
|
new ByteArrayInputStream(randomByteArrayOfLength(blobSize)),
|
||||||
blobSize,
|
blobSize,
|
||||||
false
|
false
|
||||||
);
|
);
|
||||||
blobContainer.delete(OperationPurpose.SNAPSHOT);
|
blobContainer.delete(randomPurpose());
|
||||||
}));
|
}));
|
||||||
future.get();
|
future.get();
|
||||||
}
|
}
|
||||||
|
|
|
@ -14,7 +14,6 @@ import com.sun.net.httpserver.HttpHandler;
|
||||||
import org.elasticsearch.common.Strings;
|
import org.elasticsearch.common.Strings;
|
||||||
import org.elasticsearch.common.UUIDs;
|
import org.elasticsearch.common.UUIDs;
|
||||||
import org.elasticsearch.common.blobstore.BlobContainer;
|
import org.elasticsearch.common.blobstore.BlobContainer;
|
||||||
import org.elasticsearch.common.blobstore.OperationPurpose;
|
|
||||||
import org.elasticsearch.common.bytes.BytesReference;
|
import org.elasticsearch.common.bytes.BytesReference;
|
||||||
import org.elasticsearch.common.io.Streams;
|
import org.elasticsearch.common.io.Streams;
|
||||||
import org.elasticsearch.common.lucene.store.ByteArrayIndexInput;
|
import org.elasticsearch.common.lucene.store.ByteArrayIndexInput;
|
||||||
|
@ -43,6 +42,7 @@ import java.util.concurrent.atomic.AtomicLong;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import static java.nio.charset.StandardCharsets.UTF_8;
|
import static java.nio.charset.StandardCharsets.UTF_8;
|
||||||
|
import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
|
||||||
import static org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase.randomBytes;
|
import static org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase.randomBytes;
|
||||||
import static org.hamcrest.Matchers.containsString;
|
import static org.hamcrest.Matchers.containsString;
|
||||||
import static org.hamcrest.Matchers.equalTo;
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
|
@ -61,11 +61,11 @@ public class AzureBlobContainerRetriesTests extends AbstractAzureServerTestCase
|
||||||
final BlobContainer blobContainer = createBlobContainer(between(1, 5));
|
final BlobContainer blobContainer = createBlobContainer(between(1, 5));
|
||||||
final Exception exception = expectThrows(NoSuchFileException.class, () -> {
|
final Exception exception = expectThrows(NoSuchFileException.class, () -> {
|
||||||
if (randomBoolean()) {
|
if (randomBoolean()) {
|
||||||
blobContainer.readBlob(OperationPurpose.SNAPSHOT, "read_nonexistent_blob");
|
blobContainer.readBlob(randomPurpose(), "read_nonexistent_blob");
|
||||||
} else {
|
} else {
|
||||||
final long position = randomLongBetween(0, MAX_RANGE_VAL - 1L);
|
final long position = randomLongBetween(0, MAX_RANGE_VAL - 1L);
|
||||||
final long length = randomLongBetween(1, MAX_RANGE_VAL - position);
|
final long length = randomLongBetween(1, MAX_RANGE_VAL - position);
|
||||||
blobContainer.readBlob(OperationPurpose.SNAPSHOT, "read_nonexistent_blob", position, length);
|
blobContainer.readBlob(randomPurpose(), "read_nonexistent_blob", position, length);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
assertThat(exception.toString(), exception.getMessage().toLowerCase(Locale.ROOT), containsString("not found"));
|
assertThat(exception.toString(), exception.getMessage().toLowerCase(Locale.ROOT), containsString("not found"));
|
||||||
|
@ -112,7 +112,7 @@ public class AzureBlobContainerRetriesTests extends AbstractAzureServerTestCase
|
||||||
});
|
});
|
||||||
|
|
||||||
final BlobContainer blobContainer = createBlobContainer(maxRetries);
|
final BlobContainer blobContainer = createBlobContainer(maxRetries);
|
||||||
try (InputStream inputStream = blobContainer.readBlob(OperationPurpose.SNAPSHOT, "read_blob_max_retries")) {
|
try (InputStream inputStream = blobContainer.readBlob(randomPurpose(), "read_blob_max_retries")) {
|
||||||
assertArrayEquals(bytes, BytesReference.toBytes(Streams.readFully(inputStream)));
|
assertArrayEquals(bytes, BytesReference.toBytes(Streams.readFully(inputStream)));
|
||||||
assertThat(countDownHead.isCountedDown(), is(true));
|
assertThat(countDownHead.isCountedDown(), is(true));
|
||||||
assertThat(countDownGet.isCountedDown(), is(true));
|
assertThat(countDownGet.isCountedDown(), is(true));
|
||||||
|
@ -160,7 +160,7 @@ public class AzureBlobContainerRetriesTests extends AbstractAzureServerTestCase
|
||||||
final BlobContainer blobContainer = createBlobContainer(maxRetries);
|
final BlobContainer blobContainer = createBlobContainer(maxRetries);
|
||||||
final int position = randomIntBetween(0, bytes.length - 1);
|
final int position = randomIntBetween(0, bytes.length - 1);
|
||||||
final int length = randomIntBetween(1, bytes.length - position);
|
final int length = randomIntBetween(1, bytes.length - position);
|
||||||
try (InputStream inputStream = blobContainer.readBlob(OperationPurpose.SNAPSHOT, "read_range_blob_max_retries", position, length)) {
|
try (InputStream inputStream = blobContainer.readBlob(randomPurpose(), "read_range_blob_max_retries", position, length)) {
|
||||||
final byte[] bytesRead = BytesReference.toBytes(Streams.readFully(inputStream));
|
final byte[] bytesRead = BytesReference.toBytes(Streams.readFully(inputStream));
|
||||||
assertArrayEquals(Arrays.copyOfRange(bytes, position, Math.min(bytes.length, position + length)), bytesRead);
|
assertArrayEquals(Arrays.copyOfRange(bytes, position, Math.min(bytes.length, position + length)), bytesRead);
|
||||||
assertThat(countDownGet.isCountedDown(), is(true));
|
assertThat(countDownGet.isCountedDown(), is(true));
|
||||||
|
@ -203,7 +203,7 @@ public class AzureBlobContainerRetriesTests extends AbstractAzureServerTestCase
|
||||||
|
|
||||||
final BlobContainer blobContainer = createBlobContainer(maxRetries);
|
final BlobContainer blobContainer = createBlobContainer(maxRetries);
|
||||||
try (InputStream stream = new InputStreamIndexInput(new ByteArrayIndexInput("desc", bytes), bytes.length)) {
|
try (InputStream stream = new InputStreamIndexInput(new ByteArrayIndexInput("desc", bytes), bytes.length)) {
|
||||||
blobContainer.writeBlob(OperationPurpose.SNAPSHOT, "write_blob_max_retries", stream, bytes.length, false);
|
blobContainer.writeBlob(randomPurpose(), "write_blob_max_retries", stream, bytes.length, false);
|
||||||
}
|
}
|
||||||
assertThat(countDown.isCountedDown(), is(true));
|
assertThat(countDown.isCountedDown(), is(true));
|
||||||
}
|
}
|
||||||
|
@ -273,7 +273,7 @@ public class AzureBlobContainerRetriesTests extends AbstractAzureServerTestCase
|
||||||
final BlobContainer blobContainer = createBlobContainer(maxRetries);
|
final BlobContainer blobContainer = createBlobContainer(maxRetries);
|
||||||
|
|
||||||
try (InputStream stream = new InputStreamIndexInput(new ByteArrayIndexInput("desc", data), data.length)) {
|
try (InputStream stream = new InputStreamIndexInput(new ByteArrayIndexInput("desc", data), data.length)) {
|
||||||
blobContainer.writeBlob(OperationPurpose.SNAPSHOT, "write_large_blob", stream, data.length, false);
|
blobContainer.writeBlob(randomPurpose(), "write_large_blob", stream, data.length, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
assertThat(countDownUploads.get(), equalTo(0));
|
assertThat(countDownUploads.get(), equalTo(0));
|
||||||
|
@ -341,7 +341,7 @@ public class AzureBlobContainerRetriesTests extends AbstractAzureServerTestCase
|
||||||
});
|
});
|
||||||
|
|
||||||
final BlobContainer blobContainer = createBlobContainer(maxRetries);
|
final BlobContainer blobContainer = createBlobContainer(maxRetries);
|
||||||
blobContainer.writeMetadataBlob(OperationPurpose.SNAPSHOT, "write_large_blob_streaming", false, randomBoolean(), out -> {
|
blobContainer.writeMetadataBlob(randomPurpose(), "write_large_blob_streaming", false, randomBoolean(), out -> {
|
||||||
int outstanding = data.length;
|
int outstanding = data.length;
|
||||||
while (outstanding > 0) {
|
while (outstanding > 0) {
|
||||||
if (randomBoolean()) {
|
if (randomBoolean()) {
|
||||||
|
@ -391,13 +391,7 @@ public class AzureBlobContainerRetriesTests extends AbstractAzureServerTestCase
|
||||||
}) {
|
}) {
|
||||||
final IOException ioe = expectThrows(
|
final IOException ioe = expectThrows(
|
||||||
IOException.class,
|
IOException.class,
|
||||||
() -> blobContainer.writeBlob(
|
() -> blobContainer.writeBlob(randomPurpose(), "write_blob_max_retries", stream, randomIntBetween(1, 128), randomBoolean())
|
||||||
OperationPurpose.SNAPSHOT,
|
|
||||||
"write_blob_max_retries",
|
|
||||||
stream,
|
|
||||||
randomIntBetween(1, 128),
|
|
||||||
randomBoolean()
|
|
||||||
)
|
|
||||||
);
|
);
|
||||||
assertThat(ioe.getMessage(), is("Unable to write blob write_blob_max_retries"));
|
assertThat(ioe.getMessage(), is("Unable to write blob write_blob_max_retries"));
|
||||||
// The mock http server uses 1 thread to process the requests, it's possible that the
|
// The mock http server uses 1 thread to process the requests, it's possible that the
|
||||||
|
@ -471,7 +465,7 @@ public class AzureBlobContainerRetriesTests extends AbstractAzureServerTestCase
|
||||||
}
|
}
|
||||||
|
|
||||||
final BlobContainer blobContainer = createBlobContainer(maxRetries, secondaryHost, locationMode);
|
final BlobContainer blobContainer = createBlobContainer(maxRetries, secondaryHost, locationMode);
|
||||||
try (InputStream inputStream = blobContainer.readBlob(OperationPurpose.SNAPSHOT, "read_blob_from_secondary")) {
|
try (InputStream inputStream = blobContainer.readBlob(randomPurpose(), "read_blob_from_secondary")) {
|
||||||
assertArrayEquals(bytes, BytesReference.toBytes(Streams.readFully(inputStream)));
|
assertArrayEquals(bytes, BytesReference.toBytes(Streams.readFully(inputStream)));
|
||||||
|
|
||||||
// It does round robin, first tries on the primary, then on the secondary
|
// It does round robin, first tries on the primary, then on the secondary
|
||||||
|
|
|
@ -10,7 +10,6 @@ package org.elasticsearch.repositories.azure;
|
||||||
|
|
||||||
import org.elasticsearch.common.UUIDs;
|
import org.elasticsearch.common.UUIDs;
|
||||||
import org.elasticsearch.common.blobstore.BlobContainer;
|
import org.elasticsearch.common.blobstore.BlobContainer;
|
||||||
import org.elasticsearch.common.blobstore.OperationPurpose;
|
|
||||||
import org.elasticsearch.common.bytes.BytesReference;
|
import org.elasticsearch.common.bytes.BytesReference;
|
||||||
import org.elasticsearch.common.io.Streams;
|
import org.elasticsearch.common.io.Streams;
|
||||||
import org.elasticsearch.common.settings.MockSecureSettings;
|
import org.elasticsearch.common.settings.MockSecureSettings;
|
||||||
|
@ -24,6 +23,7 @@ import java.util.Locale;
|
||||||
|
|
||||||
import static org.elasticsearch.repositories.azure.AzureStorageSettings.ACCOUNT_SETTING;
|
import static org.elasticsearch.repositories.azure.AzureStorageSettings.ACCOUNT_SETTING;
|
||||||
import static org.elasticsearch.repositories.azure.AzureStorageSettings.SAS_TOKEN_SETTING;
|
import static org.elasticsearch.repositories.azure.AzureStorageSettings.SAS_TOKEN_SETTING;
|
||||||
|
import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
|
||||||
import static org.hamcrest.Matchers.equalTo;
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
import static org.hamcrest.Matchers.is;
|
import static org.hamcrest.Matchers.is;
|
||||||
import static org.hamcrest.Matchers.lessThan;
|
import static org.hamcrest.Matchers.lessThan;
|
||||||
|
@ -77,7 +77,7 @@ public class AzureSasTokenTests extends AbstractAzureServerTestCase {
|
||||||
});
|
});
|
||||||
|
|
||||||
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, LocationMode.PRIMARY_ONLY, clientName, secureSettings);
|
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, LocationMode.PRIMARY_ONLY, clientName, secureSettings);
|
||||||
try (InputStream inputStream = blobContainer.readBlob(OperationPurpose.SNAPSHOT, "sas_test")) {
|
try (InputStream inputStream = blobContainer.readBlob(randomPurpose(), "sas_test")) {
|
||||||
assertArrayEquals(bytes, BytesReference.toBytes(Streams.readFully(inputStream)));
|
assertArrayEquals(bytes, BytesReference.toBytes(Streams.readFully(inputStream)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,7 +29,6 @@ import org.elasticsearch.cluster.service.ClusterService;
|
||||||
import org.elasticsearch.common.blobstore.BlobContainer;
|
import org.elasticsearch.common.blobstore.BlobContainer;
|
||||||
import org.elasticsearch.common.blobstore.BlobPath;
|
import org.elasticsearch.common.blobstore.BlobPath;
|
||||||
import org.elasticsearch.common.blobstore.BlobStore;
|
import org.elasticsearch.common.blobstore.BlobStore;
|
||||||
import org.elasticsearch.common.blobstore.OperationPurpose;
|
|
||||||
import org.elasticsearch.common.bytes.BytesArray;
|
import org.elasticsearch.common.bytes.BytesArray;
|
||||||
import org.elasticsearch.common.collect.Iterators;
|
import org.elasticsearch.common.collect.Iterators;
|
||||||
import org.elasticsearch.common.io.Streams;
|
import org.elasticsearch.common.io.Streams;
|
||||||
|
@ -59,6 +58,7 @@ import java.util.Map;
|
||||||
import java.util.regex.Matcher;
|
import java.util.regex.Matcher;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
|
import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
|
||||||
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.CREDENTIALS_FILE_SETTING;
|
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.CREDENTIALS_FILE_SETTING;
|
||||||
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.ENDPOINT_SETTING;
|
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.ENDPOINT_SETTING;
|
||||||
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.TOKEN_URI_SETTING;
|
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.TOKEN_URI_SETTING;
|
||||||
|
@ -132,7 +132,7 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESMockAPIBasedRe
|
||||||
f,
|
f,
|
||||||
() -> repository.blobStore()
|
() -> repository.blobStore()
|
||||||
.blobContainer(repository.basePath())
|
.blobContainer(repository.basePath())
|
||||||
.deleteBlobsIgnoringIfNotExists(OperationPurpose.SNAPSHOT, Iterators.single("foo"))
|
.deleteBlobsIgnoringIfNotExists(randomPurpose(), Iterators.single("foo"))
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
|
@ -198,7 +198,7 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESMockAPIBasedRe
|
||||||
random().nextBytes(data);
|
random().nextBytes(data);
|
||||||
writeBlob(container, "foobar", new BytesArray(data), false);
|
writeBlob(container, "foobar", new BytesArray(data), false);
|
||||||
}
|
}
|
||||||
try (InputStream stream = container.readBlob(OperationPurpose.SNAPSHOT, "foobar")) {
|
try (InputStream stream = container.readBlob(randomPurpose(), "foobar")) {
|
||||||
BytesRefBuilder target = new BytesRefBuilder();
|
BytesRefBuilder target = new BytesRefBuilder();
|
||||||
while (target.length() < data.length) {
|
while (target.length() < data.length) {
|
||||||
byte[] buffer = new byte[scaledRandomIntBetween(1, data.length - target.length())];
|
byte[] buffer = new byte[scaledRandomIntBetween(1, data.length - target.length())];
|
||||||
|
@ -209,7 +209,7 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESMockAPIBasedRe
|
||||||
assertEquals(data.length, target.length());
|
assertEquals(data.length, target.length());
|
||||||
assertArrayEquals(data, Arrays.copyOfRange(target.bytes(), 0, target.length()));
|
assertArrayEquals(data, Arrays.copyOfRange(target.bytes(), 0, target.length()));
|
||||||
}
|
}
|
||||||
container.delete(OperationPurpose.SNAPSHOT);
|
container.delete(randomPurpose());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -21,7 +21,6 @@ import org.elasticsearch.common.Strings;
|
||||||
import org.elasticsearch.common.UUIDs;
|
import org.elasticsearch.common.UUIDs;
|
||||||
import org.elasticsearch.common.blobstore.BlobContainer;
|
import org.elasticsearch.common.blobstore.BlobContainer;
|
||||||
import org.elasticsearch.common.blobstore.BlobPath;
|
import org.elasticsearch.common.blobstore.BlobPath;
|
||||||
import org.elasticsearch.common.blobstore.OperationPurpose;
|
|
||||||
import org.elasticsearch.common.bytes.BytesArray;
|
import org.elasticsearch.common.bytes.BytesArray;
|
||||||
import org.elasticsearch.common.bytes.BytesReference;
|
import org.elasticsearch.common.bytes.BytesReference;
|
||||||
import org.elasticsearch.common.io.Streams;
|
import org.elasticsearch.common.io.Streams;
|
||||||
|
@ -64,6 +63,7 @@ import static fixture.gcs.GoogleCloudStorageHttpHandler.getContentRangeStart;
|
||||||
import static fixture.gcs.GoogleCloudStorageHttpHandler.parseMultipartRequestBody;
|
import static fixture.gcs.GoogleCloudStorageHttpHandler.parseMultipartRequestBody;
|
||||||
import static fixture.gcs.TestUtils.createServiceAccount;
|
import static fixture.gcs.TestUtils.createServiceAccount;
|
||||||
import static java.nio.charset.StandardCharsets.UTF_8;
|
import static java.nio.charset.StandardCharsets.UTF_8;
|
||||||
|
import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
|
||||||
import static org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase.randomBytes;
|
import static org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase.randomBytes;
|
||||||
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageBlobStore.MAX_DELETES_PER_BATCH;
|
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageBlobStore.MAX_DELETES_PER_BATCH;
|
||||||
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.CREDENTIALS_FILE_SETTING;
|
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.CREDENTIALS_FILE_SETTING;
|
||||||
|
@ -188,7 +188,7 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends AbstractBlobCon
|
||||||
exchange.close();
|
exchange.close();
|
||||||
});
|
});
|
||||||
|
|
||||||
try (InputStream inputStream = blobContainer.readBlob(OperationPurpose.SNAPSHOT, "large_blob_retries")) {
|
try (InputStream inputStream = blobContainer.readBlob(randomPurpose(), "large_blob_retries")) {
|
||||||
assertArrayEquals(bytes, BytesReference.toBytes(Streams.readFully(inputStream)));
|
assertArrayEquals(bytes, BytesReference.toBytes(Streams.readFully(inputStream)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -231,7 +231,7 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends AbstractBlobCon
|
||||||
}));
|
}));
|
||||||
|
|
||||||
try (InputStream stream = new InputStreamIndexInput(new ByteArrayIndexInput("desc", bytes), bytes.length)) {
|
try (InputStream stream = new InputStreamIndexInput(new ByteArrayIndexInput("desc", bytes), bytes.length)) {
|
||||||
blobContainer.writeBlob(OperationPurpose.SNAPSHOT, "write_blob_max_retries", stream, bytes.length, false);
|
blobContainer.writeBlob(randomPurpose(), "write_blob_max_retries", stream, bytes.length, false);
|
||||||
}
|
}
|
||||||
assertThat(countDown.isCountedDown(), is(true));
|
assertThat(countDown.isCountedDown(), is(true));
|
||||||
}
|
}
|
||||||
|
@ -254,7 +254,7 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends AbstractBlobCon
|
||||||
|
|
||||||
Exception exception = expectThrows(StorageException.class, () -> {
|
Exception exception = expectThrows(StorageException.class, () -> {
|
||||||
try (InputStream stream = new InputStreamIndexInput(new ByteArrayIndexInput("desc", bytes), bytes.length)) {
|
try (InputStream stream = new InputStreamIndexInput(new ByteArrayIndexInput("desc", bytes), bytes.length)) {
|
||||||
blobContainer.writeBlob(OperationPurpose.SNAPSHOT, "write_blob_timeout", stream, bytes.length, false);
|
blobContainer.writeBlob(randomPurpose(), "write_blob_timeout", stream, bytes.length, false);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
assertThat(exception.getMessage().toLowerCase(Locale.ROOT), containsString("read timed out"));
|
assertThat(exception.getMessage().toLowerCase(Locale.ROOT), containsString("read timed out"));
|
||||||
|
@ -392,10 +392,10 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends AbstractBlobCon
|
||||||
|
|
||||||
if (randomBoolean()) {
|
if (randomBoolean()) {
|
||||||
try (InputStream stream = new InputStreamIndexInput(new ByteArrayIndexInput("desc", data), data.length)) {
|
try (InputStream stream = new InputStreamIndexInput(new ByteArrayIndexInput("desc", data), data.length)) {
|
||||||
blobContainer.writeBlob(OperationPurpose.SNAPSHOT, "write_large_blob", stream, data.length, false);
|
blobContainer.writeBlob(randomPurpose(), "write_large_blob", stream, data.length, false);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
blobContainer.writeMetadataBlob(OperationPurpose.SNAPSHOT, "write_large_blob", false, randomBoolean(), out -> out.write(data));
|
blobContainer.writeMetadataBlob(randomPurpose(), "write_large_blob", false, randomBoolean(), out -> out.write(data));
|
||||||
}
|
}
|
||||||
|
|
||||||
assertThat(countInits.get(), equalTo(0));
|
assertThat(countInits.get(), equalTo(0));
|
||||||
|
@ -452,7 +452,7 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends AbstractBlobCon
|
||||||
exchange.getResponseBody().write(response);
|
exchange.getResponseBody().write(response);
|
||||||
}));
|
}));
|
||||||
|
|
||||||
blobContainer.deleteBlobsIgnoringIfNotExists(OperationPurpose.SNAPSHOT, blobNamesIterator);
|
blobContainer.deleteBlobsIgnoringIfNotExists(randomPurpose(), blobNamesIterator);
|
||||||
|
|
||||||
// Ensure that the remaining deletes are sent in the last batch
|
// Ensure that the remaining deletes are sent in the last batch
|
||||||
if (pendingDeletes.get() > 0) {
|
if (pendingDeletes.get() > 0) {
|
||||||
|
|
|
@ -19,7 +19,6 @@ import com.google.cloud.storage.StorageException;
|
||||||
import org.elasticsearch.common.blobstore.BlobContainer;
|
import org.elasticsearch.common.blobstore.BlobContainer;
|
||||||
import org.elasticsearch.common.blobstore.BlobPath;
|
import org.elasticsearch.common.blobstore.BlobPath;
|
||||||
import org.elasticsearch.common.blobstore.BlobStore;
|
import org.elasticsearch.common.blobstore.BlobStore;
|
||||||
import org.elasticsearch.common.blobstore.OperationPurpose;
|
|
||||||
import org.elasticsearch.common.util.BigArrays;
|
import org.elasticsearch.common.util.BigArrays;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
|
||||||
|
@ -27,6 +26,7 @@ import java.io.IOException;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
|
||||||
import static org.hamcrest.Matchers.instanceOf;
|
import static org.hamcrest.Matchers.instanceOf;
|
||||||
import static org.mockito.ArgumentMatchers.any;
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
import static org.mockito.ArgumentMatchers.eq;
|
import static org.mockito.ArgumentMatchers.eq;
|
||||||
|
@ -93,7 +93,7 @@ public class GoogleCloudStorageBlobStoreContainerTests extends ESTestCase {
|
||||||
|
|
||||||
IOException e = expectThrows(
|
IOException e = expectThrows(
|
||||||
IOException.class,
|
IOException.class,
|
||||||
() -> container.deleteBlobsIgnoringIfNotExists(OperationPurpose.SNAPSHOT, blobs.iterator())
|
() -> container.deleteBlobsIgnoringIfNotExists(randomPurpose(), blobs.iterator())
|
||||||
);
|
);
|
||||||
assertThat(e.getCause(), instanceOf(StorageException.class));
|
assertThat(e.getCause(), instanceOf(StorageException.class));
|
||||||
}
|
}
|
||||||
|
|
|
@ -43,6 +43,7 @@ import org.elasticsearch.repositories.RepositoryData;
|
||||||
import org.elasticsearch.repositories.RepositoryMissingException;
|
import org.elasticsearch.repositories.RepositoryMissingException;
|
||||||
import org.elasticsearch.repositories.RepositoryStats;
|
import org.elasticsearch.repositories.RepositoryStats;
|
||||||
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
|
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
|
||||||
|
import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil;
|
||||||
import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase;
|
import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase;
|
||||||
import org.elasticsearch.rest.RestStatus;
|
import org.elasticsearch.rest.RestStatus;
|
||||||
import org.elasticsearch.snapshots.SnapshotId;
|
import org.elasticsearch.snapshots.SnapshotId;
|
||||||
|
@ -78,6 +79,7 @@ import java.util.stream.Collectors;
|
||||||
import java.util.stream.StreamSupport;
|
import java.util.stream.StreamSupport;
|
||||||
|
|
||||||
import static org.elasticsearch.repositories.RepositoriesModule.METRIC_REQUESTS_COUNT;
|
import static org.elasticsearch.repositories.RepositoriesModule.METRIC_REQUESTS_COUNT;
|
||||||
|
import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
|
||||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
|
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
|
||||||
import static org.hamcrest.Matchers.allOf;
|
import static org.hamcrest.Matchers.allOf;
|
||||||
|
@ -317,7 +319,7 @@ public class S3BlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTes
|
||||||
assertThat(initialStats.keySet(), equalTo(allOperations));
|
assertThat(initialStats.keySet(), equalTo(allOperations));
|
||||||
|
|
||||||
// Collect more stats with an operation purpose other than the default
|
// Collect more stats with an operation purpose other than the default
|
||||||
final OperationPurpose purpose = randomValueOtherThan(OperationPurpose.SNAPSHOT, () -> randomFrom(OperationPurpose.values()));
|
final OperationPurpose purpose = randomValueOtherThan(OperationPurpose.SNAPSHOT, BlobStoreTestUtil::randomPurpose);
|
||||||
final BlobPath blobPath = repository.basePath().add(randomAlphaOfLength(10));
|
final BlobPath blobPath = repository.basePath().add(randomAlphaOfLength(10));
|
||||||
final BlobContainer blobContainer = blobStore.blobContainer(blobPath);
|
final BlobContainer blobContainer = blobStore.blobContainer(blobPath);
|
||||||
final BytesArray whatToWrite = new BytesArray(randomByteArrayOfLength(randomIntBetween(100, 1000)));
|
final BytesArray whatToWrite = new BytesArray(randomByteArrayOfLength(randomIntBetween(100, 1000)));
|
||||||
|
@ -394,7 +396,7 @@ public class S3BlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTes
|
||||||
() -> repository.blobStore()
|
() -> repository.blobStore()
|
||||||
.blobContainer(repository.basePath())
|
.blobContainer(repository.basePath())
|
||||||
.writeBlobAtomic(
|
.writeBlobAtomic(
|
||||||
OperationPurpose.SNAPSHOT,
|
randomPurpose(),
|
||||||
BlobStoreRepository.INDEX_FILE_PREFIX + modifiedRepositoryData.getGenId(),
|
BlobStoreRepository.INDEX_FILE_PREFIX + modifiedRepositoryData.getGenId(),
|
||||||
serialized,
|
serialized,
|
||||||
true
|
true
|
||||||
|
|
|
@ -17,7 +17,6 @@ import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
|
||||||
import org.elasticsearch.action.support.PlainActionFuture;
|
import org.elasticsearch.action.support.PlainActionFuture;
|
||||||
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||||
import org.elasticsearch.cluster.service.ClusterService;
|
import org.elasticsearch.cluster.service.ClusterService;
|
||||||
import org.elasticsearch.common.blobstore.OperationPurpose;
|
|
||||||
import org.elasticsearch.common.blobstore.OptionalBytesReference;
|
import org.elasticsearch.common.blobstore.OptionalBytesReference;
|
||||||
import org.elasticsearch.common.bytes.BytesArray;
|
import org.elasticsearch.common.bytes.BytesArray;
|
||||||
import org.elasticsearch.common.bytes.BytesReference;
|
import org.elasticsearch.common.bytes.BytesReference;
|
||||||
|
@ -46,6 +45,7 @@ import java.util.List;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
|
import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
|
||||||
import static org.hamcrest.Matchers.allOf;
|
import static org.hamcrest.Matchers.allOf;
|
||||||
import static org.hamcrest.Matchers.blankOrNullString;
|
import static org.hamcrest.Matchers.blankOrNullString;
|
||||||
import static org.hamcrest.Matchers.equalTo;
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
|
@ -161,7 +161,7 @@ public class S3RepositoryThirdPartyTests extends AbstractThirdPartyRepositoryTes
|
||||||
class TestHarness {
|
class TestHarness {
|
||||||
boolean tryCompareAndSet(BytesReference expected, BytesReference updated) {
|
boolean tryCompareAndSet(BytesReference expected, BytesReference updated) {
|
||||||
return PlainActionFuture.<Boolean, RuntimeException>get(
|
return PlainActionFuture.<Boolean, RuntimeException>get(
|
||||||
future -> blobContainer.compareAndSetRegister(OperationPurpose.SNAPSHOT, "key", expected, updated, future),
|
future -> blobContainer.compareAndSetRegister(randomPurpose(), "key", expected, updated, future),
|
||||||
10,
|
10,
|
||||||
TimeUnit.SECONDS
|
TimeUnit.SECONDS
|
||||||
);
|
);
|
||||||
|
@ -169,11 +169,7 @@ public class S3RepositoryThirdPartyTests extends AbstractThirdPartyRepositoryTes
|
||||||
|
|
||||||
BytesReference readRegister() {
|
BytesReference readRegister() {
|
||||||
return PlainActionFuture.get(
|
return PlainActionFuture.get(
|
||||||
future -> blobContainer.getRegister(
|
future -> blobContainer.getRegister(randomPurpose(), "key", future.map(OptionalBytesReference::bytesReference)),
|
||||||
OperationPurpose.SNAPSHOT,
|
|
||||||
"key",
|
|
||||||
future.map(OptionalBytesReference::bytesReference)
|
|
||||||
),
|
|
||||||
10,
|
10,
|
||||||
TimeUnit.SECONDS
|
TimeUnit.SECONDS
|
||||||
);
|
);
|
||||||
|
@ -220,7 +216,7 @@ public class S3RepositoryThirdPartyTests extends AbstractThirdPartyRepositoryTes
|
||||||
assertThat(testHarness.listMultipartUploads(), hasSize(0));
|
assertThat(testHarness.listMultipartUploads(), hasSize(0));
|
||||||
assertEquals(bytes2, testHarness.readRegister());
|
assertEquals(bytes2, testHarness.readRegister());
|
||||||
} finally {
|
} finally {
|
||||||
blobContainer.delete(OperationPurpose.SNAPSHOT);
|
blobContainer.delete(randomPurpose());
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
ThreadPool.terminate(threadpool, 10, TimeUnit.SECONDS);
|
ThreadPool.terminate(threadpool, 10, TimeUnit.SECONDS);
|
||||||
|
|
|
@ -55,6 +55,7 @@ import java.util.Objects;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
|
import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
|
||||||
import static org.elasticsearch.repositories.s3.S3ClientSettings.DISABLE_CHUNKED_ENCODING;
|
import static org.elasticsearch.repositories.s3.S3ClientSettings.DISABLE_CHUNKED_ENCODING;
|
||||||
import static org.elasticsearch.repositories.s3.S3ClientSettings.ENDPOINT_SETTING;
|
import static org.elasticsearch.repositories.s3.S3ClientSettings.ENDPOINT_SETTING;
|
||||||
import static org.elasticsearch.repositories.s3.S3ClientSettings.MAX_RETRIES_SETTING;
|
import static org.elasticsearch.repositories.s3.S3ClientSettings.MAX_RETRIES_SETTING;
|
||||||
|
@ -216,7 +217,7 @@ public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTes
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
try (InputStream stream = new InputStreamIndexInput(new ByteArrayIndexInput("desc", bytes), bytes.length)) {
|
try (InputStream stream = new InputStreamIndexInput(new ByteArrayIndexInput("desc", bytes), bytes.length)) {
|
||||||
blobContainer.writeBlob(OperationPurpose.SNAPSHOT, "write_blob_max_retries", stream, bytes.length, false);
|
blobContainer.writeBlob(randomPurpose(), "write_blob_max_retries", stream, bytes.length, false);
|
||||||
}
|
}
|
||||||
assertThat(countDown.isCountedDown(), is(true));
|
assertThat(countDown.isCountedDown(), is(true));
|
||||||
}
|
}
|
||||||
|
@ -239,7 +240,7 @@ public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTes
|
||||||
|
|
||||||
Exception exception = expectThrows(IOException.class, () -> {
|
Exception exception = expectThrows(IOException.class, () -> {
|
||||||
try (InputStream stream = new InputStreamIndexInput(new ByteArrayIndexInput("desc", bytes), bytes.length)) {
|
try (InputStream stream = new InputStreamIndexInput(new ByteArrayIndexInput("desc", bytes), bytes.length)) {
|
||||||
blobContainer.writeBlob(OperationPurpose.SNAPSHOT, "write_blob_timeout", stream, bytes.length, false);
|
blobContainer.writeBlob(randomPurpose(), "write_blob_timeout", stream, bytes.length, false);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
assertThat(
|
assertThat(
|
||||||
|
@ -345,7 +346,7 @@ public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTes
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
blobContainer.writeBlob(OperationPurpose.SNAPSHOT, "write_large_blob", new ZeroInputStream(blobSize), blobSize, false);
|
blobContainer.writeBlob(randomPurpose(), "write_large_blob", new ZeroInputStream(blobSize), blobSize, false);
|
||||||
|
|
||||||
assertThat(countDownInitiate.isCountedDown(), is(true));
|
assertThat(countDownInitiate.isCountedDown(), is(true));
|
||||||
assertThat(countDownUploads.get(), equalTo(0));
|
assertThat(countDownUploads.get(), equalTo(0));
|
||||||
|
@ -443,7 +444,7 @@ public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTes
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
blobContainer.writeMetadataBlob(OperationPurpose.SNAPSHOT, "write_large_blob_streaming", false, randomBoolean(), out -> {
|
blobContainer.writeMetadataBlob(randomPurpose(), "write_large_blob_streaming", false, randomBoolean(), out -> {
|
||||||
final byte[] buffer = new byte[16 * 1024];
|
final byte[] buffer = new byte[16 * 1024];
|
||||||
long outstanding = blobSize;
|
long outstanding = blobSize;
|
||||||
while (outstanding > 0) {
|
while (outstanding > 0) {
|
||||||
|
@ -518,7 +519,7 @@ public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTes
|
||||||
|
|
||||||
httpServer.createContext(downloadStorageEndpoint(blobContainer, "read_blob_max_retries"), new FlakyReadHandler());
|
httpServer.createContext(downloadStorageEndpoint(blobContainer, "read_blob_max_retries"), new FlakyReadHandler());
|
||||||
|
|
||||||
try (InputStream inputStream = blobContainer.readBlob(OperationPurpose.SNAPSHOT, "read_blob_max_retries")) {
|
try (InputStream inputStream = blobContainer.readBlob(randomPurpose(), "read_blob_max_retries")) {
|
||||||
final int readLimit;
|
final int readLimit;
|
||||||
final InputStream wrappedStream;
|
final InputStream wrappedStream;
|
||||||
if (randomBoolean()) {
|
if (randomBoolean()) {
|
||||||
|
|
|
@ -26,7 +26,6 @@ import com.amazonaws.services.s3.model.UploadPartResult;
|
||||||
|
|
||||||
import org.elasticsearch.common.blobstore.BlobPath;
|
import org.elasticsearch.common.blobstore.BlobPath;
|
||||||
import org.elasticsearch.common.blobstore.BlobStoreException;
|
import org.elasticsearch.common.blobstore.BlobStoreException;
|
||||||
import org.elasticsearch.common.blobstore.OperationPurpose;
|
|
||||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||||
import org.elasticsearch.core.Tuple;
|
import org.elasticsearch.core.Tuple;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
@ -40,6 +39,7 @@ import java.util.List;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import java.util.stream.IntStream;
|
import java.util.stream.IntStream;
|
||||||
|
|
||||||
|
import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
|
||||||
import static org.hamcrest.Matchers.equalTo;
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
import static org.hamcrest.Matchers.instanceOf;
|
import static org.hamcrest.Matchers.instanceOf;
|
||||||
import static org.mockito.ArgumentMatchers.any;
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
|
@ -59,7 +59,7 @@ public class S3BlobStoreContainerTests extends ESTestCase {
|
||||||
|
|
||||||
final IllegalArgumentException e = expectThrows(
|
final IllegalArgumentException e = expectThrows(
|
||||||
IllegalArgumentException.class,
|
IllegalArgumentException.class,
|
||||||
() -> blobContainer.executeSingleUpload(OperationPurpose.SNAPSHOT, blobStore, randomAlphaOfLengthBetween(1, 10), null, blobSize)
|
() -> blobContainer.executeSingleUpload(randomPurpose(), blobStore, randomAlphaOfLengthBetween(1, 10), null, blobSize)
|
||||||
);
|
);
|
||||||
assertEquals("Upload request size [" + blobSize + "] can't be larger than 5gb", e.getMessage());
|
assertEquals("Upload request size [" + blobSize + "] can't be larger than 5gb", e.getMessage());
|
||||||
}
|
}
|
||||||
|
@ -74,7 +74,7 @@ public class S3BlobStoreContainerTests extends ESTestCase {
|
||||||
final IllegalArgumentException e = expectThrows(
|
final IllegalArgumentException e = expectThrows(
|
||||||
IllegalArgumentException.class,
|
IllegalArgumentException.class,
|
||||||
() -> blobContainer.executeSingleUpload(
|
() -> blobContainer.executeSingleUpload(
|
||||||
OperationPurpose.SNAPSHOT,
|
randomPurpose(),
|
||||||
blobStore,
|
blobStore,
|
||||||
blobName,
|
blobName,
|
||||||
new ByteArrayInputStream(new byte[0]),
|
new ByteArrayInputStream(new byte[0]),
|
||||||
|
@ -121,7 +121,7 @@ public class S3BlobStoreContainerTests extends ESTestCase {
|
||||||
when(client.putObject(argumentCaptor.capture())).thenReturn(new PutObjectResult());
|
when(client.putObject(argumentCaptor.capture())).thenReturn(new PutObjectResult());
|
||||||
|
|
||||||
final ByteArrayInputStream inputStream = new ByteArrayInputStream(new byte[blobSize]);
|
final ByteArrayInputStream inputStream = new ByteArrayInputStream(new byte[blobSize]);
|
||||||
blobContainer.executeSingleUpload(OperationPurpose.SNAPSHOT, blobStore, blobName, inputStream, blobSize);
|
blobContainer.executeSingleUpload(randomPurpose(), blobStore, blobName, inputStream, blobSize);
|
||||||
|
|
||||||
final PutObjectRequest request = argumentCaptor.getValue();
|
final PutObjectRequest request = argumentCaptor.getValue();
|
||||||
assertEquals(bucketName, request.getBucketName());
|
assertEquals(bucketName, request.getBucketName());
|
||||||
|
@ -142,13 +142,7 @@ public class S3BlobStoreContainerTests extends ESTestCase {
|
||||||
|
|
||||||
final IllegalArgumentException e = expectThrows(
|
final IllegalArgumentException e = expectThrows(
|
||||||
IllegalArgumentException.class,
|
IllegalArgumentException.class,
|
||||||
() -> blobContainer.executeMultipartUpload(
|
() -> blobContainer.executeMultipartUpload(randomPurpose(), blobStore, randomAlphaOfLengthBetween(1, 10), null, blobSize)
|
||||||
OperationPurpose.SNAPSHOT,
|
|
||||||
blobStore,
|
|
||||||
randomAlphaOfLengthBetween(1, 10),
|
|
||||||
null,
|
|
||||||
blobSize
|
|
||||||
)
|
|
||||||
);
|
);
|
||||||
assertEquals("Multipart upload request size [" + blobSize + "] can't be larger than 5tb", e.getMessage());
|
assertEquals("Multipart upload request size [" + blobSize + "] can't be larger than 5tb", e.getMessage());
|
||||||
}
|
}
|
||||||
|
@ -160,13 +154,7 @@ public class S3BlobStoreContainerTests extends ESTestCase {
|
||||||
|
|
||||||
final IllegalArgumentException e = expectThrows(
|
final IllegalArgumentException e = expectThrows(
|
||||||
IllegalArgumentException.class,
|
IllegalArgumentException.class,
|
||||||
() -> blobContainer.executeMultipartUpload(
|
() -> blobContainer.executeMultipartUpload(randomPurpose(), blobStore, randomAlphaOfLengthBetween(1, 10), null, blobSize)
|
||||||
OperationPurpose.SNAPSHOT,
|
|
||||||
blobStore,
|
|
||||||
randomAlphaOfLengthBetween(1, 10),
|
|
||||||
null,
|
|
||||||
blobSize
|
|
||||||
)
|
|
||||||
);
|
);
|
||||||
assertEquals("Multipart upload request size [" + blobSize + "] can't be smaller than 5mb", e.getMessage());
|
assertEquals("Multipart upload request size [" + blobSize + "] can't be smaller than 5mb", e.getMessage());
|
||||||
}
|
}
|
||||||
|
@ -230,7 +218,7 @@ public class S3BlobStoreContainerTests extends ESTestCase {
|
||||||
|
|
||||||
final ByteArrayInputStream inputStream = new ByteArrayInputStream(new byte[0]);
|
final ByteArrayInputStream inputStream = new ByteArrayInputStream(new byte[0]);
|
||||||
final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore);
|
final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore);
|
||||||
blobContainer.executeMultipartUpload(OperationPurpose.SNAPSHOT, blobStore, blobName, inputStream, blobSize);
|
blobContainer.executeMultipartUpload(randomPurpose(), blobStore, blobName, inputStream, blobSize);
|
||||||
|
|
||||||
final InitiateMultipartUploadRequest initRequest = initArgCaptor.getValue();
|
final InitiateMultipartUploadRequest initRequest = initArgCaptor.getValue();
|
||||||
assertEquals(bucketName, initRequest.getBucketName());
|
assertEquals(bucketName, initRequest.getBucketName());
|
||||||
|
@ -336,13 +324,7 @@ public class S3BlobStoreContainerTests extends ESTestCase {
|
||||||
|
|
||||||
final IOException e = expectThrows(IOException.class, () -> {
|
final IOException e = expectThrows(IOException.class, () -> {
|
||||||
final S3BlobContainer blobContainer = new S3BlobContainer(BlobPath.EMPTY, blobStore);
|
final S3BlobContainer blobContainer = new S3BlobContainer(BlobPath.EMPTY, blobStore);
|
||||||
blobContainer.executeMultipartUpload(
|
blobContainer.executeMultipartUpload(randomPurpose(), blobStore, blobName, new ByteArrayInputStream(new byte[0]), blobSize);
|
||||||
OperationPurpose.SNAPSHOT,
|
|
||||||
blobStore,
|
|
||||||
blobName,
|
|
||||||
new ByteArrayInputStream(new byte[0]),
|
|
||||||
blobSize
|
|
||||||
);
|
|
||||||
});
|
});
|
||||||
|
|
||||||
assertEquals("Unable to upload object [" + blobName + "] using multipart upload", e.getMessage());
|
assertEquals("Unable to upload object [" + blobName + "] using multipart upload", e.getMessage());
|
||||||
|
|
|
@ -14,7 +14,6 @@ import com.amazonaws.services.s3.model.S3Object;
|
||||||
import com.amazonaws.services.s3.model.S3ObjectInputStream;
|
import com.amazonaws.services.s3.model.S3ObjectInputStream;
|
||||||
|
|
||||||
import org.apache.http.client.methods.HttpGet;
|
import org.apache.http.client.methods.HttpGet;
|
||||||
import org.elasticsearch.common.blobstore.OperationPurpose;
|
|
||||||
import org.elasticsearch.common.io.Streams;
|
import org.elasticsearch.common.io.Streams;
|
||||||
import org.elasticsearch.core.Nullable;
|
import org.elasticsearch.core.Nullable;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
@ -23,6 +22,7 @@ import java.io.ByteArrayInputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
|
||||||
|
import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
|
||||||
import static org.hamcrest.Matchers.is;
|
import static org.hamcrest.Matchers.is;
|
||||||
import static org.mockito.ArgumentMatchers.any;
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
|
@ -94,11 +94,11 @@ public class S3RetryingInputStreamTests extends ESTestCase {
|
||||||
if (position != null && length != null) {
|
if (position != null && length != null) {
|
||||||
s3Object.getObjectMetadata().setContentLength(length);
|
s3Object.getObjectMetadata().setContentLength(length);
|
||||||
s3Object.setObjectContent(new S3ObjectInputStream(new ByteArrayInputStream(data, position, length), new HttpGet()));
|
s3Object.setObjectContent(new S3ObjectInputStream(new ByteArrayInputStream(data, position, length), new HttpGet()));
|
||||||
return new S3RetryingInputStream(OperationPurpose.SNAPSHOT, blobStore, "_blob", position, Math.addExact(position, length - 1));
|
return new S3RetryingInputStream(randomPurpose(), blobStore, "_blob", position, Math.addExact(position, length - 1));
|
||||||
} else {
|
} else {
|
||||||
s3Object.getObjectMetadata().setContentLength(data.length);
|
s3Object.getObjectMetadata().setContentLength(data.length);
|
||||||
s3Object.setObjectContent(new S3ObjectInputStream(new ByteArrayInputStream(data), new HttpGet()));
|
s3Object.setObjectContent(new S3ObjectInputStream(new ByteArrayInputStream(data), new HttpGet()));
|
||||||
return new S3RetryingInputStream(OperationPurpose.SNAPSHOT, blobStore, "_blob");
|
return new S3RetryingInputStream(randomPurpose(), blobStore, "_blob");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,7 +10,6 @@ package org.elasticsearch.common.blobstore.url;
|
||||||
|
|
||||||
import org.elasticsearch.common.UUIDs;
|
import org.elasticsearch.common.UUIDs;
|
||||||
import org.elasticsearch.common.blobstore.BlobContainer;
|
import org.elasticsearch.common.blobstore.BlobContainer;
|
||||||
import org.elasticsearch.common.blobstore.OperationPurpose;
|
|
||||||
import org.elasticsearch.common.bytes.BytesArray;
|
import org.elasticsearch.common.bytes.BytesArray;
|
||||||
import org.elasticsearch.common.bytes.BytesReference;
|
import org.elasticsearch.common.bytes.BytesReference;
|
||||||
import org.elasticsearch.common.io.Streams;
|
import org.elasticsearch.common.io.Streams;
|
||||||
|
@ -21,6 +20,7 @@ import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.nio.file.NoSuchFileException;
|
import java.nio.file.NoSuchFileException;
|
||||||
|
|
||||||
|
import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
|
||||||
import static org.hamcrest.core.IsEqual.equalTo;
|
import static org.hamcrest.core.IsEqual.equalTo;
|
||||||
|
|
||||||
public abstract class AbstractURLBlobStoreTests extends ESTestCase {
|
public abstract class AbstractURLBlobStoreTests extends ESTestCase {
|
||||||
|
@ -34,7 +34,7 @@ public abstract class AbstractURLBlobStoreTests extends ESTestCase {
|
||||||
BytesArray data = getOriginalData();
|
BytesArray data = getOriginalData();
|
||||||
String blobName = getBlobName();
|
String blobName = getBlobName();
|
||||||
BlobContainer container = getBlobContainer();
|
BlobContainer container = getBlobContainer();
|
||||||
try (InputStream stream = container.readBlob(OperationPurpose.SNAPSHOT, blobName)) {
|
try (InputStream stream = container.readBlob(randomPurpose(), blobName)) {
|
||||||
BytesReference bytesRead = Streams.readFully(stream);
|
BytesReference bytesRead = Streams.readFully(stream);
|
||||||
assertThat(data, equalTo(bytesRead));
|
assertThat(data, equalTo(bytesRead));
|
||||||
}
|
}
|
||||||
|
@ -46,7 +46,7 @@ public abstract class AbstractURLBlobStoreTests extends ESTestCase {
|
||||||
BlobContainer container = getBlobContainer();
|
BlobContainer container = getBlobContainer();
|
||||||
int position = randomIntBetween(0, data.length() - 1);
|
int position = randomIntBetween(0, data.length() - 1);
|
||||||
int length = randomIntBetween(1, data.length() - position);
|
int length = randomIntBetween(1, data.length() - position);
|
||||||
try (InputStream stream = container.readBlob(OperationPurpose.SNAPSHOT, blobName, position, length)) {
|
try (InputStream stream = container.readBlob(randomPurpose(), blobName, position, length)) {
|
||||||
BytesReference bytesRead = Streams.readFully(stream);
|
BytesReference bytesRead = Streams.readFully(stream);
|
||||||
assertThat(data.slice(position, length), equalTo(bytesRead));
|
assertThat(data.slice(position, length), equalTo(bytesRead));
|
||||||
}
|
}
|
||||||
|
@ -55,7 +55,7 @@ public abstract class AbstractURLBlobStoreTests extends ESTestCase {
|
||||||
public void testNoBlobFound() throws IOException {
|
public void testNoBlobFound() throws IOException {
|
||||||
BlobContainer container = getBlobContainer();
|
BlobContainer container = getBlobContainer();
|
||||||
String incorrectBlobName = UUIDs.base64UUID();
|
String incorrectBlobName = UUIDs.base64UUID();
|
||||||
try (InputStream ignored = container.readBlob(OperationPurpose.SNAPSHOT, incorrectBlobName)) {
|
try (InputStream ignored = container.readBlob(randomPurpose(), incorrectBlobName)) {
|
||||||
ignored.read();
|
ignored.read();
|
||||||
fail("Should have thrown NoSuchFileException exception");
|
fail("Should have thrown NoSuchFileException exception");
|
||||||
} catch (NoSuchFileException e) {
|
} catch (NoSuchFileException e) {
|
||||||
|
|
|
@ -10,7 +10,6 @@ package org.elasticsearch.common.blobstore.url;
|
||||||
|
|
||||||
import org.elasticsearch.common.blobstore.BlobContainer;
|
import org.elasticsearch.common.blobstore.BlobContainer;
|
||||||
import org.elasticsearch.common.blobstore.BlobPath;
|
import org.elasticsearch.common.blobstore.BlobPath;
|
||||||
import org.elasticsearch.common.blobstore.OperationPurpose;
|
|
||||||
import org.elasticsearch.common.blobstore.url.http.URLHttpClient;
|
import org.elasticsearch.common.blobstore.url.http.URLHttpClient;
|
||||||
import org.elasticsearch.common.blobstore.url.http.URLHttpClientSettings;
|
import org.elasticsearch.common.blobstore.url.http.URLHttpClientSettings;
|
||||||
import org.elasticsearch.common.bytes.BytesArray;
|
import org.elasticsearch.common.bytes.BytesArray;
|
||||||
|
@ -21,6 +20,7 @@ import java.io.IOException;
|
||||||
import java.nio.file.Files;
|
import java.nio.file.Files;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
|
|
||||||
|
import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
|
|
||||||
public class FileURLBlobStoreTests extends AbstractURLBlobStoreTests {
|
public class FileURLBlobStoreTests extends AbstractURLBlobStoreTests {
|
||||||
|
@ -60,6 +60,6 @@ public class FileURLBlobStoreTests extends AbstractURLBlobStoreTests {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void testURLBlobStoreCanReadBlobRange() throws IOException {
|
public void testURLBlobStoreCanReadBlobRange() throws IOException {
|
||||||
expectThrows(UnsupportedOperationException.class, () -> getBlobContainer().readBlob(OperationPurpose.SNAPSHOT, "test", 0, 12));
|
expectThrows(UnsupportedOperationException.class, () -> getBlobContainer().readBlob(randomPurpose(), "test", 0, 12));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,7 +13,6 @@ import com.sun.net.httpserver.HttpServer;
|
||||||
|
|
||||||
import org.elasticsearch.common.blobstore.BlobContainer;
|
import org.elasticsearch.common.blobstore.BlobContainer;
|
||||||
import org.elasticsearch.common.blobstore.BlobPath;
|
import org.elasticsearch.common.blobstore.BlobPath;
|
||||||
import org.elasticsearch.common.blobstore.OperationPurpose;
|
|
||||||
import org.elasticsearch.common.blobstore.url.http.URLHttpClient;
|
import org.elasticsearch.common.blobstore.url.http.URLHttpClient;
|
||||||
import org.elasticsearch.common.blobstore.url.http.URLHttpClientSettings;
|
import org.elasticsearch.common.blobstore.url.http.URLHttpClientSettings;
|
||||||
import org.elasticsearch.common.bytes.BytesArray;
|
import org.elasticsearch.common.bytes.BytesArray;
|
||||||
|
@ -36,6 +35,8 @@ import java.net.URL;
|
||||||
import java.util.regex.Matcher;
|
import java.util.regex.Matcher;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
|
import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
|
||||||
|
|
||||||
@SuppressForbidden(reason = "use http server")
|
@SuppressForbidden(reason = "use http server")
|
||||||
public class HttpURLBlobStoreTests extends AbstractURLBlobStoreTests {
|
public class HttpURLBlobStoreTests extends AbstractURLBlobStoreTests {
|
||||||
private static final Pattern RANGE_PATTERN = Pattern.compile("bytes=(\\d+)-(\\d+)$");
|
private static final Pattern RANGE_PATTERN = Pattern.compile("bytes=(\\d+)-(\\d+)$");
|
||||||
|
@ -127,14 +128,8 @@ public class HttpURLBlobStoreTests extends AbstractURLBlobStoreTests {
|
||||||
|
|
||||||
public void testRangeReadOutsideOfLegalRange() {
|
public void testRangeReadOutsideOfLegalRange() {
|
||||||
BlobContainer container = getBlobContainer();
|
BlobContainer container = getBlobContainer();
|
||||||
expectThrows(
|
expectThrows(IllegalArgumentException.class, () -> container.readBlob(randomPurpose(), blobName, -1, content.length).read());
|
||||||
IllegalArgumentException.class,
|
expectThrows(IOException.class, () -> container.readBlob(randomPurpose(), blobName, content.length + 1, content.length).read());
|
||||||
() -> container.readBlob(OperationPurpose.SNAPSHOT, blobName, -1, content.length).read()
|
|
||||||
);
|
|
||||||
expectThrows(
|
|
||||||
IOException.class,
|
|
||||||
() -> container.readBlob(OperationPurpose.SNAPSHOT, blobName, content.length + 1, content.length).read()
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private String getEndpointForServer() {
|
private String getEndpointForServer() {
|
||||||
|
|
|
@ -20,7 +20,6 @@ import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.util.Progressable;
|
import org.apache.hadoop.util.Progressable;
|
||||||
import org.elasticsearch.common.blobstore.BlobContainer;
|
import org.elasticsearch.common.blobstore.BlobContainer;
|
||||||
import org.elasticsearch.common.blobstore.BlobPath;
|
import org.elasticsearch.common.blobstore.BlobPath;
|
||||||
import org.elasticsearch.common.blobstore.OperationPurpose;
|
|
||||||
import org.elasticsearch.common.bytes.BytesArray;
|
import org.elasticsearch.common.bytes.BytesArray;
|
||||||
import org.elasticsearch.core.Streams;
|
import org.elasticsearch.core.Streams;
|
||||||
import org.elasticsearch.core.SuppressForbidden;
|
import org.elasticsearch.core.SuppressForbidden;
|
||||||
|
@ -44,6 +43,7 @@ import java.util.Collections;
|
||||||
|
|
||||||
import javax.security.auth.Subject;
|
import javax.security.auth.Subject;
|
||||||
|
|
||||||
|
import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
|
||||||
import static org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase.randomBytes;
|
import static org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase.randomBytes;
|
||||||
import static org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase.readBlobFully;
|
import static org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase.readBlobFully;
|
||||||
import static org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase.writeBlob;
|
import static org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase.writeBlob;
|
||||||
|
@ -131,7 +131,7 @@ public class HdfsBlobStoreContainerTests extends ESTestCase {
|
||||||
byte[] data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16)));
|
byte[] data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16)));
|
||||||
writeBlob(container, "foo", new BytesArray(data), randomBoolean());
|
writeBlob(container, "foo", new BytesArray(data), randomBoolean());
|
||||||
assertArrayEquals(readBlobFully(container, "foo", data.length), data);
|
assertArrayEquals(readBlobFully(container, "foo", data.length), data);
|
||||||
assertTrue(container.blobExists(OperationPurpose.SNAPSHOT, "foo"));
|
assertTrue(container.blobExists(randomPurpose(), "foo"));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testReadRange() throws Exception {
|
public void testReadRange() throws Exception {
|
||||||
|
@ -162,7 +162,7 @@ public class HdfsBlobStoreContainerTests extends ESTestCase {
|
||||||
int pos = randomIntBetween(0, data.length / 2);
|
int pos = randomIntBetween(0, data.length / 2);
|
||||||
int len = randomIntBetween(pos, data.length) - pos;
|
int len = randomIntBetween(pos, data.length) - pos;
|
||||||
assertArrayEquals(readBlobPartially(container, "foo", pos, len), Arrays.copyOfRange(data, pos, pos + len));
|
assertArrayEquals(readBlobPartially(container, "foo", pos, len), Arrays.copyOfRange(data, pos, pos + len));
|
||||||
assertTrue(container.blobExists(OperationPurpose.SNAPSHOT, "foo"));
|
assertTrue(container.blobExists(randomPurpose(), "foo"));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testReplicationFactor() throws Exception {
|
public void testReplicationFactor() throws Exception {
|
||||||
|
@ -209,24 +209,24 @@ public class HdfsBlobStoreContainerTests extends ESTestCase {
|
||||||
byte[] data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16)));
|
byte[] data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16)));
|
||||||
writeBlob(container, "foo", new BytesArray(data), randomBoolean());
|
writeBlob(container, "foo", new BytesArray(data), randomBoolean());
|
||||||
assertArrayEquals(readBlobFully(container, "foo", data.length), data);
|
assertArrayEquals(readBlobFully(container, "foo", data.length), data);
|
||||||
assertTrue(container.blobExists(OperationPurpose.SNAPSHOT, "foo"));
|
assertTrue(container.blobExists(randomPurpose(), "foo"));
|
||||||
writeBlob(container, "bar", new BytesArray(data), randomBoolean());
|
writeBlob(container, "bar", new BytesArray(data), randomBoolean());
|
||||||
assertArrayEquals(readBlobFully(container, "bar", data.length), data);
|
assertArrayEquals(readBlobFully(container, "bar", data.length), data);
|
||||||
assertTrue(container.blobExists(OperationPurpose.SNAPSHOT, "bar"));
|
assertTrue(container.blobExists(randomPurpose(), "bar"));
|
||||||
|
|
||||||
assertEquals(2, container.listBlobsByPrefix(OperationPurpose.SNAPSHOT, null).size());
|
assertEquals(2, container.listBlobsByPrefix(randomPurpose(), null).size());
|
||||||
assertEquals(1, container.listBlobsByPrefix(OperationPurpose.SNAPSHOT, "fo").size());
|
assertEquals(1, container.listBlobsByPrefix(randomPurpose(), "fo").size());
|
||||||
assertEquals(0, container.listBlobsByPrefix(OperationPurpose.SNAPSHOT, "noSuchFile").size());
|
assertEquals(0, container.listBlobsByPrefix(randomPurpose(), "noSuchFile").size());
|
||||||
|
|
||||||
container.delete(OperationPurpose.SNAPSHOT);
|
container.delete(randomPurpose());
|
||||||
assertEquals(0, container.listBlobsByPrefix(OperationPurpose.SNAPSHOT, null).size());
|
assertEquals(0, container.listBlobsByPrefix(randomPurpose(), null).size());
|
||||||
assertEquals(0, container.listBlobsByPrefix(OperationPurpose.SNAPSHOT, "fo").size());
|
assertEquals(0, container.listBlobsByPrefix(randomPurpose(), "fo").size());
|
||||||
assertEquals(0, container.listBlobsByPrefix(OperationPurpose.SNAPSHOT, "noSuchFile").size());
|
assertEquals(0, container.listBlobsByPrefix(randomPurpose(), "noSuchFile").size());
|
||||||
}
|
}
|
||||||
|
|
||||||
public static byte[] readBlobPartially(BlobContainer container, String name, int pos, int length) throws IOException {
|
public static byte[] readBlobPartially(BlobContainer container, String name, int pos, int length) throws IOException {
|
||||||
byte[] data = new byte[length];
|
byte[] data = new byte[length];
|
||||||
try (InputStream inputStream = container.readBlob(OperationPurpose.SNAPSHOT, name, pos, length)) {
|
try (InputStream inputStream = container.readBlob(randomPurpose(), name, pos, length)) {
|
||||||
assertThat(Streams.readFully(inputStream, data), CoreMatchers.equalTo(length));
|
assertThat(Streams.readFully(inputStream, data), CoreMatchers.equalTo(length));
|
||||||
assertThat(inputStream.read(), CoreMatchers.equalTo(-1));
|
assertThat(inputStream.read(), CoreMatchers.equalTo(-1));
|
||||||
}
|
}
|
||||||
|
|
|
@ -14,7 +14,6 @@ import org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupReposi
|
||||||
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
|
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
|
||||||
import org.elasticsearch.action.support.PlainActionFuture;
|
import org.elasticsearch.action.support.PlainActionFuture;
|
||||||
import org.elasticsearch.cluster.RepositoryCleanupInProgress;
|
import org.elasticsearch.cluster.RepositoryCleanupInProgress;
|
||||||
import org.elasticsearch.common.blobstore.OperationPurpose;
|
|
||||||
import org.elasticsearch.common.bytes.BytesArray;
|
import org.elasticsearch.common.bytes.BytesArray;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase;
|
import org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase;
|
||||||
|
@ -24,6 +23,7 @@ import org.elasticsearch.test.ESIntegTestCase;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
|
|
||||||
|
import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
|
||||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFutureThrows;
|
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFutureThrows;
|
||||||
import static org.hamcrest.Matchers.containsString;
|
import static org.hamcrest.Matchers.containsString;
|
||||||
import static org.hamcrest.Matchers.instanceOf;
|
import static org.hamcrest.Matchers.instanceOf;
|
||||||
|
@ -98,7 +98,7 @@ public class BlobStoreRepositoryCleanupIT extends AbstractSnapshotIntegTestCase
|
||||||
garbageFuture,
|
garbageFuture,
|
||||||
() -> repository.blobStore()
|
() -> repository.blobStore()
|
||||||
.blobContainer(repository.basePath())
|
.blobContainer(repository.basePath())
|
||||||
.writeBlob(OperationPurpose.SNAPSHOT, "snap-foo.dat", new BytesArray(new byte[1]), true)
|
.writeBlob(randomPurpose(), "snap-foo.dat", new BytesArray(new byte[1]), true)
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
garbageFuture.get();
|
garbageFuture.get();
|
||||||
|
@ -147,7 +147,7 @@ public class BlobStoreRepositoryCleanupIT extends AbstractSnapshotIntegTestCase
|
||||||
() -> repository.blobStore()
|
() -> repository.blobStore()
|
||||||
.blobContainer(repository.basePath())
|
.blobContainer(repository.basePath())
|
||||||
.writeBlob(
|
.writeBlob(
|
||||||
OperationPurpose.SNAPSHOT,
|
randomPurpose(),
|
||||||
BlobStoreRepository.INDEX_FILE_PREFIX + generation,
|
BlobStoreRepository.INDEX_FILE_PREFIX + generation,
|
||||||
new BytesArray(new byte[1]),
|
new BytesArray(new byte[1]),
|
||||||
true
|
true
|
||||||
|
|
|
@ -13,7 +13,6 @@ import org.apache.lucene.tests.util.LuceneTestCase;
|
||||||
import org.elasticsearch.action.ActionListener;
|
import org.elasticsearch.action.ActionListener;
|
||||||
import org.elasticsearch.action.support.PlainActionFuture;
|
import org.elasticsearch.action.support.PlainActionFuture;
|
||||||
import org.elasticsearch.common.blobstore.BlobPath;
|
import org.elasticsearch.common.blobstore.BlobPath;
|
||||||
import org.elasticsearch.common.blobstore.OperationPurpose;
|
|
||||||
import org.elasticsearch.common.blobstore.OptionalBytesReference;
|
import org.elasticsearch.common.blobstore.OptionalBytesReference;
|
||||||
import org.elasticsearch.common.bytes.BytesArray;
|
import org.elasticsearch.common.bytes.BytesArray;
|
||||||
import org.elasticsearch.common.bytes.BytesReference;
|
import org.elasticsearch.common.bytes.BytesReference;
|
||||||
|
@ -47,6 +46,7 @@ import java.util.concurrent.atomic.AtomicLong;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
|
|
||||||
|
import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
|
||||||
import static org.hamcrest.Matchers.containsString;
|
import static org.hamcrest.Matchers.containsString;
|
||||||
import static org.hamcrest.Matchers.equalTo;
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
import static org.hamcrest.Matchers.is;
|
import static org.hamcrest.Matchers.is;
|
||||||
|
@ -87,7 +87,7 @@ public class FsBlobContainerTests extends ESTestCase {
|
||||||
final long start = randomLongBetween(0L, Math.max(0L, blobData.length - 1));
|
final long start = randomLongBetween(0L, Math.max(0L, blobData.length - 1));
|
||||||
final long length = randomLongBetween(1L, blobData.length - start);
|
final long length = randomLongBetween(1L, blobData.length - start);
|
||||||
|
|
||||||
try (InputStream stream = container.readBlob(OperationPurpose.SNAPSHOT, blobName, start, length)) {
|
try (InputStream stream = container.readBlob(randomPurpose(), blobName, start, length)) {
|
||||||
assertThat(totalBytesRead.get(), equalTo(0L));
|
assertThat(totalBytesRead.get(), equalTo(0L));
|
||||||
assertThat(Streams.consumeFully(stream), equalTo(length));
|
assertThat(Streams.consumeFully(stream), equalTo(length));
|
||||||
assertThat(totalBytesRead.get(), equalTo(length));
|
assertThat(totalBytesRead.get(), equalTo(length));
|
||||||
|
@ -119,11 +119,11 @@ public class FsBlobContainerTests extends ESTestCase {
|
||||||
path
|
path
|
||||||
);
|
);
|
||||||
|
|
||||||
container.deleteBlobsIgnoringIfNotExists(OperationPurpose.SNAPSHOT, List.of(blobName).listIterator());
|
container.deleteBlobsIgnoringIfNotExists(randomPurpose(), List.of(blobName).listIterator());
|
||||||
// Should not throw exception
|
// Should not throw exception
|
||||||
container.deleteBlobsIgnoringIfNotExists(OperationPurpose.SNAPSHOT, List.of(blobName).listIterator());
|
container.deleteBlobsIgnoringIfNotExists(randomPurpose(), List.of(blobName).listIterator());
|
||||||
|
|
||||||
assertFalse(container.blobExists(OperationPurpose.SNAPSHOT, blobName));
|
assertFalse(container.blobExists(randomPurpose(), blobName));
|
||||||
}
|
}
|
||||||
|
|
||||||
private static BytesReference getBytesAsync(Consumer<ActionListener<OptionalBytesReference>> consumer) {
|
private static BytesReference getBytesAsync(Consumer<ActionListener<OptionalBytesReference>> consumer) {
|
||||||
|
@ -150,11 +150,11 @@ public class FsBlobContainerTests extends ESTestCase {
|
||||||
|
|
||||||
for (int i = 0; i < 5; i++) {
|
for (int i = 0; i < 5; i++) {
|
||||||
switch (between(1, 4)) {
|
switch (between(1, 4)) {
|
||||||
case 1 -> assertEquals(expectedValue.get(), getBytesAsync(l -> container.getRegister(OperationPurpose.SNAPSHOT, key, l)));
|
case 1 -> assertEquals(expectedValue.get(), getBytesAsync(l -> container.getRegister(randomPurpose(), key, l)));
|
||||||
case 2 -> assertFalse(
|
case 2 -> assertFalse(
|
||||||
getAsync(
|
getAsync(
|
||||||
l -> container.compareAndSetRegister(
|
l -> container.compareAndSetRegister(
|
||||||
OperationPurpose.SNAPSHOT,
|
randomPurpose(),
|
||||||
key,
|
key,
|
||||||
randomValueOtherThan(expectedValue.get(), () -> new BytesArray(randomByteArrayOfLength(8))),
|
randomValueOtherThan(expectedValue.get(), () -> new BytesArray(randomByteArrayOfLength(8))),
|
||||||
new BytesArray(randomByteArrayOfLength(8)),
|
new BytesArray(randomByteArrayOfLength(8)),
|
||||||
|
@ -166,7 +166,7 @@ public class FsBlobContainerTests extends ESTestCase {
|
||||||
expectedValue.get(),
|
expectedValue.get(),
|
||||||
getBytesAsync(
|
getBytesAsync(
|
||||||
l -> container.compareAndExchangeRegister(
|
l -> container.compareAndExchangeRegister(
|
||||||
OperationPurpose.SNAPSHOT,
|
randomPurpose(),
|
||||||
key,
|
key,
|
||||||
randomValueOtherThan(expectedValue.get(), () -> new BytesArray(randomByteArrayOfLength(8))),
|
randomValueOtherThan(expectedValue.get(), () -> new BytesArray(randomByteArrayOfLength(8))),
|
||||||
new BytesArray(randomByteArrayOfLength(8)),
|
new BytesArray(randomByteArrayOfLength(8)),
|
||||||
|
@ -181,26 +181,20 @@ public class FsBlobContainerTests extends ESTestCase {
|
||||||
|
|
||||||
final var newValue = new BytesArray(randomByteArrayOfLength(8));
|
final var newValue = new BytesArray(randomByteArrayOfLength(8));
|
||||||
if (randomBoolean()) {
|
if (randomBoolean()) {
|
||||||
assertTrue(
|
assertTrue(getAsync(l -> container.compareAndSetRegister(randomPurpose(), key, expectedValue.get(), newValue, l)));
|
||||||
getAsync(l -> container.compareAndSetRegister(OperationPurpose.SNAPSHOT, key, expectedValue.get(), newValue, l))
|
|
||||||
);
|
|
||||||
} else {
|
} else {
|
||||||
assertEquals(
|
assertEquals(
|
||||||
expectedValue.get(),
|
expectedValue.get(),
|
||||||
getBytesAsync(
|
getBytesAsync(l -> container.compareAndExchangeRegister(randomPurpose(), key, expectedValue.get(), newValue, l))
|
||||||
l -> container.compareAndExchangeRegister(OperationPurpose.SNAPSHOT, key, expectedValue.get(), newValue, l)
|
|
||||||
)
|
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
expectedValue.set(newValue);
|
expectedValue.set(newValue);
|
||||||
}
|
}
|
||||||
|
|
||||||
container.writeBlob(OperationPurpose.SNAPSHOT, key, new BytesArray(new byte[17]), false);
|
container.writeBlob(randomPurpose(), key, new BytesArray(new byte[17]), false);
|
||||||
expectThrows(
|
expectThrows(
|
||||||
IllegalStateException.class,
|
IllegalStateException.class,
|
||||||
() -> getBytesAsync(
|
() -> getBytesAsync(l -> container.compareAndExchangeRegister(randomPurpose(), key, expectedValue.get(), BytesArray.EMPTY, l))
|
||||||
l -> container.compareAndExchangeRegister(OperationPurpose.SNAPSHOT, key, expectedValue.get(), BytesArray.EMPTY, l)
|
|
||||||
)
|
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -234,25 +228,20 @@ public class FsBlobContainerTests extends ESTestCase {
|
||||||
BlobPath.EMPTY,
|
BlobPath.EMPTY,
|
||||||
path
|
path
|
||||||
);
|
);
|
||||||
container.writeBlobAtomic(
|
container.writeBlobAtomic(randomPurpose(), blobName, new BytesArray(randomByteArrayOfLength(randomIntBetween(1, 512))), true);
|
||||||
OperationPurpose.SNAPSHOT,
|
|
||||||
blobName,
|
|
||||||
new BytesArray(randomByteArrayOfLength(randomIntBetween(1, 512))),
|
|
||||||
true
|
|
||||||
);
|
|
||||||
final var blobData = new BytesArray(randomByteArrayOfLength(randomIntBetween(1, 512)));
|
final var blobData = new BytesArray(randomByteArrayOfLength(randomIntBetween(1, 512)));
|
||||||
container.writeBlobAtomic(OperationPurpose.SNAPSHOT, blobName, blobData, false);
|
container.writeBlobAtomic(randomPurpose(), blobName, blobData, false);
|
||||||
assertEquals(blobData, Streams.readFully(container.readBlob(OperationPurpose.SNAPSHOT, blobName)));
|
assertEquals(blobData, Streams.readFully(container.readBlob(randomPurpose(), blobName)));
|
||||||
expectThrows(
|
expectThrows(
|
||||||
FileAlreadyExistsException.class,
|
FileAlreadyExistsException.class,
|
||||||
() -> container.writeBlobAtomic(
|
() -> container.writeBlobAtomic(
|
||||||
OperationPurpose.SNAPSHOT,
|
randomPurpose(),
|
||||||
blobName,
|
blobName,
|
||||||
new BytesArray(randomByteArrayOfLength(randomIntBetween(1, 512))),
|
new BytesArray(randomByteArrayOfLength(randomIntBetween(1, 512))),
|
||||||
true
|
true
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
for (String blob : container.listBlobs(OperationPurpose.SNAPSHOT).keySet()) {
|
for (String blob : container.listBlobs(randomPurpose()).keySet()) {
|
||||||
assertFalse("unexpected temp blob [" + blob + "]", FsBlobContainer.isTempBlobName(blob));
|
assertFalse("unexpected temp blob [" + blob + "]", FsBlobContainer.isTempBlobName(blob));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,7 +20,6 @@ import org.elasticsearch.cluster.metadata.RepositoryMetadata;
|
||||||
import org.elasticsearch.cluster.service.ClusterService;
|
import org.elasticsearch.cluster.service.ClusterService;
|
||||||
import org.elasticsearch.common.Numbers;
|
import org.elasticsearch.common.Numbers;
|
||||||
import org.elasticsearch.common.UUIDs;
|
import org.elasticsearch.common.UUIDs;
|
||||||
import org.elasticsearch.common.blobstore.OperationPurpose;
|
|
||||||
import org.elasticsearch.common.bytes.BytesArray;
|
import org.elasticsearch.common.bytes.BytesArray;
|
||||||
import org.elasticsearch.common.settings.ClusterSettings;
|
import org.elasticsearch.common.settings.ClusterSettings;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
@ -68,6 +67,7 @@ import java.util.function.Function;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import static org.elasticsearch.repositories.RepositoryDataTests.generateRandomRepoData;
|
import static org.elasticsearch.repositories.RepositoryDataTests.generateRandomRepoData;
|
||||||
|
import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
|
||||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||||
import static org.hamcrest.Matchers.allOf;
|
import static org.hamcrest.Matchers.allOf;
|
||||||
import static org.hamcrest.Matchers.containsString;
|
import static org.hamcrest.Matchers.containsString;
|
||||||
|
@ -204,7 +204,7 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase {
|
||||||
|
|
||||||
for (int i = 0; i < 16; i++) {
|
for (int i = 0; i < 16; i++) {
|
||||||
repository.blobContainer()
|
repository.blobContainer()
|
||||||
.writeBlob(OperationPurpose.SNAPSHOT, BlobStoreRepository.INDEX_LATEST_BLOB, new BytesArray(buffer, 0, i), false);
|
.writeBlob(randomPurpose(), BlobStoreRepository.INDEX_LATEST_BLOB, new BytesArray(buffer, 0, i), false);
|
||||||
if (i == 8) {
|
if (i == 8) {
|
||||||
assertThat(repository.readSnapshotIndexLatestBlob(), equalTo(generation));
|
assertThat(repository.readSnapshotIndexLatestBlob(), equalTo(generation));
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -13,7 +13,6 @@ import org.elasticsearch.ElasticsearchParseException;
|
||||||
import org.elasticsearch.common.blobstore.BlobContainer;
|
import org.elasticsearch.common.blobstore.BlobContainer;
|
||||||
import org.elasticsearch.common.blobstore.BlobPath;
|
import org.elasticsearch.common.blobstore.BlobPath;
|
||||||
import org.elasticsearch.common.blobstore.BlobStore;
|
import org.elasticsearch.common.blobstore.BlobStore;
|
||||||
import org.elasticsearch.common.blobstore.OperationPurpose;
|
|
||||||
import org.elasticsearch.common.blobstore.fs.FsBlobStore;
|
import org.elasticsearch.common.blobstore.fs.FsBlobStore;
|
||||||
import org.elasticsearch.common.blobstore.support.BlobMetadata;
|
import org.elasticsearch.common.blobstore.support.BlobMetadata;
|
||||||
import org.elasticsearch.common.bytes.BytesArray;
|
import org.elasticsearch.common.bytes.BytesArray;
|
||||||
|
@ -32,6 +31,7 @@ import java.io.InputStream;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
|
||||||
|
import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
|
||||||
import static org.hamcrest.Matchers.greaterThan;
|
import static org.hamcrest.Matchers.greaterThan;
|
||||||
|
|
||||||
public class BlobStoreFormatTests extends ESTestCase {
|
public class BlobStoreFormatTests extends ESTestCase {
|
||||||
|
@ -114,7 +114,7 @@ public class BlobStoreFormatTests extends ESTestCase {
|
||||||
BlobObj blobObj = new BlobObj(veryRedundantText.toString());
|
BlobObj blobObj = new BlobObj(veryRedundantText.toString());
|
||||||
checksumFormat.write(blobObj, blobContainer, "blob-comp", true);
|
checksumFormat.write(blobObj, blobContainer, "blob-comp", true);
|
||||||
checksumFormat.write(blobObj, blobContainer, "blob-not-comp", false);
|
checksumFormat.write(blobObj, blobContainer, "blob-not-comp", false);
|
||||||
Map<String, BlobMetadata> blobs = blobContainer.listBlobsByPrefix(OperationPurpose.SNAPSHOT, "blob-");
|
Map<String, BlobMetadata> blobs = blobContainer.listBlobsByPrefix(randomPurpose(), "blob-");
|
||||||
assertEquals(blobs.size(), 2);
|
assertEquals(blobs.size(), 2);
|
||||||
assertThat(blobs.get("blob-not-comp").length(), greaterThan(blobs.get("blob-comp").length()));
|
assertThat(blobs.get("blob-not-comp").length(), greaterThan(blobs.get("blob-comp").length()));
|
||||||
}
|
}
|
||||||
|
@ -147,8 +147,8 @@ public class BlobStoreFormatTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void randomCorruption(BlobContainer blobContainer, String blobName) throws IOException {
|
protected void randomCorruption(BlobContainer blobContainer, String blobName) throws IOException {
|
||||||
final byte[] buffer = new byte[(int) blobContainer.listBlobsByPrefix(OperationPurpose.SNAPSHOT, blobName).get(blobName).length()];
|
final byte[] buffer = new byte[(int) blobContainer.listBlobsByPrefix(randomPurpose(), blobName).get(blobName).length()];
|
||||||
try (InputStream inputStream = blobContainer.readBlob(OperationPurpose.SNAPSHOT, blobName)) {
|
try (InputStream inputStream = blobContainer.readBlob(randomPurpose(), blobName)) {
|
||||||
Streams.readFully(inputStream, buffer);
|
Streams.readFully(inputStream, buffer);
|
||||||
}
|
}
|
||||||
final BytesArray corruptedBytes;
|
final BytesArray corruptedBytes;
|
||||||
|
@ -164,7 +164,7 @@ public class BlobStoreFormatTests extends ESTestCase {
|
||||||
// another sequence of 8 zero bytes anywhere in the file, let alone such a sequence followed by a correct checksum.
|
// another sequence of 8 zero bytes anywhere in the file, let alone such a sequence followed by a correct checksum.
|
||||||
corruptedBytes = new BytesArray(buffer, 0, location);
|
corruptedBytes = new BytesArray(buffer, 0, location);
|
||||||
}
|
}
|
||||||
blobContainer.writeBlob(OperationPurpose.SNAPSHOT, blobName, corruptedBytes, false);
|
blobContainer.writeBlob(randomPurpose(), blobName, corruptedBytes, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -14,7 +14,6 @@ import org.elasticsearch.action.support.PlainActionFuture;
|
||||||
import org.elasticsearch.common.Strings;
|
import org.elasticsearch.common.Strings;
|
||||||
import org.elasticsearch.common.blobstore.BlobPath;
|
import org.elasticsearch.common.blobstore.BlobPath;
|
||||||
import org.elasticsearch.common.blobstore.BlobStore;
|
import org.elasticsearch.common.blobstore.BlobStore;
|
||||||
import org.elasticsearch.common.blobstore.OperationPurpose;
|
|
||||||
import org.elasticsearch.common.blobstore.support.BlobMetadata;
|
import org.elasticsearch.common.blobstore.support.BlobMetadata;
|
||||||
import org.elasticsearch.common.bytes.BytesReference;
|
import org.elasticsearch.common.bytes.BytesReference;
|
||||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||||
|
@ -37,6 +36,7 @@ import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
|
|
||||||
|
import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
|
||||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||||
import static org.hamcrest.Matchers.contains;
|
import static org.hamcrest.Matchers.contains;
|
||||||
import static org.hamcrest.Matchers.containsInAnyOrder;
|
import static org.hamcrest.Matchers.containsInAnyOrder;
|
||||||
|
@ -75,9 +75,7 @@ public abstract class AbstractThirdPartyRepositoryTestCase extends ESSingleNodeT
|
||||||
private void deleteAndAssertEmpty(BlobPath path) {
|
private void deleteAndAssertEmpty(BlobPath path) {
|
||||||
final BlobStoreRepository repo = getRepository();
|
final BlobStoreRepository repo = getRepository();
|
||||||
final PlainActionFuture<Void> future = new PlainActionFuture<>();
|
final PlainActionFuture<Void> future = new PlainActionFuture<>();
|
||||||
repo.threadPool()
|
repo.threadPool().generic().execute(ActionRunnable.run(future, () -> repo.blobStore().blobContainer(path).delete(randomPurpose())));
|
||||||
.generic()
|
|
||||||
.execute(ActionRunnable.run(future, () -> repo.blobStore().blobContainer(path).delete(OperationPurpose.SNAPSHOT)));
|
|
||||||
future.actionGet();
|
future.actionGet();
|
||||||
final BlobPath parent = path.parent();
|
final BlobPath parent = path.parent();
|
||||||
if (parent == null) {
|
if (parent == null) {
|
||||||
|
@ -131,28 +129,16 @@ public abstract class AbstractThirdPartyRepositoryTestCase extends ESSingleNodeT
|
||||||
final BlobStore blobStore = repo.blobStore();
|
final BlobStore blobStore = repo.blobStore();
|
||||||
blobStore.blobContainer(repo.basePath().add("foo"))
|
blobStore.blobContainer(repo.basePath().add("foo"))
|
||||||
.writeBlob(
|
.writeBlob(
|
||||||
OperationPurpose.SNAPSHOT,
|
randomPurpose(),
|
||||||
"nested-blob",
|
"nested-blob",
|
||||||
new ByteArrayInputStream(randomByteArrayOfLength(testBlobLen)),
|
new ByteArrayInputStream(randomByteArrayOfLength(testBlobLen)),
|
||||||
testBlobLen,
|
testBlobLen,
|
||||||
false
|
false
|
||||||
);
|
);
|
||||||
blobStore.blobContainer(repo.basePath().add("foo").add("nested"))
|
blobStore.blobContainer(repo.basePath().add("foo").add("nested"))
|
||||||
.writeBlob(
|
.writeBlob(randomPurpose(), "bar", new ByteArrayInputStream(randomByteArrayOfLength(testBlobLen)), testBlobLen, false);
|
||||||
OperationPurpose.SNAPSHOT,
|
|
||||||
"bar",
|
|
||||||
new ByteArrayInputStream(randomByteArrayOfLength(testBlobLen)),
|
|
||||||
testBlobLen,
|
|
||||||
false
|
|
||||||
);
|
|
||||||
blobStore.blobContainer(repo.basePath().add("foo").add("nested2"))
|
blobStore.blobContainer(repo.basePath().add("foo").add("nested2"))
|
||||||
.writeBlob(
|
.writeBlob(randomPurpose(), "blub", new ByteArrayInputStream(randomByteArrayOfLength(testBlobLen)), testBlobLen, false);
|
||||||
OperationPurpose.SNAPSHOT,
|
|
||||||
"blub",
|
|
||||||
new ByteArrayInputStream(randomByteArrayOfLength(testBlobLen)),
|
|
||||||
testBlobLen,
|
|
||||||
false
|
|
||||||
);
|
|
||||||
}));
|
}));
|
||||||
future.actionGet();
|
future.actionGet();
|
||||||
assertChildren(repo.basePath(), Collections.singleton("foo"));
|
assertChildren(repo.basePath(), Collections.singleton("foo"));
|
||||||
|
@ -265,7 +251,7 @@ public abstract class AbstractThirdPartyRepositoryTestCase extends ESSingleNodeT
|
||||||
repository.blobStore()
|
repository.blobStore()
|
||||||
.blobContainer(repository.basePath())
|
.blobContainer(repository.basePath())
|
||||||
.readBlob(
|
.readBlob(
|
||||||
OperationPurpose.SNAPSHOT,
|
randomPurpose(),
|
||||||
// Deliberately not using BlobStoreRepository#INDEX_LATEST_BLOB here, it's important for external systems that a
|
// Deliberately not using BlobStoreRepository#INDEX_LATEST_BLOB here, it's important for external systems that a
|
||||||
// blob with literally this name is updated on each write:
|
// blob with literally this name is updated on each write:
|
||||||
"index.latest"
|
"index.latest"
|
||||||
|
@ -286,10 +272,10 @@ public abstract class AbstractThirdPartyRepositoryTestCase extends ESSingleNodeT
|
||||||
genericExec.execute(ActionRunnable.run(future, () -> {
|
genericExec.execute(ActionRunnable.run(future, () -> {
|
||||||
final BlobStore blobStore = repo.blobStore();
|
final BlobStore blobStore = repo.blobStore();
|
||||||
blobStore.blobContainer(repo.basePath().add("indices").add("foo"))
|
blobStore.blobContainer(repo.basePath().add("indices").add("foo"))
|
||||||
.writeBlob(OperationPurpose.SNAPSHOT, "bar", new ByteArrayInputStream(new byte[3]), 3, false);
|
.writeBlob(randomPurpose(), "bar", new ByteArrayInputStream(new byte[3]), 3, false);
|
||||||
for (String prefix : Arrays.asList("snap-", "meta-")) {
|
for (String prefix : Arrays.asList("snap-", "meta-")) {
|
||||||
blobStore.blobContainer(repo.basePath())
|
blobStore.blobContainer(repo.basePath())
|
||||||
.writeBlob(OperationPurpose.SNAPSHOT, prefix + "foo.dat", new ByteArrayInputStream(new byte[3]), 3, false);
|
.writeBlob(randomPurpose(), prefix + "foo.dat", new ByteArrayInputStream(new byte[3]), 3, false);
|
||||||
}
|
}
|
||||||
}));
|
}));
|
||||||
future.get();
|
future.get();
|
||||||
|
@ -297,10 +283,10 @@ public abstract class AbstractThirdPartyRepositoryTestCase extends ESSingleNodeT
|
||||||
final PlainActionFuture<Boolean> corruptionFuture = new PlainActionFuture<>();
|
final PlainActionFuture<Boolean> corruptionFuture = new PlainActionFuture<>();
|
||||||
genericExec.execute(ActionRunnable.supply(corruptionFuture, () -> {
|
genericExec.execute(ActionRunnable.supply(corruptionFuture, () -> {
|
||||||
final BlobStore blobStore = repo.blobStore();
|
final BlobStore blobStore = repo.blobStore();
|
||||||
return blobStore.blobContainer(repo.basePath().add("indices")).children(OperationPurpose.SNAPSHOT).containsKey("foo")
|
return blobStore.blobContainer(repo.basePath().add("indices")).children(randomPurpose()).containsKey("foo")
|
||||||
&& blobStore.blobContainer(repo.basePath().add("indices").add("foo")).blobExists(OperationPurpose.SNAPSHOT, "bar")
|
&& blobStore.blobContainer(repo.basePath().add("indices").add("foo")).blobExists(randomPurpose(), "bar")
|
||||||
&& blobStore.blobContainer(repo.basePath()).blobExists(OperationPurpose.SNAPSHOT, "meta-foo.dat")
|
&& blobStore.blobContainer(repo.basePath()).blobExists(randomPurpose(), "meta-foo.dat")
|
||||||
&& blobStore.blobContainer(repo.basePath()).blobExists(OperationPurpose.SNAPSHOT, "snap-foo.dat");
|
&& blobStore.blobContainer(repo.basePath()).blobExists(randomPurpose(), "snap-foo.dat");
|
||||||
}));
|
}));
|
||||||
assertTrue(corruptionFuture.get());
|
assertTrue(corruptionFuture.get());
|
||||||
}
|
}
|
||||||
|
@ -320,9 +306,7 @@ public abstract class AbstractThirdPartyRepositoryTestCase extends ESSingleNodeT
|
||||||
final BlobStoreRepository repository = getRepository();
|
final BlobStoreRepository repository = getRepository();
|
||||||
repository.threadPool()
|
repository.threadPool()
|
||||||
.generic()
|
.generic()
|
||||||
.execute(
|
.execute(ActionRunnable.supply(future, () -> repository.blobStore().blobContainer(path).children(randomPurpose()).keySet()));
|
||||||
ActionRunnable.supply(future, () -> repository.blobStore().blobContainer(path).children(OperationPurpose.SNAPSHOT).keySet())
|
|
||||||
);
|
|
||||||
return future.actionGet();
|
return future.actionGet();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -14,7 +14,6 @@ import com.sun.net.httpserver.HttpServer;
|
||||||
import org.apache.http.ConnectionClosedException;
|
import org.apache.http.ConnectionClosedException;
|
||||||
import org.apache.http.HttpStatus;
|
import org.apache.http.HttpStatus;
|
||||||
import org.elasticsearch.common.blobstore.BlobContainer;
|
import org.elasticsearch.common.blobstore.BlobContainer;
|
||||||
import org.elasticsearch.common.blobstore.OperationPurpose;
|
|
||||||
import org.elasticsearch.common.bytes.BytesReference;
|
import org.elasticsearch.common.bytes.BytesReference;
|
||||||
import org.elasticsearch.common.io.Streams;
|
import org.elasticsearch.common.io.Streams;
|
||||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||||
|
@ -42,6 +41,7 @@ import java.util.concurrent.atomic.AtomicLong;
|
||||||
import java.util.regex.Matcher;
|
import java.util.regex.Matcher;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
|
import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
|
||||||
import static org.hamcrest.Matchers.containsString;
|
import static org.hamcrest.Matchers.containsString;
|
||||||
import static org.hamcrest.Matchers.either;
|
import static org.hamcrest.Matchers.either;
|
||||||
import static org.hamcrest.Matchers.equalTo;
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
|
@ -94,9 +94,9 @@ public abstract class AbstractBlobContainerRetriesTestCase extends ESTestCase {
|
||||||
final int length = randomIntBetween(1, Math.toIntExact(Math.min(Integer.MAX_VALUE, MAX_RANGE_VAL - position)));
|
final int length = randomIntBetween(1, Math.toIntExact(Math.min(Integer.MAX_VALUE, MAX_RANGE_VAL - position)));
|
||||||
final Exception exception = expectThrows(NoSuchFileException.class, () -> {
|
final Exception exception = expectThrows(NoSuchFileException.class, () -> {
|
||||||
if (randomBoolean()) {
|
if (randomBoolean()) {
|
||||||
Streams.readFully(blobContainer.readBlob(OperationPurpose.SNAPSHOT, "read_nonexistent_blob"));
|
Streams.readFully(blobContainer.readBlob(randomPurpose(), "read_nonexistent_blob"));
|
||||||
} else {
|
} else {
|
||||||
Streams.readFully(blobContainer.readBlob(OperationPurpose.SNAPSHOT, "read_nonexistent_blob", 0, 1));
|
Streams.readFully(blobContainer.readBlob(randomPurpose(), "read_nonexistent_blob", 0, 1));
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
final String fullBlobPath = blobContainer.path().buildAsString() + "read_nonexistent_blob";
|
final String fullBlobPath = blobContainer.path().buildAsString() + "read_nonexistent_blob";
|
||||||
|
@ -104,7 +104,7 @@ public abstract class AbstractBlobContainerRetriesTestCase extends ESTestCase {
|
||||||
assertThat(
|
assertThat(
|
||||||
expectThrows(
|
expectThrows(
|
||||||
NoSuchFileException.class,
|
NoSuchFileException.class,
|
||||||
() -> Streams.readFully(blobContainer.readBlob(OperationPurpose.SNAPSHOT, "read_nonexistent_blob", position, length))
|
() -> Streams.readFully(blobContainer.readBlob(randomPurpose(), "read_nonexistent_blob", position, length))
|
||||||
).getMessage().toLowerCase(Locale.ROOT),
|
).getMessage().toLowerCase(Locale.ROOT),
|
||||||
containsString("blob object [" + fullBlobPath + "] not found")
|
containsString("blob object [" + fullBlobPath + "] not found")
|
||||||
);
|
);
|
||||||
|
@ -146,7 +146,7 @@ public abstract class AbstractBlobContainerRetriesTestCase extends ESTestCase {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
try (InputStream inputStream = blobContainer.readBlob(OperationPurpose.SNAPSHOT, "read_blob_max_retries")) {
|
try (InputStream inputStream = blobContainer.readBlob(randomPurpose(), "read_blob_max_retries")) {
|
||||||
final int readLimit;
|
final int readLimit;
|
||||||
final InputStream wrappedStream;
|
final InputStream wrappedStream;
|
||||||
if (randomBoolean()) {
|
if (randomBoolean()) {
|
||||||
|
@ -212,7 +212,7 @@ public abstract class AbstractBlobContainerRetriesTestCase extends ESTestCase {
|
||||||
|
|
||||||
final int position = randomIntBetween(0, bytes.length - 1);
|
final int position = randomIntBetween(0, bytes.length - 1);
|
||||||
final int length = randomIntBetween(0, randomBoolean() ? bytes.length : Integer.MAX_VALUE);
|
final int length = randomIntBetween(0, randomBoolean() ? bytes.length : Integer.MAX_VALUE);
|
||||||
try (InputStream inputStream = blobContainer.readBlob(OperationPurpose.SNAPSHOT, "read_range_blob_max_retries", position, length)) {
|
try (InputStream inputStream = blobContainer.readBlob(randomPurpose(), "read_range_blob_max_retries", position, length)) {
|
||||||
final int readLimit;
|
final int readLimit;
|
||||||
final InputStream wrappedStream;
|
final InputStream wrappedStream;
|
||||||
if (randomBoolean()) {
|
if (randomBoolean()) {
|
||||||
|
@ -252,7 +252,7 @@ public abstract class AbstractBlobContainerRetriesTestCase extends ESTestCase {
|
||||||
|
|
||||||
Exception exception = expectThrows(
|
Exception exception = expectThrows(
|
||||||
unresponsiveExceptionType(),
|
unresponsiveExceptionType(),
|
||||||
() -> Streams.readFully(blobContainer.readBlob(OperationPurpose.SNAPSHOT, "read_blob_unresponsive"))
|
() -> Streams.readFully(blobContainer.readBlob(randomPurpose(), "read_blob_unresponsive"))
|
||||||
);
|
);
|
||||||
assertThat(exception.getMessage().toLowerCase(Locale.ROOT), containsString("read timed out"));
|
assertThat(exception.getMessage().toLowerCase(Locale.ROOT), containsString("read timed out"));
|
||||||
assertThat(exception.getCause(), instanceOf(SocketTimeoutException.class));
|
assertThat(exception.getCause(), instanceOf(SocketTimeoutException.class));
|
||||||
|
@ -269,8 +269,8 @@ public abstract class AbstractBlobContainerRetriesTestCase extends ESTestCase {
|
||||||
exception = expectThrows(Exception.class, () -> {
|
exception = expectThrows(Exception.class, () -> {
|
||||||
try (
|
try (
|
||||||
InputStream stream = randomBoolean()
|
InputStream stream = randomBoolean()
|
||||||
? blobContainer.readBlob(OperationPurpose.SNAPSHOT, "read_blob_incomplete")
|
? blobContainer.readBlob(randomPurpose(), "read_blob_incomplete")
|
||||||
: blobContainer.readBlob(OperationPurpose.SNAPSHOT, "read_blob_incomplete", position, length)
|
: blobContainer.readBlob(randomPurpose(), "read_blob_incomplete", position, length)
|
||||||
) {
|
) {
|
||||||
Streams.readFully(stream);
|
Streams.readFully(stream);
|
||||||
}
|
}
|
||||||
|
@ -298,9 +298,9 @@ public abstract class AbstractBlobContainerRetriesTestCase extends ESTestCase {
|
||||||
|
|
||||||
Exception exception = expectThrows(unresponsiveExceptionType(), () -> {
|
Exception exception = expectThrows(unresponsiveExceptionType(), () -> {
|
||||||
if (randomBoolean()) {
|
if (randomBoolean()) {
|
||||||
Streams.readFully(blobContainer.readBlob(OperationPurpose.SNAPSHOT, "read_blob_no_response"));
|
Streams.readFully(blobContainer.readBlob(randomPurpose(), "read_blob_no_response"));
|
||||||
} else {
|
} else {
|
||||||
Streams.readFully(blobContainer.readBlob(OperationPurpose.SNAPSHOT, "read_blob_no_response", 0, 1));
|
Streams.readFully(blobContainer.readBlob(randomPurpose(), "read_blob_no_response", 0, 1));
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
assertThat(
|
assertThat(
|
||||||
|
@ -323,8 +323,8 @@ public abstract class AbstractBlobContainerRetriesTestCase extends ESTestCase {
|
||||||
final Exception exception = expectThrows(Exception.class, () -> {
|
final Exception exception = expectThrows(Exception.class, () -> {
|
||||||
try (
|
try (
|
||||||
InputStream stream = randomBoolean()
|
InputStream stream = randomBoolean()
|
||||||
? blobContainer.readBlob(OperationPurpose.SNAPSHOT, "read_blob_incomplete", 0, 1)
|
? blobContainer.readBlob(randomPurpose(), "read_blob_incomplete", 0, 1)
|
||||||
: blobContainer.readBlob(OperationPurpose.SNAPSHOT, "read_blob_incomplete")
|
: blobContainer.readBlob(randomPurpose(), "read_blob_incomplete")
|
||||||
) {
|
) {
|
||||||
Streams.readFully(stream);
|
Streams.readFully(stream);
|
||||||
}
|
}
|
||||||
|
|
|
@ -65,6 +65,7 @@ import java.util.concurrent.atomic.AtomicReference;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import static org.apache.lucene.tests.util.LuceneTestCase.random;
|
import static org.apache.lucene.tests.util.LuceneTestCase.random;
|
||||||
|
import static org.elasticsearch.test.ESTestCase.randomFrom;
|
||||||
import static org.elasticsearch.test.ESTestCase.randomIntBetween;
|
import static org.elasticsearch.test.ESTestCase.randomIntBetween;
|
||||||
import static org.hamcrest.Matchers.containsInAnyOrder;
|
import static org.hamcrest.Matchers.containsInAnyOrder;
|
||||||
import static org.hamcrest.Matchers.empty;
|
import static org.hamcrest.Matchers.empty;
|
||||||
|
@ -104,7 +105,7 @@ public final class BlobStoreTestUtil {
|
||||||
try {
|
try {
|
||||||
final BlobContainer blobContainer = repository.blobContainer();
|
final BlobContainer blobContainer = repository.blobContainer();
|
||||||
final long latestGen;
|
final long latestGen;
|
||||||
try (DataInputStream inputStream = new DataInputStream(blobContainer.readBlob(OperationPurpose.SNAPSHOT, "index.latest"))) {
|
try (DataInputStream inputStream = new DataInputStream(blobContainer.readBlob(randomPurpose(), "index.latest"))) {
|
||||||
latestGen = inputStream.readLong();
|
latestGen = inputStream.readLong();
|
||||||
} catch (NoSuchFileException e) {
|
} catch (NoSuchFileException e) {
|
||||||
throw new AssertionError("Could not find index.latest blob for repo [" + repository + "]");
|
throw new AssertionError("Could not find index.latest blob for repo [" + repository + "]");
|
||||||
|
@ -112,7 +113,7 @@ public final class BlobStoreTestUtil {
|
||||||
assertIndexGenerations(blobContainer, latestGen);
|
assertIndexGenerations(blobContainer, latestGen);
|
||||||
final RepositoryData repositoryData;
|
final RepositoryData repositoryData;
|
||||||
try (
|
try (
|
||||||
InputStream blob = blobContainer.readBlob(OperationPurpose.SNAPSHOT, BlobStoreRepository.INDEX_FILE_PREFIX + latestGen);
|
InputStream blob = blobContainer.readBlob(randomPurpose(), BlobStoreRepository.INDEX_FILE_PREFIX + latestGen);
|
||||||
XContentParser parser = XContentType.JSON.xContent()
|
XContentParser parser = XContentType.JSON.xContent()
|
||||||
.createParser(XContentParserConfiguration.EMPTY.withDeprecationHandler(LoggingDeprecationHandler.INSTANCE), blob)
|
.createParser(XContentParserConfiguration.EMPTY.withDeprecationHandler(LoggingDeprecationHandler.INSTANCE), blob)
|
||||||
) {
|
) {
|
||||||
|
@ -153,7 +154,7 @@ public final class BlobStoreTestUtil {
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void assertIndexGenerations(BlobContainer repoRoot, long latestGen) throws IOException {
|
private static void assertIndexGenerations(BlobContainer repoRoot, long latestGen) throws IOException {
|
||||||
final long[] indexGenerations = repoRoot.listBlobsByPrefix(OperationPurpose.SNAPSHOT, BlobStoreRepository.INDEX_FILE_PREFIX)
|
final long[] indexGenerations = repoRoot.listBlobsByPrefix(randomPurpose(), BlobStoreRepository.INDEX_FILE_PREFIX)
|
||||||
.keySet()
|
.keySet()
|
||||||
.stream()
|
.stream()
|
||||||
.map(s -> s.replace(BlobStoreRepository.INDEX_FILE_PREFIX, ""))
|
.map(s -> s.replace(BlobStoreRepository.INDEX_FILE_PREFIX, ""))
|
||||||
|
@ -165,12 +166,12 @@ public final class BlobStoreTestUtil {
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void assertShardIndexGenerations(BlobContainer repoRoot, ShardGenerations shardGenerations) throws IOException {
|
private static void assertShardIndexGenerations(BlobContainer repoRoot, ShardGenerations shardGenerations) throws IOException {
|
||||||
final BlobContainer indicesContainer = repoRoot.children(OperationPurpose.SNAPSHOT).get("indices");
|
final BlobContainer indicesContainer = repoRoot.children(randomPurpose()).get("indices");
|
||||||
for (IndexId index : shardGenerations.indices()) {
|
for (IndexId index : shardGenerations.indices()) {
|
||||||
final List<ShardGeneration> gens = shardGenerations.getGens(index);
|
final List<ShardGeneration> gens = shardGenerations.getGens(index);
|
||||||
if (gens.isEmpty() == false) {
|
if (gens.isEmpty() == false) {
|
||||||
final BlobContainer indexContainer = indicesContainer.children(OperationPurpose.SNAPSHOT).get(index.getId());
|
final BlobContainer indexContainer = indicesContainer.children(randomPurpose()).get(index.getId());
|
||||||
final Map<String, BlobContainer> shardContainers = indexContainer.children(OperationPurpose.SNAPSHOT);
|
final Map<String, BlobContainer> shardContainers = indexContainer.children(randomPurpose());
|
||||||
for (int i = 0; i < gens.size(); i++) {
|
for (int i = 0; i < gens.size(); i++) {
|
||||||
final ShardGeneration generation = gens.get(i);
|
final ShardGeneration generation = gens.get(i);
|
||||||
assertThat(generation, not(ShardGenerations.DELETED_SHARD_GEN));
|
assertThat(generation, not(ShardGenerations.DELETED_SHARD_GEN));
|
||||||
|
@ -178,8 +179,7 @@ public final class BlobStoreTestUtil {
|
||||||
final String shardId = Integer.toString(i);
|
final String shardId = Integer.toString(i);
|
||||||
assertThat(shardContainers, hasKey(shardId));
|
assertThat(shardContainers, hasKey(shardId));
|
||||||
assertThat(
|
assertThat(
|
||||||
shardContainers.get(shardId)
|
shardContainers.get(shardId).listBlobsByPrefix(randomPurpose(), BlobStoreRepository.INDEX_FILE_PREFIX),
|
||||||
.listBlobsByPrefix(OperationPurpose.SNAPSHOT, BlobStoreRepository.INDEX_FILE_PREFIX),
|
|
||||||
hasKey(BlobStoreRepository.INDEX_FILE_PREFIX + generation)
|
hasKey(BlobStoreRepository.INDEX_FILE_PREFIX + generation)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
@ -190,13 +190,13 @@ public final class BlobStoreTestUtil {
|
||||||
|
|
||||||
private static void assertIndexUUIDs(BlobStoreRepository repository, RepositoryData repositoryData) throws IOException {
|
private static void assertIndexUUIDs(BlobStoreRepository repository, RepositoryData repositoryData) throws IOException {
|
||||||
final List<String> expectedIndexUUIDs = repositoryData.getIndices().values().stream().map(IndexId::getId).toList();
|
final List<String> expectedIndexUUIDs = repositoryData.getIndices().values().stream().map(IndexId::getId).toList();
|
||||||
final BlobContainer indicesContainer = repository.blobContainer().children(OperationPurpose.SNAPSHOT).get("indices");
|
final BlobContainer indicesContainer = repository.blobContainer().children(randomPurpose()).get("indices");
|
||||||
final List<String> foundIndexUUIDs;
|
final List<String> foundIndexUUIDs;
|
||||||
if (indicesContainer == null) {
|
if (indicesContainer == null) {
|
||||||
foundIndexUUIDs = Collections.emptyList();
|
foundIndexUUIDs = Collections.emptyList();
|
||||||
} else {
|
} else {
|
||||||
// Skip Lucene MockFS extraN directory
|
// Skip Lucene MockFS extraN directory
|
||||||
foundIndexUUIDs = indicesContainer.children(OperationPurpose.SNAPSHOT)
|
foundIndexUUIDs = indicesContainer.children(randomPurpose())
|
||||||
.keySet()
|
.keySet()
|
||||||
.stream()
|
.stream()
|
||||||
.filter(s -> s.startsWith("extra") == false)
|
.filter(s -> s.startsWith("extra") == false)
|
||||||
|
@ -204,9 +204,9 @@ public final class BlobStoreTestUtil {
|
||||||
}
|
}
|
||||||
assertThat(foundIndexUUIDs, containsInAnyOrder(expectedIndexUUIDs.toArray(Strings.EMPTY_ARRAY)));
|
assertThat(foundIndexUUIDs, containsInAnyOrder(expectedIndexUUIDs.toArray(Strings.EMPTY_ARRAY)));
|
||||||
for (String indexId : foundIndexUUIDs) {
|
for (String indexId : foundIndexUUIDs) {
|
||||||
final Set<String> indexMetaGenerationsFound = indicesContainer.children(OperationPurpose.SNAPSHOT)
|
final Set<String> indexMetaGenerationsFound = indicesContainer.children(randomPurpose())
|
||||||
.get(indexId)
|
.get(indexId)
|
||||||
.listBlobsByPrefix(OperationPurpose.SNAPSHOT, BlobStoreRepository.METADATA_PREFIX)
|
.listBlobsByPrefix(randomPurpose(), BlobStoreRepository.METADATA_PREFIX)
|
||||||
.keySet()
|
.keySet()
|
||||||
.stream()
|
.stream()
|
||||||
.map(p -> p.replace(BlobStoreRepository.METADATA_PREFIX, "").replace(".dat", ""))
|
.map(p -> p.replace(BlobStoreRepository.METADATA_PREFIX, "").replace(".dat", ""))
|
||||||
|
@ -231,7 +231,7 @@ public final class BlobStoreTestUtil {
|
||||||
final Collection<SnapshotId> snapshotIds = repositoryData.getSnapshotIds();
|
final Collection<SnapshotId> snapshotIds = repositoryData.getSnapshotIds();
|
||||||
final List<String> expectedSnapshotUUIDs = snapshotIds.stream().map(SnapshotId::getUUID).toList();
|
final List<String> expectedSnapshotUUIDs = snapshotIds.stream().map(SnapshotId::getUUID).toList();
|
||||||
for (String prefix : new String[] { BlobStoreRepository.SNAPSHOT_PREFIX, BlobStoreRepository.METADATA_PREFIX }) {
|
for (String prefix : new String[] { BlobStoreRepository.SNAPSHOT_PREFIX, BlobStoreRepository.METADATA_PREFIX }) {
|
||||||
final Collection<String> foundSnapshotUUIDs = repoRoot.listBlobs(OperationPurpose.SNAPSHOT)
|
final Collection<String> foundSnapshotUUIDs = repoRoot.listBlobs(randomPurpose())
|
||||||
.keySet()
|
.keySet()
|
||||||
.stream()
|
.stream()
|
||||||
.filter(p -> p.startsWith(prefix))
|
.filter(p -> p.startsWith(prefix))
|
||||||
|
@ -240,12 +240,12 @@ public final class BlobStoreTestUtil {
|
||||||
assertThat(foundSnapshotUUIDs, containsInAnyOrder(expectedSnapshotUUIDs.toArray(Strings.EMPTY_ARRAY)));
|
assertThat(foundSnapshotUUIDs, containsInAnyOrder(expectedSnapshotUUIDs.toArray(Strings.EMPTY_ARRAY)));
|
||||||
}
|
}
|
||||||
|
|
||||||
final BlobContainer indicesContainer = repository.getBlobContainer().children(OperationPurpose.SNAPSHOT).get("indices");
|
final BlobContainer indicesContainer = repository.getBlobContainer().children(randomPurpose()).get("indices");
|
||||||
final Map<String, BlobContainer> indices;
|
final Map<String, BlobContainer> indices;
|
||||||
if (indicesContainer == null) {
|
if (indicesContainer == null) {
|
||||||
indices = Collections.emptyMap();
|
indices = Collections.emptyMap();
|
||||||
} else {
|
} else {
|
||||||
indices = indicesContainer.children(OperationPurpose.SNAPSHOT);
|
indices = indicesContainer.children(randomPurpose());
|
||||||
}
|
}
|
||||||
if (snapshotIds.isEmpty()) {
|
if (snapshotIds.isEmpty()) {
|
||||||
listener.onResponse(null);
|
listener.onResponse(null);
|
||||||
|
@ -298,7 +298,7 @@ public final class BlobStoreTestUtil {
|
||||||
assertThat(indices, hasKey(indexId.getId()));
|
assertThat(indices, hasKey(indexId.getId()));
|
||||||
final BlobContainer indexContainer = indices.get(indexId.getId());
|
final BlobContainer indexContainer = indices.get(indexId.getId());
|
||||||
assertThat(
|
assertThat(
|
||||||
indexContainer.listBlobs(OperationPurpose.SNAPSHOT),
|
indexContainer.listBlobs(randomPurpose()),
|
||||||
hasKey(
|
hasKey(
|
||||||
String.format(
|
String.format(
|
||||||
Locale.ROOT,
|
Locale.ROOT,
|
||||||
|
@ -308,7 +308,7 @@ public final class BlobStoreTestUtil {
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
final IndexMetadata indexMetadata = repository.getSnapshotIndexMetaData(repositoryData, snapshotId, indexId);
|
final IndexMetadata indexMetadata = repository.getSnapshotIndexMetaData(repositoryData, snapshotId, indexId);
|
||||||
for (Map.Entry<String, BlobContainer> entry : indexContainer.children(OperationPurpose.SNAPSHOT).entrySet()) {
|
for (Map.Entry<String, BlobContainer> entry : indexContainer.children(randomPurpose()).entrySet()) {
|
||||||
// Skip Lucene MockFS extraN directory
|
// Skip Lucene MockFS extraN directory
|
||||||
if (entry.getKey().startsWith("extra")) {
|
if (entry.getKey().startsWith("extra")) {
|
||||||
continue;
|
continue;
|
||||||
|
@ -322,10 +322,7 @@ public final class BlobStoreTestUtil {
|
||||||
final BlobContainer shardContainer = entry.getValue();
|
final BlobContainer shardContainer = entry.getValue();
|
||||||
// TODO: we shouldn't be leaking empty shard directories when a shard (but not all of the index it belongs to)
|
// TODO: we shouldn't be leaking empty shard directories when a shard (but not all of the index it belongs to)
|
||||||
// becomes unreferenced. We should fix that and remove this conditional once its fixed.
|
// becomes unreferenced. We should fix that and remove this conditional once its fixed.
|
||||||
if (shardContainer.listBlobs(OperationPurpose.SNAPSHOT)
|
if (shardContainer.listBlobs(randomPurpose()).keySet().stream().anyMatch(blob -> blob.startsWith("extra") == false)) {
|
||||||
.keySet()
|
|
||||||
.stream()
|
|
||||||
.anyMatch(blob -> blob.startsWith("extra") == false)) {
|
|
||||||
final int impliedCount = shardId - 1;
|
final int impliedCount = shardId - 1;
|
||||||
maxShardCountsSeen.compute(
|
maxShardCountsSeen.compute(
|
||||||
indexId,
|
indexId,
|
||||||
|
@ -336,7 +333,7 @@ public final class BlobStoreTestUtil {
|
||||||
&& snapshotInfo.shardFailures()
|
&& snapshotInfo.shardFailures()
|
||||||
.stream()
|
.stream()
|
||||||
.noneMatch(shardFailure -> shardFailure.index().equals(index) && shardFailure.shardId() == shardId)) {
|
.noneMatch(shardFailure -> shardFailure.index().equals(index) && shardFailure.shardId() == shardId)) {
|
||||||
final Map<String, BlobMetadata> shardPathContents = shardContainer.listBlobs(OperationPurpose.SNAPSHOT);
|
final Map<String, BlobMetadata> shardPathContents = shardContainer.listBlobs(randomPurpose());
|
||||||
assertThat(
|
assertThat(
|
||||||
shardPathContents,
|
shardPathContents,
|
||||||
hasKey(String.format(Locale.ROOT, BlobStoreRepository.SNAPSHOT_NAME_FORMAT, snapshotId.getUUID()))
|
hasKey(String.format(Locale.ROOT, BlobStoreRepository.SNAPSHOT_NAME_FORMAT, snapshotId.getUUID()))
|
||||||
|
@ -376,10 +373,7 @@ public final class BlobStoreTestUtil {
|
||||||
repository.threadPool()
|
repository.threadPool()
|
||||||
.generic()
|
.generic()
|
||||||
.execute(
|
.execute(
|
||||||
ActionRunnable.supply(
|
ActionRunnable.supply(future, () -> repository.blobStore().blobContainer(path).listBlobsByPrefix(randomPurpose(), prefix))
|
||||||
future,
|
|
||||||
() -> repository.blobStore().blobContainer(path).listBlobsByPrefix(OperationPurpose.SNAPSHOT, prefix)
|
|
||||||
)
|
|
||||||
);
|
);
|
||||||
Map<String, BlobMetadata> foundBlobs = future.actionGet();
|
Map<String, BlobMetadata> foundBlobs = future.actionGet();
|
||||||
if (blobs.isEmpty()) {
|
if (blobs.isEmpty()) {
|
||||||
|
@ -464,4 +458,8 @@ public final class BlobStoreTestUtil {
|
||||||
when(clusterApplierService.threadPool()).thenReturn(threadPool);
|
when(clusterApplierService.threadPool()).thenReturn(threadPool);
|
||||||
return clusterService;
|
return clusterService;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static OperationPurpose randomPurpose() {
|
||||||
|
return randomFrom(OperationPurpose.values());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,7 +24,6 @@ import org.elasticsearch.common.UUIDs;
|
||||||
import org.elasticsearch.common.blobstore.BlobContainer;
|
import org.elasticsearch.common.blobstore.BlobContainer;
|
||||||
import org.elasticsearch.common.blobstore.BlobPath;
|
import org.elasticsearch.common.blobstore.BlobPath;
|
||||||
import org.elasticsearch.common.blobstore.BlobStore;
|
import org.elasticsearch.common.blobstore.BlobStore;
|
||||||
import org.elasticsearch.common.blobstore.OperationPurpose;
|
|
||||||
import org.elasticsearch.common.blobstore.support.BlobMetadata;
|
import org.elasticsearch.common.blobstore.support.BlobMetadata;
|
||||||
import org.elasticsearch.common.bytes.BytesArray;
|
import org.elasticsearch.common.bytes.BytesArray;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
@ -63,6 +62,7 @@ import java.util.stream.Stream;
|
||||||
import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.READONLY_SETTING_KEY;
|
import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.READONLY_SETTING_KEY;
|
||||||
import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.SNAPSHOT_INDEX_NAME_FORMAT;
|
import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.SNAPSHOT_INDEX_NAME_FORMAT;
|
||||||
import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.SNAPSHOT_NAME_FORMAT;
|
import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.SNAPSHOT_NAME_FORMAT;
|
||||||
|
import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
|
||||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
|
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
|
||||||
import static org.hamcrest.Matchers.equalTo;
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
|
@ -124,7 +124,7 @@ public abstract class ESBlobStoreRepositoryIntegTestCase extends ESIntegTestCase
|
||||||
try (BlobStore store = newBlobStore()) {
|
try (BlobStore store = newBlobStore()) {
|
||||||
final BlobContainer container = store.blobContainer(BlobPath.EMPTY);
|
final BlobContainer container = store.blobContainer(BlobPath.EMPTY);
|
||||||
expectThrows(NoSuchFileException.class, () -> {
|
expectThrows(NoSuchFileException.class, () -> {
|
||||||
try (InputStream is = container.readBlob(OperationPurpose.SNAPSHOT, "non-existing")) {
|
try (InputStream is = container.readBlob(randomPurpose(), "non-existing")) {
|
||||||
is.read();
|
is.read();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -141,7 +141,7 @@ public abstract class ESBlobStoreRepositoryIntegTestCase extends ESIntegTestCase
|
||||||
data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16)));
|
data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16)));
|
||||||
writeBlob(container, "foobar", new BytesArray(data), false);
|
writeBlob(container, "foobar", new BytesArray(data), false);
|
||||||
}
|
}
|
||||||
try (InputStream stream = container.readBlob(OperationPurpose.SNAPSHOT, "foobar")) {
|
try (InputStream stream = container.readBlob(randomPurpose(), "foobar")) {
|
||||||
BytesRefBuilder target = new BytesRefBuilder();
|
BytesRefBuilder target = new BytesRefBuilder();
|
||||||
while (target.length() < data.length) {
|
while (target.length() < data.length) {
|
||||||
byte[] buffer = new byte[scaledRandomIntBetween(1, data.length - target.length())];
|
byte[] buffer = new byte[scaledRandomIntBetween(1, data.length - target.length())];
|
||||||
|
@ -156,14 +156,14 @@ public abstract class ESBlobStoreRepositoryIntegTestCase extends ESIntegTestCase
|
||||||
assertEquals(data.length, target.length());
|
assertEquals(data.length, target.length());
|
||||||
assertArrayEquals(data, Arrays.copyOfRange(target.bytes(), 0, target.length()));
|
assertArrayEquals(data, Arrays.copyOfRange(target.bytes(), 0, target.length()));
|
||||||
}
|
}
|
||||||
container.delete(OperationPurpose.SNAPSHOT);
|
container.delete(randomPurpose());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testList() throws IOException {
|
public void testList() throws IOException {
|
||||||
try (BlobStore store = newBlobStore()) {
|
try (BlobStore store = newBlobStore()) {
|
||||||
final BlobContainer container = store.blobContainer(BlobPath.EMPTY);
|
final BlobContainer container = store.blobContainer(BlobPath.EMPTY);
|
||||||
assertThat(container.listBlobs(OperationPurpose.SNAPSHOT).size(), CoreMatchers.equalTo(0));
|
assertThat(container.listBlobs(randomPurpose()).size(), CoreMatchers.equalTo(0));
|
||||||
int numberOfFooBlobs = randomIntBetween(0, 10);
|
int numberOfFooBlobs = randomIntBetween(0, 10);
|
||||||
int numberOfBarBlobs = randomIntBetween(3, 20);
|
int numberOfBarBlobs = randomIntBetween(3, 20);
|
||||||
Map<String, Long> generatedBlobs = new HashMap<>();
|
Map<String, Long> generatedBlobs = new HashMap<>();
|
||||||
|
@ -184,7 +184,7 @@ public abstract class ESBlobStoreRepositoryIntegTestCase extends ESIntegTestCase
|
||||||
generatedBlobs.put(name, (long) length);
|
generatedBlobs.put(name, (long) length);
|
||||||
writeRandomBlob(container, name, length);
|
writeRandomBlob(container, name, length);
|
||||||
|
|
||||||
Map<String, BlobMetadata> blobs = container.listBlobs(OperationPurpose.SNAPSHOT);
|
Map<String, BlobMetadata> blobs = container.listBlobs(randomPurpose());
|
||||||
assertThat(blobs.size(), CoreMatchers.equalTo(numberOfFooBlobs + numberOfBarBlobs));
|
assertThat(blobs.size(), CoreMatchers.equalTo(numberOfFooBlobs + numberOfBarBlobs));
|
||||||
for (Map.Entry<String, Long> generated : generatedBlobs.entrySet()) {
|
for (Map.Entry<String, Long> generated : generatedBlobs.entrySet()) {
|
||||||
BlobMetadata blobMetadata = blobs.get(generated.getKey());
|
BlobMetadata blobMetadata = blobs.get(generated.getKey());
|
||||||
|
@ -193,10 +193,10 @@ public abstract class ESBlobStoreRepositoryIntegTestCase extends ESIntegTestCase
|
||||||
assertThat(blobMetadata.length(), CoreMatchers.equalTo(blobLengthFromContentLength(generated.getValue())));
|
assertThat(blobMetadata.length(), CoreMatchers.equalTo(blobLengthFromContentLength(generated.getValue())));
|
||||||
}
|
}
|
||||||
|
|
||||||
assertThat(container.listBlobsByPrefix(OperationPurpose.SNAPSHOT, "foo-").size(), CoreMatchers.equalTo(numberOfFooBlobs));
|
assertThat(container.listBlobsByPrefix(randomPurpose(), "foo-").size(), CoreMatchers.equalTo(numberOfFooBlobs));
|
||||||
assertThat(container.listBlobsByPrefix(OperationPurpose.SNAPSHOT, "bar-").size(), CoreMatchers.equalTo(numberOfBarBlobs));
|
assertThat(container.listBlobsByPrefix(randomPurpose(), "bar-").size(), CoreMatchers.equalTo(numberOfBarBlobs));
|
||||||
assertThat(container.listBlobsByPrefix(OperationPurpose.SNAPSHOT, "baz-").size(), CoreMatchers.equalTo(0));
|
assertThat(container.listBlobsByPrefix(randomPurpose(), "baz-").size(), CoreMatchers.equalTo(0));
|
||||||
container.delete(OperationPurpose.SNAPSHOT);
|
container.delete(randomPurpose());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -204,17 +204,17 @@ public abstract class ESBlobStoreRepositoryIntegTestCase extends ESIntegTestCase
|
||||||
try (BlobStore store = newBlobStore()) {
|
try (BlobStore store = newBlobStore()) {
|
||||||
final List<String> blobNames = Arrays.asList("foobar", "barfoo");
|
final List<String> blobNames = Arrays.asList("foobar", "barfoo");
|
||||||
final BlobContainer container = store.blobContainer(BlobPath.EMPTY);
|
final BlobContainer container = store.blobContainer(BlobPath.EMPTY);
|
||||||
container.deleteBlobsIgnoringIfNotExists(OperationPurpose.SNAPSHOT, blobNames.iterator()); // does not raise when blobs
|
container.deleteBlobsIgnoringIfNotExists(randomPurpose(), blobNames.iterator()); // does not raise when blobs
|
||||||
// don't exist
|
// don't exist
|
||||||
byte[] data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16)));
|
byte[] data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16)));
|
||||||
final BytesArray bytesArray = new BytesArray(data);
|
final BytesArray bytesArray = new BytesArray(data);
|
||||||
for (String blobName : blobNames) {
|
for (String blobName : blobNames) {
|
||||||
writeBlob(container, blobName, bytesArray, randomBoolean());
|
writeBlob(container, blobName, bytesArray, randomBoolean());
|
||||||
}
|
}
|
||||||
assertEquals(container.listBlobs(OperationPurpose.SNAPSHOT).size(), 2);
|
assertEquals(container.listBlobs(randomPurpose()).size(), 2);
|
||||||
container.deleteBlobsIgnoringIfNotExists(OperationPurpose.SNAPSHOT, blobNames.iterator());
|
container.deleteBlobsIgnoringIfNotExists(randomPurpose(), blobNames.iterator());
|
||||||
assertTrue(container.listBlobs(OperationPurpose.SNAPSHOT).isEmpty());
|
assertTrue(container.listBlobs(randomPurpose()).isEmpty());
|
||||||
container.deleteBlobsIgnoringIfNotExists(OperationPurpose.SNAPSHOT, blobNames.iterator()); // does not raise when blobs
|
container.deleteBlobsIgnoringIfNotExists(randomPurpose(), blobNames.iterator()); // does not raise when blobs
|
||||||
// don't exist
|
// don't exist
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -226,9 +226,9 @@ public abstract class ESBlobStoreRepositoryIntegTestCase extends ESIntegTestCase
|
||||||
boolean failIfAlreadyExists
|
boolean failIfAlreadyExists
|
||||||
) throws IOException {
|
) throws IOException {
|
||||||
if (randomBoolean()) {
|
if (randomBoolean()) {
|
||||||
container.writeBlob(OperationPurpose.SNAPSHOT, blobName, bytesArray, failIfAlreadyExists);
|
container.writeBlob(randomPurpose(), blobName, bytesArray, failIfAlreadyExists);
|
||||||
} else {
|
} else {
|
||||||
container.writeBlobAtomic(OperationPurpose.SNAPSHOT, blobName, bytesArray, failIfAlreadyExists);
|
container.writeBlobAtomic(randomPurpose(), blobName, bytesArray, failIfAlreadyExists);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -244,10 +244,10 @@ public abstract class ESBlobStoreRepositoryIntegTestCase extends ESIntegTestCase
|
||||||
assertArrayEquals(readBlobFully(containerFoo, "test", data1.length), data1);
|
assertArrayEquals(readBlobFully(containerFoo, "test", data1.length), data1);
|
||||||
assertArrayEquals(readBlobFully(containerBar, "test", data2.length), data2);
|
assertArrayEquals(readBlobFully(containerBar, "test", data2.length), data2);
|
||||||
|
|
||||||
assertTrue(containerFoo.blobExists(OperationPurpose.SNAPSHOT, "test"));
|
assertTrue(containerFoo.blobExists(randomPurpose(), "test"));
|
||||||
assertTrue(containerBar.blobExists(OperationPurpose.SNAPSHOT, "test"));
|
assertTrue(containerBar.blobExists(randomPurpose(), "test"));
|
||||||
containerBar.delete(OperationPurpose.SNAPSHOT);
|
containerBar.delete(randomPurpose());
|
||||||
containerFoo.delete(OperationPurpose.SNAPSHOT);
|
containerFoo.delete(randomPurpose());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -259,7 +259,7 @@ public abstract class ESBlobStoreRepositoryIntegTestCase extends ESIntegTestCase
|
||||||
|
|
||||||
public static byte[] readBlobFully(BlobContainer container, String name, int length) throws IOException {
|
public static byte[] readBlobFully(BlobContainer container, String name, int length) throws IOException {
|
||||||
byte[] data = new byte[length];
|
byte[] data = new byte[length];
|
||||||
try (InputStream inputStream = container.readBlob(OperationPurpose.SNAPSHOT, name)) {
|
try (InputStream inputStream = container.readBlob(randomPurpose(), name)) {
|
||||||
assertThat(Streams.readFully(inputStream, data), CoreMatchers.equalTo(length));
|
assertThat(Streams.readFully(inputStream, data), CoreMatchers.equalTo(length));
|
||||||
assertThat(inputStream.read(), CoreMatchers.equalTo(-1));
|
assertThat(inputStream.read(), CoreMatchers.equalTo(-1));
|
||||||
}
|
}
|
||||||
|
@ -275,7 +275,7 @@ public abstract class ESBlobStoreRepositoryIntegTestCase extends ESIntegTestCase
|
||||||
}
|
}
|
||||||
|
|
||||||
protected static void writeBlob(BlobContainer container, String blobName, BytesArray bytesArray) throws IOException {
|
protected static void writeBlob(BlobContainer container, String blobName, BytesArray bytesArray) throws IOException {
|
||||||
container.writeBlob(OperationPurpose.SNAPSHOT, blobName, bytesArray, true);
|
container.writeBlob(randomPurpose(), blobName, bytesArray, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected BlobStore newBlobStore() {
|
protected BlobStore newBlobStore() {
|
||||||
|
@ -488,7 +488,7 @@ public abstract class ESBlobStoreRepositoryIntegTestCase extends ESIntegTestCase
|
||||||
|
|
||||||
for (IndexId indexId : repositoryData.actionGet().getIndices().values()) {
|
for (IndexId indexId : repositoryData.actionGet().getIndices().values()) {
|
||||||
if (indexId.getName().equals("test-idx-3")) {
|
if (indexId.getName().equals("test-idx-3")) {
|
||||||
assertFalse(indicesBlobContainer.get().blobExists(OperationPurpose.SNAPSHOT, indexId.getId())); // deleted index
|
assertFalse(indicesBlobContainer.get().blobExists(randomPurpose(), indexId.getId())); // deleted index
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -507,7 +507,7 @@ public abstract class ESBlobStoreRepositoryIntegTestCase extends ESIntegTestCase
|
||||||
for (int j = 0; j < numberOfBlobsPerContainer; j++) {
|
for (int j = 0; j < numberOfBlobsPerContainer; j++) {
|
||||||
byte[] bytes = randomBytes(randomInt(100));
|
byte[] bytes = randomBytes(randomInt(100));
|
||||||
String blobName = randomAlphaOfLength(10);
|
String blobName = randomAlphaOfLength(10);
|
||||||
container.writeBlob(OperationPurpose.SNAPSHOT, blobName, new BytesArray(bytes), false);
|
container.writeBlob(randomPurpose(), blobName, new BytesArray(bytes), false);
|
||||||
if (randomBoolean()) {
|
if (randomBoolean()) {
|
||||||
blobsToDelete.add(containerPath.buildAsString() + blobName);
|
blobsToDelete.add(containerPath.buildAsString() + blobName);
|
||||||
} else {
|
} else {
|
||||||
|
@ -516,14 +516,14 @@ public abstract class ESBlobStoreRepositoryIntegTestCase extends ESIntegTestCase
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
store.deleteBlobsIgnoringIfNotExists(OperationPurpose.SNAPSHOT, blobsToDelete.iterator());
|
store.deleteBlobsIgnoringIfNotExists(randomPurpose(), blobsToDelete.iterator());
|
||||||
for (var containerEntry : expectedBlobsPerContainer.entrySet()) {
|
for (var containerEntry : expectedBlobsPerContainer.entrySet()) {
|
||||||
BlobContainer blobContainer = store.blobContainer(containerEntry.getKey());
|
BlobContainer blobContainer = store.blobContainer(containerEntry.getKey());
|
||||||
Map<String, BlobMetadata> blobsInContainer = blobContainer.listBlobs(OperationPurpose.SNAPSHOT);
|
Map<String, BlobMetadata> blobsInContainer = blobContainer.listBlobs(randomPurpose());
|
||||||
for (String expectedBlob : containerEntry.getValue()) {
|
for (String expectedBlob : containerEntry.getValue()) {
|
||||||
assertThat(blobsInContainer, hasKey(expectedBlob));
|
assertThat(blobsInContainer, hasKey(expectedBlob));
|
||||||
}
|
}
|
||||||
blobContainer.delete(OperationPurpose.SNAPSHOT);
|
blobContainer.delete(randomPurpose());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -556,7 +556,7 @@ public abstract class ESBlobStoreRepositoryIntegTestCase extends ESIntegTestCase
|
||||||
|
|
||||||
// Create an extra dangling blob as if from an earlier snapshot that failed to clean up
|
// Create an extra dangling blob as if from an earlier snapshot that failed to clean up
|
||||||
shardContainer.writeBlob(
|
shardContainer.writeBlob(
|
||||||
OperationPurpose.SNAPSHOT,
|
randomPurpose(),
|
||||||
BlobStoreRepository.UPLOADED_DATA_BLOB_PREFIX + UUIDs.randomBase64UUID(random()),
|
BlobStoreRepository.UPLOADED_DATA_BLOB_PREFIX + UUIDs.randomBase64UUID(random()),
|
||||||
BytesArray.EMPTY,
|
BytesArray.EMPTY,
|
||||||
true
|
true
|
||||||
|
@ -580,7 +580,7 @@ public abstract class ESBlobStoreRepositoryIntegTestCase extends ESIntegTestCase
|
||||||
assertAcked(client.admin().cluster().prepareDeleteSnapshot(repoName, "snapshot-1"));
|
assertAcked(client.admin().cluster().prepareDeleteSnapshot(repoName, "snapshot-1"));
|
||||||
|
|
||||||
// Retrieve the blobs actually present
|
// Retrieve the blobs actually present
|
||||||
final var actualBlobs = shardContainer.listBlobs(OperationPurpose.SNAPSHOT)
|
final var actualBlobs = shardContainer.listBlobs(randomPurpose())
|
||||||
.keySet()
|
.keySet()
|
||||||
.stream()
|
.stream()
|
||||||
.filter(f -> ExtrasFS.isExtra(f) == false)
|
.filter(f -> ExtrasFS.isExtra(f) == false)
|
||||||
|
|
|
@ -11,7 +11,6 @@ import org.elasticsearch.ElasticsearchException;
|
||||||
import org.elasticsearch.common.blobstore.BlobContainer;
|
import org.elasticsearch.common.blobstore.BlobContainer;
|
||||||
import org.elasticsearch.common.blobstore.BlobPath;
|
import org.elasticsearch.common.blobstore.BlobPath;
|
||||||
import org.elasticsearch.common.blobstore.BlobStore;
|
import org.elasticsearch.common.blobstore.BlobStore;
|
||||||
import org.elasticsearch.common.blobstore.OperationPurpose;
|
|
||||||
import org.elasticsearch.common.bytes.BytesArray;
|
import org.elasticsearch.common.bytes.BytesArray;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.core.IOUtils;
|
import org.elasticsearch.core.IOUtils;
|
||||||
|
@ -24,6 +23,7 @@ import java.nio.file.Path;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.READONLY_SETTING_KEY;
|
import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.READONLY_SETTING_KEY;
|
||||||
|
import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
|
||||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
|
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
|
||||||
import static org.hamcrest.Matchers.instanceOf;
|
import static org.hamcrest.Matchers.instanceOf;
|
||||||
|
@ -114,7 +114,7 @@ public abstract class ESFsBasedRepositoryIntegTestCase extends ESBlobStoreReposi
|
||||||
byte[] data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16)));
|
byte[] data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16)));
|
||||||
writeBlob(container, "test", new BytesArray(data));
|
writeBlob(container, "test", new BytesArray(data));
|
||||||
assertArrayEquals(readBlobFully(container, "test", data.length), data);
|
assertArrayEquals(readBlobFully(container, "test", data.length), data);
|
||||||
assertTrue(container.blobExists(OperationPurpose.SNAPSHOT, "test"));
|
assertTrue(container.blobExists(randomPurpose(), "test"));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue