Add systemd native access (#106151)

This commit moves systemd access to the NativeAccess lib.

relates #104876
This commit is contained in:
Ryan Ernst 2024-03-12 07:35:02 -07:00 committed by GitHub
parent d8da8fa61a
commit 10dcb8e8bd
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
17 changed files with 281 additions and 123 deletions

View file

@ -11,3 +11,7 @@ esplugin {
classname 'org.elasticsearch.systemd.SystemdPlugin'
}
dependencies {
implementation project(':libs:elasticsearch-native')
}

View file

@ -12,5 +12,5 @@ module org.elasticsearch.systemd {
requires org.elasticsearch.xcontent;
requires org.apache.logging.log4j;
requires org.apache.lucene.core;
requires com.sun.jna;
requires org.elasticsearch.nativeaccess;
}

View file

@ -1,38 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.systemd;
import com.sun.jna.Native;
import java.security.AccessController;
import java.security.PrivilegedAction;
/**
* Provides access to the native method sd_notify from libsystemd.
*/
class Libsystemd {
static {
AccessController.doPrivileged((PrivilegedAction<Object>) () -> {
Native.register(Libsystemd.class, "libsystemd.so.0");
return null;
});
}
/**
* Notify systemd of state changes.
*
* @param unset_environment if non-zero, the NOTIFY_SOCKET environment variable will be unset before returning and further calls to
* sd_notify will fail
* @param state a new-line separated list of variable assignments; some assignments are understood directly by systemd
* @return a negative error code on failure, and positive if status was successfully sent
*/
static native int sd_notify(int unset_environment, String state);
}

View file

@ -14,6 +14,8 @@ import org.apache.lucene.util.SetOnce;
import org.elasticsearch.Build;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.nativeaccess.NativeAccess;
import org.elasticsearch.nativeaccess.Systemd;
import org.elasticsearch.plugins.ClusterPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.threadpool.Scheduler;
@ -26,6 +28,7 @@ public class SystemdPlugin extends Plugin implements ClusterPlugin {
private static final Logger logger = LogManager.getLogger(SystemdPlugin.class);
private final boolean enabled;
private final Systemd systemd;
final boolean isEnabled() {
return enabled;
@ -44,18 +47,21 @@ public class SystemdPlugin extends Plugin implements ClusterPlugin {
}
if (isPackageDistribution == false) {
logger.debug("disabling sd_notify as the build type [{}] is not a package distribution", buildType);
enabled = false;
this.enabled = false;
this.systemd = null;
return;
}
logger.trace("ES_SD_NOTIFY is set to [{}]", esSDNotify);
if (esSDNotify == null) {
enabled = false;
this.enabled = false;
this.systemd = null;
return;
}
if (Boolean.TRUE.toString().equals(esSDNotify) == false && Boolean.FALSE.toString().equals(esSDNotify) == false) {
throw new RuntimeException("ES_SD_NOTIFY set to unexpected value [" + esSDNotify + "]");
}
enabled = Boolean.TRUE.toString().equals(esSDNotify);
this.enabled = Boolean.TRUE.toString().equals(esSDNotify);
this.systemd = enabled ? NativeAccess.instance().systemd() : null;
}
private final SetOnce<Scheduler.Cancellable> extender = new SetOnce<>();
@ -77,19 +83,25 @@ public class SystemdPlugin extends Plugin implements ClusterPlugin {
* Therefore, every fifteen seconds we send systemd a message via sd_notify to extend the timeout by thirty seconds. We will cancel
* this scheduled task after we successfully notify systemd that we are ready.
*/
extender.set(services.threadPool().scheduleWithFixedDelay(() -> {
final int rc = sd_notify(0, "EXTEND_TIMEOUT_USEC=30000000");
if (rc < 0) {
logger.warn("extending startup timeout via sd_notify failed with [{}]", rc);
}
}, TimeValue.timeValueSeconds(15), EsExecutors.DIRECT_EXECUTOR_SERVICE));
extender.set(
services.threadPool()
.scheduleWithFixedDelay(
() -> { systemd.notify_extend_timeout(30); },
TimeValue.timeValueSeconds(15),
EsExecutors.DIRECT_EXECUTOR_SERVICE
)
);
return List.of();
}
int sd_notify(@SuppressWarnings("SameParameterValue") final int unset_environment, final String state) {
final int rc = Libsystemd.sd_notify(unset_environment, state);
logger.trace("sd_notify({}, {}) returned [{}]", unset_environment, state, rc);
return rc;
void notifyReady() {
assert systemd != null;
systemd.notify_ready();
}
void notifyStopping() {
assert systemd != null;
systemd.notify_stopping();
}
@Override
@ -98,11 +110,7 @@ public class SystemdPlugin extends Plugin implements ClusterPlugin {
assert extender.get() == null;
return;
}
final int rc = sd_notify(0, "READY=1");
if (rc < 0) {
// treat failure to notify systemd of readiness as a startup failure
throw new RuntimeException("sd_notify returned error [" + rc + "]");
}
notifyReady();
assert extender.get() != null;
final boolean cancelled = extender.get().cancel();
assert cancelled;
@ -113,11 +121,7 @@ public class SystemdPlugin extends Plugin implements ClusterPlugin {
if (enabled == false) {
return;
}
final int rc = sd_notify(0, "STOPPING=1");
if (rc < 0) {
// do not treat failure to notify systemd of stopping as a failure
logger.warn("sd_notify returned error [{}]", rc);
}
notifyStopping();
}
}

View file

@ -21,16 +21,14 @@ import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import static org.elasticsearch.test.hamcrest.OptionalMatchers.isPresentWith;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasToString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.same;
@ -104,83 +102,68 @@ public class SystemdPluginTests extends ESTestCase {
}
public void testOnNodeStartedSuccess() {
runTestOnNodeStarted(Boolean.TRUE.toString(), randomIntBetween(0, Integer.MAX_VALUE), (maybe, plugin) -> {
runTestOnNodeStarted(Boolean.TRUE.toString(), false, (maybe, plugin) -> {
assertThat(maybe, OptionalMatchers.isEmpty());
assertThat(plugin.invokedReady.get(), is(true));
verify(plugin.extender()).cancel();
});
}
public void testOnNodeStartedFailure() {
final int rc = randomIntBetween(Integer.MIN_VALUE, -1);
runTestOnNodeStarted(
Boolean.TRUE.toString(),
rc,
(maybe, plugin) -> assertThat(
maybe,
isPresentWith(
allOf(instanceOf(RuntimeException.class), hasToString(containsString("sd_notify returned error [" + rc + "]")))
)
)
);
runTestOnNodeStarted(Boolean.TRUE.toString(), true, (maybe, plugin) -> {
assertThat(maybe, isPresentWith(allOf(instanceOf(RuntimeException.class), hasToString(containsString("notify ready failed")))));
assertThat(plugin.invokedReady.get(), is(true));
});
}
public void testOnNodeStartedNotEnabled() {
runTestOnNodeStarted(Boolean.FALSE.toString(), randomInt(), (maybe, plugin) -> assertThat(maybe, OptionalMatchers.isEmpty()));
runTestOnNodeStarted(Boolean.FALSE.toString(), randomBoolean(), (maybe, plugin) -> assertThat(maybe, OptionalMatchers.isEmpty()));
}
private void runTestOnNodeStarted(
final String esSDNotify,
final int rc,
final BiConsumer<Optional<Exception>, SystemdPlugin> assertions
final boolean invokeFailure,
final BiConsumer<Optional<Exception>, TestSystemdPlugin> assertions
) {
runTest(esSDNotify, rc, assertions, SystemdPlugin::onNodeStarted, "READY=1");
runTest(esSDNotify, invokeFailure, assertions, SystemdPlugin::onNodeStarted);
}
public void testCloseSuccess() {
runTestClose(
Boolean.TRUE.toString(),
randomIntBetween(1, Integer.MAX_VALUE),
(maybe, plugin) -> assertThat(maybe, OptionalMatchers.isEmpty())
);
runTestClose(Boolean.TRUE.toString(), false, (maybe, plugin) -> {
assertThat(maybe, OptionalMatchers.isEmpty());
assertThat(plugin.invokedStopping.get(), is(true));
});
}
public void testCloseFailure() {
runTestClose(
Boolean.TRUE.toString(),
randomIntBetween(Integer.MIN_VALUE, -1),
(maybe, plugin) -> assertThat(maybe, OptionalMatchers.isEmpty())
);
runTestClose(Boolean.TRUE.toString(), true, (maybe, plugin) -> {
assertThat(maybe, OptionalMatchers.isEmpty());
assertThat(plugin.invokedStopping.get(), is(true));
});
}
public void testCloseNotEnabled() {
runTestClose(Boolean.FALSE.toString(), randomInt(), (maybe, plugin) -> assertThat(maybe, OptionalMatchers.isEmpty()));
runTestClose(Boolean.FALSE.toString(), randomBoolean(), (maybe, plugin) -> {
assertThat(maybe, OptionalMatchers.isEmpty());
assertThat(plugin.invokedStopping.get(), is(false));
});
}
private void runTestClose(final String esSDNotify, final int rc, final BiConsumer<Optional<Exception>, SystemdPlugin> assertions) {
runTest(esSDNotify, rc, assertions, SystemdPlugin::close, "STOPPING=1");
private void runTestClose(
final String esSDNotify,
boolean invokeFailure,
final BiConsumer<Optional<Exception>, TestSystemdPlugin> assertions
) {
runTest(esSDNotify, invokeFailure, assertions, SystemdPlugin::close);
}
private void runTest(
final String esSDNotify,
final int rc,
final BiConsumer<Optional<Exception>, SystemdPlugin> assertions,
final CheckedConsumer<SystemdPlugin, IOException> invocation,
final String expectedState
final boolean invokeReadyFailure,
final BiConsumer<Optional<Exception>, TestSystemdPlugin> assertions,
final CheckedConsumer<SystemdPlugin, IOException> invocation
) {
final AtomicBoolean invoked = new AtomicBoolean();
final AtomicInteger invokedUnsetEnvironment = new AtomicInteger();
final AtomicReference<String> invokedState = new AtomicReference<>();
final SystemdPlugin plugin = new SystemdPlugin(false, randomPackageBuildType, esSDNotify) {
@Override
int sd_notify(final int unset_environment, final String state) {
invoked.set(true);
invokedUnsetEnvironment.set(unset_environment);
invokedState.set(state);
return rc;
}
};
final TestSystemdPlugin plugin = new TestSystemdPlugin(esSDNotify, invokeReadyFailure);
startPlugin(plugin);
if (Boolean.TRUE.toString().equals(esSDNotify)) {
assertNotNull(plugin.extender());
@ -198,13 +181,29 @@ public class SystemdPluginTests extends ESTestCase {
if (success) {
assertions.accept(Optional.empty(), plugin);
}
if (Boolean.TRUE.toString().equals(esSDNotify)) {
assertTrue(invoked.get());
assertThat(invokedUnsetEnvironment.get(), equalTo(0));
assertThat(invokedState.get(), equalTo(expectedState));
} else {
assertFalse(invoked.get());
}
}
class TestSystemdPlugin extends SystemdPlugin {
final AtomicBoolean invokedReady = new AtomicBoolean();
final AtomicBoolean invokedStopping = new AtomicBoolean();
final boolean invokeReadyFailure;
TestSystemdPlugin(String esSDNotify, boolean invokeFailure) {
super(false, randomPackageBuildType, esSDNotify);
this.invokeReadyFailure = invokeFailure;
}
@Override
void notifyReady() {
invokedReady.set(true);
if (invokeReadyFailure) {
throw new RuntimeException("notify ready failed");
}
}
@Override
void notifyStopping() {
invokedStopping.set(true);
}
}
}