Use systemd socket directly instead of libsystemd (#111131)

The libsystemd library function sd_notify is just a thin wrapper
around opeing and writing to a unix filesystem socket. This commit
replaces using libsystemd with opening the socket provided by systemd
directly.

relates #86475
This commit is contained in:
Ryan Ernst 2024-08-19 16:31:59 -07:00 committed by GitHub
parent dc24003540
commit 69293e28dc
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 248 additions and 16 deletions

View file

@ -16,6 +16,7 @@ import com.sun.jna.NativeLong;
import com.sun.jna.Pointer;
import com.sun.jna.Structure;
import org.elasticsearch.nativeaccess.CloseableByteBuffer;
import org.elasticsearch.nativeaccess.lib.PosixCLibrary;
import java.util.Arrays;
@ -109,6 +110,16 @@ class JnaPosixCLibrary implements PosixCLibrary {
}
}
public static class JnaSockAddr implements SockAddr {
final Memory memory;
JnaSockAddr(String path) {
this.memory = new Memory(110);
memory.setShort(0, AF_UNIX);
memory.setString(2, path, "UTF-8");
}
}
private interface NativeFunctions extends Library {
int geteuid();
@ -126,6 +137,12 @@ class JnaPosixCLibrary implements PosixCLibrary {
int close(int fd);
int socket(int domain, int type, int protocol);
int connect(int sockfd, Pointer addr, int addrlen);
long send(int sockfd, Pointer buf, long buflen, int flags);
String strerror(int errno);
}
@ -235,6 +252,30 @@ class JnaPosixCLibrary implements PosixCLibrary {
return fstat64.fstat64(fd, jnaStats.memory);
}
@Override
public int socket(int domain, int type, int protocol) {
return functions.socket(domain, type, protocol);
}
@Override
public SockAddr newUnixSockAddr(String path) {
return new JnaSockAddr(path);
}
@Override
public int connect(int sockfd, SockAddr addr) {
assert addr instanceof JnaSockAddr;
var jnaAddr = (JnaSockAddr) addr;
return functions.connect(sockfd, jnaAddr.memory, (int) jnaAddr.memory.size());
}
@Override
public long send(int sockfd, CloseableByteBuffer buffer, int flags) {
assert buffer instanceof JnaCloseableByteBuffer;
var nativeBuffer = (JnaCloseableByteBuffer) buffer;
return functions.send(sockfd, nativeBuffer.memory, nativeBuffer.buffer().remaining(), flags);
}
@Override
public String strerror(int errno) {
return functions.strerror(errno);

View file

@ -12,7 +12,7 @@ import org.elasticsearch.nativeaccess.lib.LinuxCLibrary;
import org.elasticsearch.nativeaccess.lib.LinuxCLibrary.SockFProg;
import org.elasticsearch.nativeaccess.lib.LinuxCLibrary.SockFilter;
import org.elasticsearch.nativeaccess.lib.NativeLibraryProvider;
import org.elasticsearch.nativeaccess.lib.SystemdLibrary;
import org.elasticsearch.nativeaccess.lib.PosixCLibrary;
import java.util.Map;
@ -92,7 +92,14 @@ class LinuxNativeAccess extends PosixNativeAccess {
LinuxNativeAccess(NativeLibraryProvider libraryProvider) {
super("Linux", libraryProvider, new PosixConstants(-1L, 9, 1, 8, 64, 144, 48, 64));
this.linuxLibc = libraryProvider.getLibrary(LinuxCLibrary.class);
this.systemd = new Systemd(libraryProvider.getLibrary(SystemdLibrary.class));
String socketPath = System.getenv("NOTIFY_SOCKET");
if (socketPath == null) {
this.systemd = null; // not running under systemd
} else {
logger.debug("Systemd socket path: {}", socketPath);
var buffer = newBuffer(64);
this.systemd = new Systemd(libraryProvider.getLibrary(PosixCLibrary.class), socketPath, buffer);
}
}
@Override

View file

@ -10,17 +10,28 @@ package org.elasticsearch.nativeaccess;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.nativeaccess.lib.SystemdLibrary;
import org.elasticsearch.nativeaccess.lib.PosixCLibrary;
import java.util.Locale;
import java.nio.charset.StandardCharsets;
/**
* Wraps access to notifications to systemd.
* <p>
* Systemd notifications are done through a Unix socket. Although Java does support
* opening unix sockets, it unfortunately does not support datagram sockets. This class
* instead opens and communicates with the socket using native methods.
*/
public class Systemd {
private static final Logger logger = LogManager.getLogger(Systemd.class);
private final SystemdLibrary lib;
private final PosixCLibrary libc;
private final String socketPath;
private final CloseableByteBuffer buffer;
Systemd(SystemdLibrary lib) {
this.lib = lib;
Systemd(PosixCLibrary libc, String socketPath, CloseableByteBuffer buffer) {
this.libc = libc;
this.socketPath = socketPath;
this.buffer = buffer;
}
/**
@ -41,15 +52,61 @@ public class Systemd {
}
private void notify(String state, boolean warnOnError) {
int rc = lib.sd_notify(0, state);
logger.trace("sd_notify({}, {}) returned [{}]", 0, state, rc);
if (rc < 0) {
String message = String.format(Locale.ROOT, "sd_notify(%d, %s) returned error [%d]", 0, state, rc);
int sockfd = libc.socket(PosixCLibrary.AF_UNIX, PosixCLibrary.SOCK_DGRAM, 0);
if (sockfd < 0) {
throwOrLog("Could not open systemd socket: " + libc.strerror(libc.errno()), warnOnError);
return;
}
RuntimeException error = null;
try {
var sockAddr = libc.newUnixSockAddr(socketPath);
if (libc.connect(sockfd, sockAddr) != 0) {
throwOrLog("Could not connect to systemd socket: " + libc.strerror(libc.errno()), warnOnError);
return;
}
byte[] bytes = state.getBytes(StandardCharsets.US_ASCII);
final long bytesSent;
synchronized (buffer) {
buffer.buffer().clear();
buffer.buffer().put(0, bytes);
buffer.buffer().limit(bytes.length);
bytesSent = libc.send(sockfd, buffer, 0);
}
if (bytesSent == -1) {
throwOrLog("Failed to send message (" + state + ") to systemd socket: " + libc.strerror(libc.errno()), warnOnError);
} else if (bytesSent != bytes.length) {
throwOrLog("Not all bytes of message (" + state + ") sent to systemd socket (sent " + bytesSent + ")", warnOnError);
} else {
logger.trace("Message (" + state + ") sent to systemd");
}
} catch (RuntimeException e) {
error = e;
} finally {
if (libc.close(sockfd) != 0) {
try {
throwOrLog("Could not close systemd socket: " + libc.strerror(libc.errno()), warnOnError);
} catch (RuntimeException e) {
if (error != null) {
error.addSuppressed(e);
throw error;
} else {
throw e;
}
}
} else if (error != null) {
throw error;
}
}
}
private void throwOrLog(String message, boolean warnOnError) {
if (warnOnError) {
logger.warn(message);
} else {
logger.error(message);
throw new RuntimeException(message);
}
}
}
}

View file

@ -8,11 +8,19 @@
package org.elasticsearch.nativeaccess.lib;
import org.elasticsearch.nativeaccess.CloseableByteBuffer;
/**
* Provides access to methods in libc.so available on POSIX systems.
*/
public non-sealed interface PosixCLibrary extends NativeLibrary {
/** socket domain indicating unix file socket */
short AF_UNIX = 1;
/** socket type indicating a datagram-oriented socket */
int SOCK_DGRAM = 2;
/**
* Gets the effective userid of the current process.
*
@ -68,8 +76,6 @@ public non-sealed interface PosixCLibrary extends NativeLibrary {
int open(String pathname, int flags);
int close(int fd);
int fstat64(int fd, Stat64 stats);
int ftruncate(int fd, long length);
@ -90,6 +96,55 @@ public non-sealed interface PosixCLibrary extends NativeLibrary {
int fcntl(int fd, int cmd, FStore fst);
/**
* Open a file descriptor to connect to a socket.
*
* @param domain The socket protocol family, eg AF_UNIX
* @param type The socket type, eg SOCK_DGRAM
* @param protocol The protocol for the given protocl family, normally 0
* @return an open file descriptor, or -1 on failure with errno set
* @see <a href="https://man7.org/linux/man-pages/man2/socket.2.html">socket manpage</a>
*/
int socket(int domain, int type, int protocol);
/**
* Marker interface for sockaddr struct implementations.
*/
interface SockAddr {}
/**
* Create a sockaddr for the AF_UNIX family.
*/
SockAddr newUnixSockAddr(String path);
/**
* Connect a socket to an address.
*
* @param sockfd An open socket file descriptor
* @param addr The address to connect to
* @return 0 on success, -1 on failure with errno set
*/
int connect(int sockfd, SockAddr addr);
/**
* Send a message to a socket.
*
* @param sockfd The open socket file descriptor
* @param buffer The message bytes to send
* @param flags Flags that may adjust how the message is sent
* @return The number of bytes sent, or -1 on failure with errno set
* @see <a href="https://man7.org/linux/man-pages/man2/sendto.2.html">send manpage</a>
*/
long send(int sockfd, CloseableByteBuffer buffer, int flags);
/**
* Close a file descriptor
* @param fd The file descriptor to close
* @return 0 on success, -1 on failure with errno set
* @see <a href="https://man7.org/linux/man-pages/man2/close.2.html">close manpage</a>
*/
int close(int fd);
/**
* Return a string description for an error.
*

View file

@ -10,6 +10,7 @@ package org.elasticsearch.nativeaccess.jdk;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.nativeaccess.CloseableByteBuffer;
import org.elasticsearch.nativeaccess.lib.PosixCLibrary;
import java.lang.foreign.Arena;
@ -24,8 +25,10 @@ import java.lang.invoke.VarHandle;
import static java.lang.foreign.MemoryLayout.PathElement.groupElement;
import static java.lang.foreign.ValueLayout.ADDRESS;
import static java.lang.foreign.ValueLayout.JAVA_BYTE;
import static java.lang.foreign.ValueLayout.JAVA_INT;
import static java.lang.foreign.ValueLayout.JAVA_LONG;
import static java.lang.foreign.ValueLayout.JAVA_SHORT;
import static org.elasticsearch.nativeaccess.jdk.LinkerHelper.downcallHandle;
import static org.elasticsearch.nativeaccess.jdk.MemorySegmentUtil.varHandleWithoutOffset;
@ -89,6 +92,18 @@ class JdkPosixCLibrary implements PosixCLibrary {
}
fstat$mh = fstat;
}
private static final MethodHandle socket$mh = downcallHandleWithErrno(
"socket",
FunctionDescriptor.of(JAVA_INT, JAVA_INT, JAVA_INT, JAVA_INT)
);
private static final MethodHandle connect$mh = downcallHandleWithErrno(
"connect",
FunctionDescriptor.of(JAVA_INT, JAVA_INT, ADDRESS, JAVA_INT)
);
private static final MethodHandle send$mh = downcallHandleWithErrno(
"send",
FunctionDescriptor.of(JAVA_LONG, JAVA_INT, ADDRESS, JAVA_LONG, JAVA_INT)
);
static final MemorySegment errnoState = Arena.ofAuto().allocate(CAPTURE_ERRNO_LAYOUT);
@ -226,6 +241,44 @@ class JdkPosixCLibrary implements PosixCLibrary {
}
}
@Override
public int socket(int domain, int type, int protocol) {
try {
return (int) socket$mh.invokeExact(errnoState, domain, type, protocol);
} catch (Throwable t) {
throw new AssertionError(t);
}
}
@Override
public SockAddr newUnixSockAddr(String path) {
return new JdkSockAddr(path);
}
@Override
public int connect(int sockfd, SockAddr addr) {
assert addr instanceof JdkSockAddr;
var jdkAddr = (JdkSockAddr) addr;
try {
return (int) connect$mh.invokeExact(errnoState, sockfd, jdkAddr.segment, (int) jdkAddr.segment.byteSize());
} catch (Throwable t) {
throw new AssertionError(t);
}
}
@Override
public long send(int sockfd, CloseableByteBuffer buffer, int flags) {
assert buffer instanceof JdkCloseableByteBuffer;
var nativeBuffer = (JdkCloseableByteBuffer) buffer;
var segment = nativeBuffer.segment;
try {
logger.info("Sending {} bytes to socket", buffer.buffer().remaining());
return (long) send$mh.invokeExact(errnoState, sockfd, segment, (long) buffer.buffer().remaining(), flags);
} catch (Throwable t) {
throw new AssertionError(t);
}
}
static class JdkRLimit implements RLimit {
private static final MemoryLayout layout = MemoryLayout.structLayout(JAVA_LONG, JAVA_LONG);
private static final VarHandle rlim_cur$vh = varHandleWithoutOffset(layout, groupElement(0));
@ -326,4 +379,15 @@ class JdkPosixCLibrary implements PosixCLibrary {
return (long) st_bytesalloc$vh.get(segment);
}
}
private static class JdkSockAddr implements SockAddr {
private static final MemoryLayout layout = MemoryLayout.structLayout(JAVA_SHORT, MemoryLayout.sequenceLayout(108, JAVA_BYTE));
final MemorySegment segment;
JdkSockAddr(String path) {
segment = Arena.ofAuto().allocate(layout);
segment.set(JAVA_SHORT, 0, AF_UNIX);
MemorySegmentUtil.setString(segment, 2, path);
}
}
}

View file

@ -22,6 +22,10 @@ class MemorySegmentUtil {
return segment.getUtf8String(offset);
}
static void setString(MemorySegment segment, long offset, String value) {
segment.setUtf8String(offset, value);
}
static MemorySegment allocateString(Arena arena, String s) {
return arena.allocateUtf8String(s);
}

View file

@ -20,6 +20,10 @@ class MemorySegmentUtil {
return segment.getString(offset);
}
static void setString(MemorySegment segment, long offset, String value) {
segment.setString(offset, value);
}
static MemorySegment allocateString(Arena arena, String s) {
return arena.allocateFrom(s);
}