Refactor GCS fixture multipart parser (#125828)

Mikhail Berezovskiy 2025-04-15 10:09:53 -07:00 committed by GitHub
parent 299bf443bb
commit 5a7a425bd0
5 changed files with 347 additions and 129 deletions

GoogleCloudStorageBlobContainerRetriesTests.java

@@ -10,6 +10,7 @@ package org.elasticsearch.repositories.gcs;
import fixture.gcs.FakeOAuth2HttpHandler;
import fixture.gcs.GoogleCloudStorageHttpHandler;
import fixture.gcs.MultipartUpload;
import com.google.api.client.http.HttpExecuteInterceptor;
import com.google.api.client.http.HttpRequestInitializer;
@@ -43,7 +44,6 @@ import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.http.ResponseInjectingHttpHandler;
import org.elasticsearch.repositories.blobstore.AbstractBlobContainerRetriesTestCase;
import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase;
@@ -62,7 +62,6 @@ import java.util.Iterator;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -70,7 +69,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static fixture.gcs.GoogleCloudStorageHttpHandler.parseMultipartRequestBody;
import static fixture.gcs.TestUtils.createServiceAccount;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
@@ -80,7 +78,6 @@ import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSetting
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.ENDPOINT_SETTING;
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.READ_TIMEOUT_SETTING;
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.TOKEN_URI_SETTING;
import static org.elasticsearch.test.hamcrest.OptionalMatchers.isPresent;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
@@ -268,17 +265,16 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends AbstractBlobContainerRetriesTestCase {
final CountDown countDown = new CountDown(maxRetries);
final BlobContainer blobContainer = blobContainerBuilder().maxRetries(maxRetries).build();
final byte[] bytes = randomBlobContent();
final byte[] bytes = randomBlobContent(0);
httpServer.createContext("/upload/storage/v1/b/bucket/o", safeHandler(exchange -> {
assertThat(exchange.getRequestURI().getQuery(), containsString("uploadType=multipart"));
if (countDown.countDown()) {
Optional<Tuple<String, BytesReference>> content = parseMultipartRequestBody(exchange.getRequestBody());
assertThat(content, isPresent());
assertThat(content.get().v1(), equalTo(blobContainer.path().buildAsString() + "write_blob_max_retries"));
if (Objects.deepEquals(bytes, BytesReference.toBytes(content.get().v2()))) {
MultipartUpload multipartUpload = MultipartUpload.parseBody(exchange, exchange.getRequestBody());
assertEquals(multipartUpload.name(), blobContainer.path().buildAsString() + "write_blob_max_retries");
if (multipartUpload.content().equals(new BytesArray(bytes))) {
byte[] response = Strings.format("""
{"bucket":"bucket","name":"%s"}
""", content.get().v1()).getBytes(UTF_8);
""", multipartUpload.name()).getBytes(UTF_8);
exchange.getResponseHeaders().add("Content-Type", "application/json");
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
exchange.getResponseBody().write(response);

GoogleCloudStorageHttpHandler.java

@@ -18,7 +18,6 @@ import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.RestUtils;
import org.elasticsearch.test.fixture.HttpHeaderParser;
@@ -27,26 +26,17 @@ import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory;
import org.elasticsearch.xcontent.XContentType;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URLDecoder;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.zip.GZIPInputStream;
import static fixture.gcs.MockGcsBlobStore.failAndThrow;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.stream.Collectors.joining;
import static org.elasticsearch.core.Strings.format;
/**
* Minimal HTTP handler that acts as a Google Cloud Storage compliant server
@@ -183,26 +173,18 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler {
exchange.getResponseBody().write(response);
} else if (Regex.simpleMatch("POST /upload/storage/v1/b/" + bucket + "/*uploadType=multipart*", request)) {
// Multipart upload
Optional<Tuple<String, BytesReference>> content = parseMultipartRequestBody(requestBody.streamInput());
if (content.isPresent()) {
try {
final var multipartUpload = MultipartUpload.parseBody(exchange, requestBody.streamInput());
final Long ifGenerationMatch = parseOptionalLongParameter(exchange, IF_GENERATION_MATCH);
final MockGcsBlobStore.BlobVersion newBlobVersion = mockGcsBlobStore.updateBlob(
content.get().v1(),
multipartUpload.name(),
ifGenerationMatch,
content.get().v2()
multipartUpload.content()
);
writeBlobVersionAsJson(exchange, newBlobVersion);
} else {
throw new AssertionError(
"Could not read multi-part request to ["
+ request
+ "] with headers ["
+ new HashMap<>(exchange.getRequestHeaders())
+ "]"
);
} catch (IllegalArgumentException e) {
throw new AssertionError(e);
}
} else if (Regex.simpleMatch("POST /upload/storage/v1/b/" + bucket + "/*uploadType=resumable*", request)) {
// Resumable upload initialization https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload
final Map<String, String> params = new HashMap<>();
@@ -328,81 +310,6 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler {
return "http://" + exchange.getRequestHeaders().get("HOST").get(0);
}
private static final Pattern NAME_PATTERN = Pattern.compile("\"name\":\"([^\"]*)\"");
public static Optional<Tuple<String, BytesReference>> parseMultipartRequestBody(final InputStream requestBody) throws IOException {
Tuple<String, BytesReference> content = null;
final BytesReference fullRequestBody;
try (InputStream in = new GZIPInputStream(requestBody)) {
fullRequestBody = Streams.readFully(in);
}
String name = null;
boolean skippedEmptyLine = false;
int startPos = 0;
int endPos = 0;
while (startPos < fullRequestBody.length()) {
do {
endPos = fullRequestBody.indexOf((byte) '\r', endPos + 1);
} while (endPos >= 0 && fullRequestBody.get(endPos + 1) != '\n');
boolean markAndContinue = false;
final String bucketPrefix = "{\"bucket\":";
if (startPos > 0) {
startPos += 2;
}
if (name == null || skippedEmptyLine == false) {
if ((skippedEmptyLine == false && endPos == startPos)
|| (fullRequestBody.get(startPos) == '-' && fullRequestBody.get(startPos + 1) == '-')) {
markAndContinue = true;
} else {
final String start = fullRequestBody.slice(startPos, Math.min(endPos - startPos, bucketPrefix.length())).utf8ToString();
if (start.toLowerCase(Locale.ROOT).startsWith("content")) {
markAndContinue = true;
} else if (start.startsWith(bucketPrefix)) {
markAndContinue = true;
final String line = fullRequestBody.slice(
startPos + bucketPrefix.length(),
endPos - startPos - bucketPrefix.length()
).utf8ToString();
Matcher matcher = NAME_PATTERN.matcher(line);
if (matcher.find()) {
name = matcher.group(1);
}
}
}
skippedEmptyLine = markAndContinue && endPos == startPos;
startPos = endPos;
} else {
while (isEndOfPart(fullRequestBody, endPos) == false) {
endPos = fullRequestBody.indexOf((byte) '\r', endPos + 1);
}
content = Tuple.tuple(name, fullRequestBody.slice(startPos, endPos - startPos));
break;
}
}
if (content == null) {
final InputStream stream = fullRequestBody.streamInput();
logger.warn(
() -> format(
"Failed to find multi-part upload in [%s]",
new BufferedReader(new InputStreamReader(stream)).lines().collect(joining("\n"))
)
);
}
return Optional.ofNullable(content);
}
private static final byte[] END_OF_PARTS_MARKER = "\r\n--__END_OF_PART__".getBytes(UTF_8);
private static boolean isEndOfPart(BytesReference fullRequestBody, int endPos) {
for (int i = 0; i < END_OF_PARTS_MARKER.length; i++) {
final byte b = END_OF_PARTS_MARKER[i];
if (fullRequestBody.get(endPos + i) != b) {
return false;
}
}
return true;
}
private static String requireHeader(HttpExchange exchange, String headerName) {
final String headerValue = exchange.getRequestHeaders().getFirst(headerName);
if (headerValue != null) {

MultipartUpload.java (new file)

@@ -0,0 +1,200 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package fixture.gcs;
import com.sun.net.httpserver.HttpExchange;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.CompositeBytesReference;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.regex.Pattern;
import java.util.zip.GZIPInputStream;
public record MultipartUpload(String bucket, String name, String generation, String crc32, String md5, BytesReference content) {
static final byte[] BODY_PART_HEADERS_DELIMITER = new byte[] { '\r', '\n', '\r', '\n' };
static final Pattern METADATA_PATTERN = Pattern.compile("\"(bucket|name|generation|crc32c|md5Hash)\":\"([^\"]*)\"");
static final Pattern BOUNDARY_HEADER_PATTERN = Pattern.compile("multipart/\\w+; boundary=\\\"?(.*)\\\"?");
/**
* Reads the HTTP content of a multipart upload. The first part is always the JSON metadata, followed by binary parts.
* Every part has its own headers and content. Parts are separated by a dash-boundary (--boundary) delimiter,
* where the boundary is defined in the Content-Type HTTP header,
* like this: {@code multipart/related; boundary=__END_OF_PART__4914cd49-4065-44f6-9846-ce805fe1e77f__}.
* The last part, the close-delimiter, is dashed on both sides: {@code --boundary--}.
* Part headers are separated from the content by a double CRLF.
* More details in <a href=https://www.rfc-editor.org/rfc/rfc2046.html#page-19>RFC 2046</a>.
*
* <pre>
* {@code
* --boundary CRLF
* headers CRLF
* CRLF
* content CRLF
* --boundary CRLF
* headers CRLF
* CRLF
* content CRLF
* --boundary--
* }
* </pre>
*/
public static MultipartUpload parseBody(HttpExchange exchange, InputStream gzipInput) throws IOException {
var m = BOUNDARY_HEADER_PATTERN.matcher(exchange.getRequestHeaders().getFirst("Content-Type"));
if (m.matches() == false) {
throw new IllegalStateException("boundary header is not present");
}
var boundary = m.group(1);
try (var input = new GZIPInputStream(gzipInput)) {
return parseBody(boundary, input);
}
}
// for tests
static MultipartUpload parseBody(String boundary, InputStream input) throws IOException {
var reader = new MultipartContentReader(boundary, input);
// read first body-part - blob metadata json
var metadataBytes = reader.next();
var match = METADATA_PATTERN.matcher(metadataBytes.utf8ToString());
String bucket = "", name = "", gen = "", crc = "", md5 = "";
while (match.find()) {
switch (match.group(1)) {
case "bucket" -> bucket = match.group(2);
case "name" -> name = match.group(2);
case "generation" -> gen = match.group(2);
case "crc32c" -> crc = match.group(2);
case "md5Hash" -> md5 = match.group(2);
}
}
// read and combine remaining parts
var blobParts = new ArrayList<BytesReference>();
while (reader.hasNext()) {
blobParts.add(reader.next());
}
var compositeBuf = CompositeBytesReference.of(blobParts.toArray(new BytesReference[0]));
return new MultipartUpload(bucket, name, gen, crc, md5, compositeBuf);
}
/**
* Must be called after reading a body-part delimiter to check whether more parts follow.
* If there are no more parts, a closing double dash is expected; otherwise CRLF.
*/
static boolean readCloseDelimiterOrCRLF(InputStream is) throws IOException {
var d1 = is.read();
var d2 = is.read();
if (d1 == '-' && d2 == '-') {
return true;
} else if (d1 == '\r' && d2 == '\n') {
return false;
} else {
throw new IllegalStateException("expect '--' or CRLF, got " + d1 + " " + d2);
}
}
/**
* Reads bytes from the stream into a buffer until the given delimiter is reached. The delimiter is consumed as well.
*/
static BytesReference readUntilDelimiter(InputStream is, byte[] delimiter) throws IOException {
assert delimiter.length > 0;
var out = new ByteArrayOutputStream(1024);
var delimiterMatchLen = 0;
while (true) {
var c = is.read();
if (c == -1) {
throw new IllegalStateException("expected delimiter, but reached end of stream ");
}
var b = (byte) c;
out.write(b);
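// single-byte fallback on mismatch (only delimiter[0] is re-checked) instead of full
// KMP-style backtracking; sufficient here because the delimiters fed to this method
// start with CR, which never recurs in a way this scan could miss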
if (delimiter[delimiterMatchLen] == b) {
delimiterMatchLen++;
if (delimiterMatchLen >= delimiter.length) {
var bytes = out.toByteArray();
return new BytesArray(bytes, 0, bytes.length - delimiter.length);
}
} else {
if (delimiter[0] == b) {
delimiterMatchLen = 1;
} else {
delimiterMatchLen = 0;
}
}
}
}
/**
* Discards bytes from the stream until the given delimiter is reached. The delimiter is consumed as well.
*/
static void skipUntilDelimiter(InputStream is, byte[] delimiter) throws IOException {
assert delimiter.length > 0;
var delimiterMatchLen = 0;
while (true) {
var c = is.read();
if (c == -1) {
throw new IllegalStateException("expected delimiter, but reached end of stream ");
}
var b = (byte) c;
if (delimiter[delimiterMatchLen] == b) {
delimiterMatchLen++;
if (delimiterMatchLen >= delimiter.length) {
return;
}
} else {
if (delimiter[0] == b) {
delimiterMatchLen = 1;
} else {
delimiterMatchLen = 0;
}
}
}
}
/**
* Multipart content iterator.
*/
static class MultipartContentReader implements Iterator<BytesReference> {
private final InputStream input;
private final byte[] bodyPartDelimiter;
private boolean done;
MultipartContentReader(String boundary, InputStream input) throws IOException {
this.input = input;
this.bodyPartDelimiter = ("\r\n--" + boundary).getBytes();
byte[] dashBoundary = ("--" + boundary).getBytes();
skipUntilDelimiter(input, dashBoundary);
readCloseDelimiterOrCRLF(input);
}
@Override
public boolean hasNext() {
return done == false;
}
@Override
public BytesReference next() {
try {
skipUntilDelimiter(input, BODY_PART_HEADERS_DELIMITER);
BytesReference buf = readUntilDelimiter(input, bodyPartDelimiter);
done = readCloseDelimiterOrCRLF(input);
return buf;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}
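A minimal usage sketch of the parser above (hypothetical boundary, names and content; it uses the package-private parseBody(String, InputStream) overload to skip the gzip and Content-Type plumbing, so it assumes a caller in the fixture.gcs package):

    var boundary = "__END_OF_PART__example__";
    var body = """
        --$b\r
        Content-Type: application/json; charset=UTF-8\r
        \r
        {"bucket":"bucket","name":"path/blob"}\r
        --$b\r
        Content-Type: application/octet-stream\r
        \r
        hello\r
        --$b--""".replace("$b", boundary);
    // the first body-part is parsed as blob metadata, the remaining parts are concatenated into the content
    MultipartUpload upload = MultipartUpload.parseBody(
        boundary,
        new java.io.ByteArrayInputStream(body.getBytes(java.nio.charset.StandardCharsets.UTF_8))
    );
    assert upload.bucket().equals("bucket") && upload.name().equals("path/blob");
    assert upload.content().utf8ToString().equals("hello");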

GoogleCloudStorageHttpHandlerTests.java

@@ -102,9 +102,10 @@ public class GoogleCloudStorageHttpHandlerTests extends ESTestCase {
assertEquals(new TestHttpResponse(RestStatus.OK, """
{"kind":"storage#objects","items":[],"prefixes":[]}"""), listBlobs(handler, bucket, "some/other/path", null));
var boundary = newMultipartBoundary();
assertEquals(
new TestHttpResponse(RestStatus.OK, """
--__END_OF_PART__d8b50acb-87dc-4630-a3d3-17d187132ebc__
--$boundary
Content-Length: 168
Content-Type: application/http
content-id: 1
@@ -115,13 +116,13 @@ public class GoogleCloudStorageHttpHandlerTests extends ESTestCase {
--__END_OF_PART__d8b50acb-87dc-4630-a3d3-17d187132ebc__
""".replaceAll("\n", "\r\n")),
--$boundary--
""".replace("\n", "\r\n").replace("$boundary", boundary)),
handleRequest(
handler,
"POST",
"/batch/storage/v1",
createBatchDeleteRequest(bucket, blobName),
createBatchDeleteRequest(bucket, boundary, blobName),
Headers.of("Content-Type", "mixed/multipart")
)
);
@@ -131,7 +132,7 @@ public class GoogleCloudStorageHttpHandlerTests extends ESTestCase {
handler,
"POST",
"/batch/storage/v1",
createBatchDeleteRequest(bucket, blobName),
createBatchDeleteRequest(bucket, boundary, blobName),
Headers.of("Content-Type", "mixed/multipart")
).restStatus()
);
@@ -615,11 +616,16 @@ public class GoogleCloudStorageHttpHandlerTests extends ESTestCase {
BytesReference bytes,
Long ifGenerationMatch
) {
var headers = new Headers();
// a multipart upload request must provide the boundary in its Content-Type header
var boundary = newMultipartBoundary();
headers.put("Content-Type", List.of("multipart/related; boundary=" + boundary));
return handleRequest(
handler,
"POST",
"/upload/storage/v1/b/" + bucket + "/" + generateQueryString("uploadType", "multipart", "ifGenerationMatch", ifGenerationMatch),
createGzipCompressedMultipartUploadBody(bucket, blobName, bytes)
createGzipCompressedMultipartUploadBody(bucket, blobName, bytes, boundary),
headers
);
}
@@ -785,25 +791,37 @@ public class GoogleCloudStorageHttpHandlerTests extends ESTestCase {
return Headers.of("Range", Strings.format("bytes=%d-%d", start, end));
}
private static BytesReference createGzipCompressedMultipartUploadBody(String bucketName, String path, BytesReference content) {
private static String newMultipartBoundary() {
return "__END_OF_PART__" + randomUUID();
}
private static BytesReference createGzipCompressedMultipartUploadBody(
String bucketName,
String path,
BytesReference content,
String boundary
) {
final String metadataString = Strings.format("{\"bucket\":\"%s\", \"name\":\"%s\"}", bucketName, path);
final BytesReference header = new BytesArray(Strings.format("""
--__END_OF_PART__a607a67c-6df7-4b87-b8a1-81f639a75a97__
Content-Length: %d
final String headerStr = """
--$boundary
Content-Length: $metadata-length
Content-Type: application/json; charset=UTF-8
content-transfer-encoding: binary
%s
--__END_OF_PART__a607a67c-6df7-4b87-b8a1-81f639a75a97__
$metadata
--$boundary
Content-Type: application/octet-stream
content-transfer-encoding: binary
""".replaceAll("\n", "\r\n"), metadataString.length(), metadataString).getBytes(StandardCharsets.UTF_8));
""".replace("\n", "\r\n")
.replace("$boundary", boundary)
.replace("$metadata-length", Integer.toString(metadataString.length()))
.replace("$metadata", metadataString);
final BytesReference header = new BytesArray(headerStr.getBytes(StandardCharsets.UTF_8));
final BytesReference footer = new BytesArray("""
--__END_OF_PART__a607a67c-6df7-4b87-b8a1-81f639a75a97__--
""".replaceAll("\n", "\r\n"));
--$boundary--
""".replace("\n", "\r\n").replace("$boundary", boundary));
final ByteArrayOutputStream out = new ByteArrayOutputStream();
try (GZIPOutputStream gzipOutputStream = new GZIPOutputStream(out)) {
gzipOutputStream.write(BytesReference.toBytes(CompositeBytesReference.of(header, content, footer)));
@@ -813,7 +831,7 @@ public class GoogleCloudStorageHttpHandlerTests extends ESTestCase {
return new BytesArray(out.toByteArray());
}
private static String createBatchDeleteRequest(String bucketName, String... paths) {
private static String createBatchDeleteRequest(String bucketName, String boundary, String... paths) {
final String deleteRequestTemplate = """
DELETE %s/storage/v1/b/%s/o/%s HTTP/1.1
Authorization: Bearer foo
@@ -822,14 +840,14 @@ public class GoogleCloudStorageHttpHandlerTests extends ESTestCase {
""";
final String partTemplate = """
--__END_OF_PART__d8b50acb-87dc-4630-a3d3-17d187132ebc__
--$boundary
Content-Length: %d
Content-Type: application/http
content-id: %d
content-transfer-encoding: binary
%s
""";
""".replace("$boundary", boundary);
StringBuilder builder = new StringBuilder();
AtomicInteger contentId = new AtomicInteger();
Arrays.stream(paths).forEach(p -> {
@@ -837,7 +855,7 @@ public class GoogleCloudStorageHttpHandlerTests extends ESTestCase {
final String part = Strings.format(partTemplate, deleteRequest.length(), contentId.incrementAndGet(), deleteRequest);
builder.append(part);
});
builder.append("--__END_OF_PART__d8b50acb-87dc-4630-a3d3-17d187132ebc__");
builder.append("--").append(boundary).append("--");
return builder.toString();
}
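The helper above and the new parser round-trip each other, which suggests a direct test; a minimal sketch (hypothetical test method, assuming it sits next to these helpers, that a java.util.zip.GZIPInputStream import is present, and that the fixture's package-private MultipartUpload.parseBody(String, InputStream) overload is visible):

    public void testMultipartUploadBodyRoundTrip() throws IOException {
        var boundary = newMultipartBoundary();
        var body = createGzipCompressedMultipartUploadBody("bucket", "path/blob", new BytesArray("hello"), boundary);
        // the handler-facing entry point gunzips before parsing, so mirror that here
        try (var input = new GZIPInputStream(body.streamInput())) {
            var upload = MultipartUpload.parseBody(boundary, input);
            assertEquals("path/blob", upload.name());
            assertEquals("hello", upload.content().utf8ToString());
        }
    }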

MultipartUploadTests.java (new file)

@@ -0,0 +1,97 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package fixture.gcs;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.CompositeBytesReference;
import org.elasticsearch.common.io.stream.ByteArrayStreamInput;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import static java.nio.charset.StandardCharsets.UTF_8;
public class MultipartUploadTests extends ESTestCase {
// produces content that does not contain the boundary
static String randomPartContent(int len, String boundary) {
assert len > 0 && boundary.isEmpty() == false;
var content = randomAlphanumericOfLength(len);
// replace any occurrence of the boundary with same-length filler; '*' is not
// alphanumeric, so the filler cannot combine with its neighbours to recreate the boundary
return content.replace(boundary, "*".repeat(boundary.length()));
}
public void testGenericMultipart() throws IOException {
var boundary = randomAlphanumericOfLength(between(1, 70));
var part1 = "plain text\nwith line break";
var part2 = "";
var part3 = randomPartContent(between(1, 1024), boundary);
var strInput = """
--$boundary\r
\r
\r
$part1\r
--$boundary\r
X-Header: x-man\r
\r
$part2\r
--$boundary\r
Content-Type: application/octet-stream\r
\r
$part3\r
--$boundary--""".replace("$boundary", boundary).replace("$part1", part1).replace("$part2", part2).replace("$part3", part3);
var reader = new MultipartUpload.MultipartContentReader(boundary, new ByteArrayStreamInput(strInput.getBytes()));
assertEquals(part1, reader.next().utf8ToString());
assertEquals(part2, reader.next().utf8ToString());
assertEquals(part3, reader.next().utf8ToString());
assertFalse(reader.hasNext());
}
public void testReadUntilDelimiter() throws IOException {
for (int run = 0; run < 100; run++) {
var delimitedContent = DelimitedContent.randomContent();
var inputStream = delimitedContent.toBytesReference().streamInput();
var readBytes = MultipartUpload.readUntilDelimiter(inputStream, delimitedContent.delimiter);
assertEquals(new BytesArray(delimitedContent.before), readBytes);
var readRemaining = inputStream.readAllBytes();
assertArrayEquals(delimitedContent.after, readRemaining);
}
}
public void testSkipUntilDelimiter() throws IOException {
for (int run = 0; run < 100; run++) {
var delimitedContent = DelimitedContent.randomContent();
var inputStream = delimitedContent.toBytesReference().streamInput();
MultipartUpload.skipUntilDelimiter(inputStream, delimitedContent.delimiter);
var readRemaining = inputStream.readAllBytes();
assertArrayEquals(delimitedContent.after, readRemaining);
}
}
record DelimitedContent(byte[] before, byte[] delimiter, byte[] after) {
static DelimitedContent randomContent() {
var before = randomAlphanumericOfLength(between(0, 1024 * 1024)).getBytes(UTF_8);
var delimiter = randomByteArrayOfLength(between(1, 70));
delimiter[0] = '\r'; // make it distinguishable from the initial bytes
var after = randomAlphanumericOfLength(between(0, 1024 * 1024)).getBytes(UTF_8);
return new DelimitedContent(before, delimiter, after);
}
BytesReference toBytesReference() {
return CompositeBytesReference.of(new BytesArray(before), new BytesArray(delimiter), new BytesArray(after));
}
}
}