[CI] Fix for SearchCancellationIT (#102774)

This commit is contained in:
Panagiotis Bailis 2023-12-01 12:18:49 +02:00 committed by GitHub
parent c9f3a6c6c7
commit bc0751d392
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 36 additions and 32 deletions

View file

@ -8,7 +8,6 @@
package org.elasticsearch.search; package org.elasticsearch.search;
import org.apache.lucene.tests.util.LuceneTestCase;
import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.search.MultiSearchResponse; import org.elasticsearch.action.search.MultiSearchResponse;
@ -50,8 +49,7 @@ import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.notNullValue;
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE) @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST)
@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/102257")
public class SearchCancellationIT extends AbstractSearchCancellationTestCase { public class SearchCancellationIT extends AbstractSearchCancellationTestCase {
@Override @Override
@ -288,12 +286,11 @@ public class SearchCancellationIT extends AbstractSearchCancellationTestCase {
assertTrue("All SearchShardTasks should then be cancelled", shardQueryTask.isCancelled()); assertTrue("All SearchShardTasks should then be cancelled", shardQueryTask.isCancelled());
} }
}, 30, TimeUnit.SECONDS); }, 30, TimeUnit.SECONDS);
shardTaskLatch.countDown(); // unblock the shardTasks, allowing the test to conclude.
} finally { } finally {
shardTaskLatch.countDown(); // unblock the shardTasks, allowing the test to conclude.
searchThread.join(); searchThread.join();
for (ScriptedBlockPlugin plugin : plugins) { plugins.forEach(plugin -> plugin.setBeforeExecution(() -> {}));
plugin.setBeforeExecution(() -> {}); searchShardBlockingPlugins.forEach(plugin -> plugin.setRunOnNewReaderContext((ReaderContext c) -> {}));
}
} }
} }

View file

@ -35,7 +35,8 @@ import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer; import java.util.function.Consumer;
@ -152,7 +153,7 @@ public class AbstractSearchCancellationTestCase extends ESIntegTestCase {
private final AtomicInteger hits = new AtomicInteger(); private final AtomicInteger hits = new AtomicInteger();
private final AtomicBoolean shouldBlock = new AtomicBoolean(true); private final Semaphore shouldBlock = new Semaphore(Integer.MAX_VALUE);
private final AtomicReference<Runnable> beforeExecution = new AtomicReference<>(); private final AtomicReference<Runnable> beforeExecution = new AtomicReference<>();
@ -161,11 +162,16 @@ public class AbstractSearchCancellationTestCase extends ESIntegTestCase {
} }
public void disableBlock() { public void disableBlock() {
shouldBlock.set(false); shouldBlock.release(Integer.MAX_VALUE);
} }
public void enableBlock() { public void enableBlock() {
shouldBlock.set(true); try {
shouldBlock.acquire(Integer.MAX_VALUE);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new AssertionError(e);
}
} }
public void setBeforeExecution(Runnable runnable) { public void setBeforeExecution(Runnable runnable) {
@ -196,6 +202,23 @@ public class AbstractSearchCancellationTestCase extends ESIntegTestCase {
); );
} }
public void logIfBlocked(String logMessage) {
if (shouldBlock.tryAcquire(1) == false) {
LogManager.getLogger(AbstractSearchCancellationTestCase.class).info(logMessage);
} else {
shouldBlock.release(1);
}
}
public void waitForLock(int timeout, TimeUnit timeUnit) {
try {
assertTrue(shouldBlock.tryAcquire(timeout, timeUnit));
shouldBlock.release(1);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private Object searchBlockScript(Map<String, Object> params) { private Object searchBlockScript(Map<String, Object> params) {
final Runnable runnable = beforeExecution.get(); final Runnable runnable = beforeExecution.get();
if (runnable != null) { if (runnable != null) {
@ -204,11 +227,7 @@ public class AbstractSearchCancellationTestCase extends ESIntegTestCase {
LeafStoredFieldsLookup fieldsLookup = (LeafStoredFieldsLookup) params.get("_fields"); LeafStoredFieldsLookup fieldsLookup = (LeafStoredFieldsLookup) params.get("_fields");
LogManager.getLogger(AbstractSearchCancellationTestCase.class).info("Blocking on the document {}", fieldsLookup.get("_id")); LogManager.getLogger(AbstractSearchCancellationTestCase.class).info("Blocking on the document {}", fieldsLookup.get("_id"));
hits.incrementAndGet(); hits.incrementAndGet();
try { waitForLock(10, TimeUnit.SECONDS);
assertBusy(() -> assertFalse(shouldBlock.get()));
} catch (Exception e) {
throw new RuntimeException(e);
}
return true; return true;
} }
@ -226,15 +245,9 @@ public class AbstractSearchCancellationTestCase extends ESIntegTestCase {
if (runnable != null) { if (runnable != null) {
runnable.run(); runnable.run();
} }
if (shouldBlock.get()) { logIfBlocked("Blocking in reduce");
LogManager.getLogger(AbstractSearchCancellationTestCase.class).info("Blocking in reduce");
}
hits.incrementAndGet(); hits.incrementAndGet();
try { waitForLock(10, TimeUnit.SECONDS);
assertBusy(() -> assertFalse(shouldBlock.get()));
} catch (Exception e) {
throw new RuntimeException(e);
}
return 42; return 42;
} }
@ -243,15 +256,9 @@ public class AbstractSearchCancellationTestCase extends ESIntegTestCase {
if (runnable != null) { if (runnable != null) {
runnable.run(); runnable.run();
} }
if (shouldBlock.get()) { logIfBlocked("Blocking in map");
LogManager.getLogger(AbstractSearchCancellationTestCase.class).info("Blocking in map");
}
hits.incrementAndGet(); hits.incrementAndGet();
try { waitForLock(10, TimeUnit.SECONDS);
assertBusy(() -> assertFalse(shouldBlock.get()));
} catch (Exception e) {
throw new RuntimeException(e);
}
return 1; return 1;
} }